reth_network/transactions/
mod.rs

1//! Transactions management for the p2p network.
2
3use alloy_consensus::transaction::TxHashRef;
4
5/// Aggregation on configurable parameters for [`TransactionsManager`].
6pub mod config;
7/// Default and spec'd bounds.
8pub mod constants;
9/// Component responsible for fetching transactions from [`NewPooledTransactionHashes`].
10pub mod fetcher;
11/// Defines the traits for transaction-related policies.
12pub mod policy;
13
14pub use self::constants::{
15    tx_fetcher::DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESP_ON_PACK_GET_POOLED_TRANSACTIONS_REQ,
16    SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE,
17};
18use config::AnnouncementAcceptance;
19pub use config::{
20    AnnouncementFilteringPolicy, TransactionFetcherConfig, TransactionIngressPolicy,
21    TransactionPropagationMode, TransactionPropagationPolicy, TransactionsManagerConfig,
22};
23use policy::NetworkPolicies;
24
25pub(crate) use fetcher::{FetchEvent, TransactionFetcher};
26
27use self::constants::{tx_manager::*, DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE};
28use crate::{
29    budget::{
30        DEFAULT_BUDGET_TRY_DRAIN_NETWORK_TRANSACTION_EVENTS,
31        DEFAULT_BUDGET_TRY_DRAIN_PENDING_POOL_IMPORTS, DEFAULT_BUDGET_TRY_DRAIN_STREAM,
32    },
33    cache::LruCache,
34    duration_metered_exec, metered_poll_nested_stream_with_budget,
35    metrics::{
36        AnnouncedTxTypesMetrics, TransactionsManagerMetrics, NETWORK_POOL_TRANSACTIONS_SCOPE,
37    },
38    transactions::config::{StrictEthAnnouncementFilter, TransactionPropagationKind},
39    NetworkHandle, TxTypesCounter,
40};
41use alloy_primitives::{TxHash, B256};
42use constants::SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE;
43use futures::{stream::FuturesUnordered, Future, StreamExt};
44use reth_eth_wire::{
45    DedupPayload, EthNetworkPrimitives, EthVersion, GetPooledTransactions, HandleMempoolData,
46    HandleVersionedMempoolData, NetworkPrimitives, NewPooledTransactionHashes,
47    NewPooledTransactionHashes66, NewPooledTransactionHashes68, PooledTransactions,
48    RequestTxHashes, Transactions, ValidAnnouncementData,
49};
50use reth_ethereum_primitives::{TransactionSigned, TxType};
51use reth_metrics::common::mpsc::UnboundedMeteredReceiver;
52use reth_network_api::{
53    events::{PeerEvent, SessionInfo},
54    NetworkEvent, NetworkEventListenerProvider, PeerKind, PeerRequest, PeerRequestSender, Peers,
55};
56use reth_network_p2p::{
57    error::{RequestError, RequestResult},
58    sync::SyncStateProvider,
59};
60use reth_network_peers::PeerId;
61use reth_network_types::ReputationChangeKind;
62use reth_primitives_traits::SignedTransaction;
63use reth_tokio_util::EventStream;
64use reth_transaction_pool::{
65    error::{PoolError, PoolResult},
66    AddedTransactionOutcome, GetPooledTransactionLimit, PoolTransaction, PropagateKind,
67    PropagatedTransactions, TransactionPool, ValidPoolTransaction,
68};
69use std::{
70    collections::{hash_map::Entry, HashMap, HashSet},
71    pin::Pin,
72    sync::{
73        atomic::{AtomicUsize, Ordering},
74        Arc,
75    },
76    task::{Context, Poll},
77    time::{Duration, Instant},
78};
79use tokio::sync::{mpsc, oneshot, oneshot::error::RecvError};
80use tokio_stream::wrappers::UnboundedReceiverStream;
81use tracing::{debug, trace};
82
/// The future for importing transactions into the pool.
///
/// This is a pinned, boxed, `Send + 'static` trait object. It resolves with a `Vec` holding the
/// [`PoolResult`] of each transaction import, wrapping an [`AddedTransactionOutcome`] on success.
pub type PoolImportFuture =
    Pin<Box<dyn Future<Output = Vec<PoolResult<AddedTransactionOutcome>>> + Send + 'static>>;
88
/// API to interact with the [`TransactionsManager`] task.
///
/// This can be obtained via [`TransactionsManager::handle`] and can be used to manually interact
/// with the [`TransactionsManager`] task once it is spawned.
///
/// The handle only holds the sending half of the manager's command channel: its methods are
/// expressed as [`TransactionsCommand`]s that are sent over that channel.
///
/// For example [`TransactionsHandle::get_peer_transaction_hashes`] returns the transaction hashes
/// known by a specific peer.
#[derive(Debug, Clone)]
pub struct TransactionsHandle<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// Command channel to the [`TransactionsManager`].
    manager_tx: mpsc::UnboundedSender<TransactionsCommand<N>>,
}
101
102impl<N: NetworkPrimitives> TransactionsHandle<N> {
103    fn send(&self, cmd: TransactionsCommand<N>) {
104        let _ = self.manager_tx.send(cmd);
105    }
106
107    /// Fetch the [`PeerRequestSender`] for the given peer.
108    async fn peer_handle(
109        &self,
110        peer_id: PeerId,
111    ) -> Result<Option<PeerRequestSender<PeerRequest<N>>>, RecvError> {
112        let (tx, rx) = oneshot::channel();
113        self.send(TransactionsCommand::GetPeerSender { peer_id, peer_request_sender: tx });
114        rx.await
115    }
116
117    /// Manually propagate the transaction that belongs to the hash.
118    pub fn propagate(&self, hash: TxHash) {
119        self.send(TransactionsCommand::PropagateHash(hash))
120    }
121
122    /// Manually propagate the transaction hash to a specific peer.
123    ///
124    /// Note: this only propagates if the pool contains the transaction.
125    pub fn propagate_hash_to(&self, hash: TxHash, peer: PeerId) {
126        self.propagate_hashes_to(Some(hash), peer)
127    }
128
129    /// Manually propagate the transaction hashes to a specific peer.
130    ///
131    /// Note: this only propagates the transactions that are known to the pool.
132    pub fn propagate_hashes_to(&self, hash: impl IntoIterator<Item = TxHash>, peer: PeerId) {
133        let hashes = hash.into_iter().collect::<Vec<_>>();
134        if hashes.is_empty() {
135            return
136        }
137        self.send(TransactionsCommand::PropagateHashesTo(hashes, peer))
138    }
139
140    /// Request the active peer IDs from the [`TransactionsManager`].
141    pub async fn get_active_peers(&self) -> Result<HashSet<PeerId>, RecvError> {
142        let (tx, rx) = oneshot::channel();
143        self.send(TransactionsCommand::GetActivePeers(tx));
144        rx.await
145    }
146
147    /// Manually propagate full transaction hashes to a specific peer.
148    ///
149    /// Do nothing if transactions are empty.
150    pub fn propagate_transactions_to(&self, transactions: Vec<TxHash>, peer: PeerId) {
151        if transactions.is_empty() {
152            return
153        }
154        self.send(TransactionsCommand::PropagateTransactionsTo(transactions, peer))
155    }
156
157    /// Manually propagate the given transaction hashes to all peers.
158    ///
159    /// It's up to the [`TransactionsManager`] whether the transactions are sent as hashes or in
160    /// full.
161    pub fn propagate_transactions(&self, transactions: Vec<TxHash>) {
162        if transactions.is_empty() {
163            return
164        }
165        self.send(TransactionsCommand::PropagateTransactions(transactions))
166    }
167
168    /// Manually propagate the given transactions to all peers.
169    ///
170    /// It's up to the [`TransactionsManager`] whether the transactions are sent as hashes or in
171    /// full.
172    pub fn broadcast_transactions(
173        &self,
174        transactions: impl IntoIterator<Item = N::BroadcastedTransaction>,
175    ) {
176        let transactions =
177            transactions.into_iter().map(PropagateTransaction::new).collect::<Vec<_>>();
178        if transactions.is_empty() {
179            return
180        }
181        self.send(TransactionsCommand::BroadcastTransactions(transactions))
182    }
183
184    /// Request the transaction hashes known by specific peers.
185    pub async fn get_transaction_hashes(
186        &self,
187        peers: Vec<PeerId>,
188    ) -> Result<HashMap<PeerId, HashSet<TxHash>>, RecvError> {
189        if peers.is_empty() {
190            return Ok(Default::default())
191        }
192        let (tx, rx) = oneshot::channel();
193        self.send(TransactionsCommand::GetTransactionHashes { peers, tx });
194        rx.await
195    }
196
197    /// Request the transaction hashes known by a specific peer.
198    pub async fn get_peer_transaction_hashes(
199        &self,
200        peer: PeerId,
201    ) -> Result<HashSet<TxHash>, RecvError> {
202        let res = self.get_transaction_hashes(vec![peer]).await?;
203        Ok(res.into_values().next().unwrap_or_default())
204    }
205
206    /// Requests the transactions directly from the given peer.
207    ///
208    /// Returns `None` if the peer is not connected.
209    ///
210    /// **Note**: this returns the response from the peer as received.
211    pub async fn get_pooled_transactions_from(
212        &self,
213        peer_id: PeerId,
214        hashes: Vec<B256>,
215    ) -> Result<Option<Vec<N::PooledTransaction>>, RequestError> {
216        let Some(peer) = self.peer_handle(peer_id).await? else { return Ok(None) };
217
218        let (tx, rx) = oneshot::channel();
219        let request = PeerRequest::GetPooledTransactions { request: hashes.into(), response: tx };
220        peer.try_send(request).ok();
221
222        rx.await?.map(|res| Some(res.0))
223    }
224}
225
/// Manages transactions on top of the p2p network.
///
/// This can be spawned to another task and is supposed to be run as background service.
/// [`TransactionsHandle`] can be used as frontend to programmatically send commands to it and
/// interact with it.
///
/// The [`TransactionsManager`] is responsible for:
///    - handling incoming eth messages for transactions.
///    - serving transaction requests.
///    - propagating transactions.
///
/// This type communicates with the [`NetworkManager`](crate::NetworkManager) in both directions.
///   - receives incoming network messages.
///   - sends messages to dispatch (responses, propagate tx)
///
/// It is directly connected to the [`TransactionPool`] to retrieve requested transactions and
/// propagate new transactions over the network.
///
/// It can be configured with different policies for transaction propagation and announcement
/// filtering. See [`NetworkPolicies`] for more details.
///
/// ## Network Transaction Processing
///
/// ### Message Types
///
/// - **`Transactions`**: Full transaction broadcasts (rejects blob transactions)
/// - **`NewPooledTransactionHashes`**: Hash announcements
///
/// ### Peer Tracking
///
/// - Maintains per-peer transaction cache (default: 10,240 entries)
/// - Prevents duplicate imports and enables efficient propagation
///
/// ### Bad Transaction Handling
///
/// Caches and rejects transactions with consensus violations (gas, signature, chain ID).
/// Penalizes peers sending invalid transactions.
///
/// ### Import Management
///
/// Limits concurrent pool imports and backs off when approaching capacity.
///
/// ### Transaction Fetching
///
/// For announced transactions: filters known → queues unknown → fetches → imports
///
/// ### Propagation Rules
///
/// Based on: origin (Local/External/Private), peer capabilities, and network state.
/// Disabled during initial sync.
///
/// ### Security
///
/// Rate limiting via reputation, bad transaction isolation, peer scoring.
#[derive(Debug)]
#[must_use = "Manager does nothing unless polled."]
pub struct TransactionsManager<Pool, N: NetworkPrimitives = EthNetworkPrimitives> {
    /// Access to the transaction pool.
    pool: Pool,
    /// Network access.
    network: NetworkHandle<N>,
    /// Subscriptions to all network related events.
    ///
    /// From which we get all new incoming transaction related messages.
    network_events: EventStream<NetworkEvent<PeerRequest<N>>>,
    /// Transaction fetcher to handle inflight and missing transaction requests.
    transaction_fetcher: TransactionFetcher<N>,
    /// All currently pending transactions, keyed by transaction hash and mapped to the set of
    /// peers recorded for that hash.
    ///
    /// This way we can track incoming transactions and prevent multiple pool imports for the same
    /// transaction. The entry for a hash is removed again once its import finishes, whether it
    /// succeeded or failed, and on failure the recorded peers are the ones that get penalized.
    transactions_by_peers: HashMap<TxHash, HashSet<PeerId>>,
    /// Transactions that are currently imported into the `Pool`.
    ///
    /// The import process includes:
    ///  - validation of the transactions, e.g. transaction is well formed: valid tx type, fees are
    ///    valid, or for 4844 transaction the blobs are valid. See also
    ///    [`EthTransactionValidator`](reth_transaction_pool::validate::EthTransactionValidator)
    ///  - if the transaction is valid, it is added into the pool.
    ///
    /// Once the new transaction reaches the __pending__ state it will be emitted by the pool via
    /// [`TransactionPool::pending_transactions_listener`] and arrive at the `pending_transactions`
    /// receiver.
    pool_imports: FuturesUnordered<PoolImportFuture>,
    /// Stats on pending pool imports that help the node self-monitor.
    pending_pool_imports_info: PendingPoolImportsInfo,
    /// Hashes of transactions whose pool import failed with an error classified as a bad
    /// transaction.
    ///
    /// Kept in an LRU cache so these transactions are not fetched or imported again.
    bad_imports: LruCache<TxHash>,
    /// All the connected peers.
    peers: HashMap<PeerId, PeerMetadata<N>>,
    /// Send half for the command channel.
    ///
    /// This is kept so that a new [`TransactionsHandle`] can be created at any time.
    command_tx: mpsc::UnboundedSender<TransactionsCommand<N>>,
    /// Incoming commands from [`TransactionsHandle`].
    ///
    /// This will only receive commands if a user manually sends a command to the manager through
    /// the [`TransactionsHandle`] to interact with this type directly.
    command_rx: UnboundedReceiverStream<TransactionsCommand<N>>,
    /// A stream that yields new __pending__ transactions.
    ///
    /// A transaction is considered __pending__ if it is executable on the current state of the
    /// chain. In other words, this only yields transactions that satisfy all consensus
    /// requirements, these include:
    ///   - no nonce gaps
    ///   - all dynamic fee requirements are (currently) met
    ///   - account has enough balance to cover the transaction's gas
    pending_transactions: mpsc::Receiver<TxHash>,
    /// Incoming events from the [`NetworkManager`](crate::NetworkManager).
    transaction_events: UnboundedMeteredReceiver<NetworkTransactionEvent<N>>,
    /// How the `TransactionsManager` is configured.
    config: TransactionsManagerConfig,
    /// Network policies, e.g. the announcement filter consulted when handling announced hashes.
    policies: NetworkPolicies<N>,
    /// `TransactionsManager` metrics
    metrics: TransactionsManagerMetrics,
    /// `AnnouncedTxTypes` metrics
    announced_tx_types_metrics: AnnouncedTxTypesMetrics,
}
345
346impl<Pool: TransactionPool, N: NetworkPrimitives> TransactionsManager<Pool, N> {
347    /// Sets up a new instance.
348    ///
349    /// Note: This expects an existing [`NetworkManager`](crate::NetworkManager) instance.
350    pub fn new(
351        network: NetworkHandle<N>,
352        pool: Pool,
353        from_network: mpsc::UnboundedReceiver<NetworkTransactionEvent<N>>,
354        transactions_manager_config: TransactionsManagerConfig,
355    ) -> Self {
356        Self::with_policy(
357            network,
358            pool,
359            from_network,
360            transactions_manager_config,
361            NetworkPolicies::new(
362                TransactionPropagationKind::default(),
363                StrictEthAnnouncementFilter::default(),
364            ),
365        )
366    }
367}
368
369impl<Pool: TransactionPool, N: NetworkPrimitives> TransactionsManager<Pool, N> {
370    /// Sets up a new instance with given the settings.
371    ///
372    /// Note: This expects an existing [`NetworkManager`](crate::NetworkManager) instance.
373    pub fn with_policy(
374        network: NetworkHandle<N>,
375        pool: Pool,
376        from_network: mpsc::UnboundedReceiver<NetworkTransactionEvent<N>>,
377        transactions_manager_config: TransactionsManagerConfig,
378        policies: NetworkPolicies<N>,
379    ) -> Self {
380        let network_events = network.event_listener();
381
382        let (command_tx, command_rx) = mpsc::unbounded_channel();
383
384        let transaction_fetcher = TransactionFetcher::with_transaction_fetcher_config(
385            &transactions_manager_config.transaction_fetcher_config,
386        );
387
388        // install a listener for new __pending__ transactions that are allowed to be propagated
389        // over the network
390        let pending = pool.pending_transactions_listener();
391        let pending_pool_imports_info = PendingPoolImportsInfo::default();
392        let metrics = TransactionsManagerMetrics::default();
393        metrics
394            .capacity_pending_pool_imports
395            .increment(pending_pool_imports_info.max_pending_pool_imports as u64);
396
397        Self {
398            pool,
399            network,
400            network_events,
401            transaction_fetcher,
402            transactions_by_peers: Default::default(),
403            pool_imports: Default::default(),
404            pending_pool_imports_info: PendingPoolImportsInfo::new(
405                DEFAULT_MAX_COUNT_PENDING_POOL_IMPORTS,
406            ),
407            bad_imports: LruCache::new(DEFAULT_MAX_COUNT_BAD_IMPORTS),
408            peers: Default::default(),
409            command_tx,
410            command_rx: UnboundedReceiverStream::new(command_rx),
411            pending_transactions: pending,
412            transaction_events: UnboundedMeteredReceiver::new(
413                from_network,
414                NETWORK_POOL_TRANSACTIONS_SCOPE,
415            ),
416            config: transactions_manager_config,
417            policies,
418            metrics,
419            announced_tx_types_metrics: AnnouncedTxTypesMetrics::default(),
420        }
421    }
422
423    /// Returns a new handle that can send commands to this type.
424    pub fn handle(&self) -> TransactionsHandle<N> {
425        TransactionsHandle { manager_tx: self.command_tx.clone() }
426    }
427
428    /// Returns `true` if [`TransactionsManager`] has capacity to request pending hashes. Returns
429    /// `false` if [`TransactionsManager`] is operating close to full capacity.
430    fn has_capacity_for_fetching_pending_hashes(&self) -> bool {
431        self.pending_pool_imports_info
432            .has_capacity(self.pending_pool_imports_info.max_pending_pool_imports) &&
433            self.transaction_fetcher.has_capacity_for_fetching_pending_hashes()
434    }
435
436    fn report_peer_bad_transactions(&self, peer_id: PeerId) {
437        self.report_peer(peer_id, ReputationChangeKind::BadTransactions);
438        self.metrics.reported_bad_transactions.increment(1);
439    }
440
441    fn report_peer(&self, peer_id: PeerId, kind: ReputationChangeKind) {
442        trace!(target: "net::tx", ?peer_id, ?kind, "reporting reputation change");
443        self.network.reputation_change(peer_id, kind);
444    }
445
446    fn report_already_seen(&self, peer_id: PeerId) {
447        trace!(target: "net::tx", ?peer_id, "Penalizing peer for already seen transaction");
448        self.network.reputation_change(peer_id, ReputationChangeKind::AlreadySeenTransaction);
449    }
450
451    /// Clear the transaction
452    fn on_good_import(&mut self, hash: TxHash) {
453        self.transactions_by_peers.remove(&hash);
454    }
455
456    /// Penalize the peers that intentionally sent the bad transaction, and cache it to avoid
457    /// fetching or importing it again.
458    ///
459    /// Errors that count as bad transactions are:
460    ///
461    /// - intrinsic gas too low
462    /// - exceeds gas limit
463    /// - gas uint overflow
464    /// - exceeds max init code size
465    /// - oversized data
466    /// - signer account has bytecode
467    /// - chain id mismatch
468    /// - old legacy chain id
469    /// - tx type not supported
470    ///
471    /// (and additionally for blobs txns...)
472    ///
473    /// - no blobs
474    /// - too many blobs
475    /// - invalid kzg proof
476    /// - kzg error
477    /// - not blob transaction (tx type mismatch)
478    /// - wrong versioned kzg commitment hash
479    fn on_bad_import(&mut self, err: PoolError) {
480        let peers = self.transactions_by_peers.remove(&err.hash);
481
482        // if we're _currently_ syncing, we ignore a bad transaction
483        if !err.is_bad_transaction() || self.network.is_syncing() {
484            return
485        }
486        // otherwise we penalize the peer that sent the bad transaction, with the assumption that
487        // the peer should have known that this transaction is bad (e.g. violating consensus rules)
488        if let Some(peers) = peers {
489            for peer_id in peers {
490                self.report_peer_bad_transactions(peer_id);
491            }
492        }
493        self.metrics.bad_imports.increment(1);
494        self.bad_imports.insert(err.hash);
495    }
496
497    /// Runs an operation to fetch hashes that are cached in [`TransactionFetcher`].
498    fn on_fetch_hashes_pending_fetch(&mut self) {
499        // try drain transaction hashes pending fetch
500        let info = &self.pending_pool_imports_info;
501        let max_pending_pool_imports = info.max_pending_pool_imports;
502        let has_capacity_wrt_pending_pool_imports =
503            |divisor| info.has_capacity(max_pending_pool_imports / divisor);
504
505        self.transaction_fetcher
506            .on_fetch_pending_hashes(&self.peers, has_capacity_wrt_pending_pool_imports);
507    }
508
509    fn on_request_error(&self, peer_id: PeerId, req_err: RequestError) {
510        let kind = match req_err {
511            RequestError::UnsupportedCapability => ReputationChangeKind::BadProtocol,
512            RequestError::Timeout => ReputationChangeKind::Timeout,
513            RequestError::ChannelClosed | RequestError::ConnectionDropped => {
514                // peer is already disconnected
515                return
516            }
517            RequestError::BadResponse => return self.report_peer_bad_transactions(peer_id),
518        };
519        self.report_peer(peer_id, kind);
520    }
521
522    #[inline]
523    fn update_poll_metrics(&self, start: Instant, poll_durations: TxManagerPollDurations) {
524        let metrics = &self.metrics;
525
526        let TxManagerPollDurations {
527            acc_network_events,
528            acc_pending_imports,
529            acc_tx_events,
530            acc_imported_txns,
531            acc_fetch_events,
532            acc_pending_fetch,
533            acc_cmds,
534        } = poll_durations;
535
536        // update metrics for whole poll function
537        metrics.duration_poll_tx_manager.set(start.elapsed().as_secs_f64());
538        // update metrics for nested expressions
539        metrics.acc_duration_poll_network_events.set(acc_network_events.as_secs_f64());
540        metrics.acc_duration_poll_pending_pool_imports.set(acc_pending_imports.as_secs_f64());
541        metrics.acc_duration_poll_transaction_events.set(acc_tx_events.as_secs_f64());
542        metrics.acc_duration_poll_imported_transactions.set(acc_imported_txns.as_secs_f64());
543        metrics.acc_duration_poll_fetch_events.set(acc_fetch_events.as_secs_f64());
544        metrics.acc_duration_fetch_pending_hashes.set(acc_pending_fetch.as_secs_f64());
545        metrics.acc_duration_poll_commands.set(acc_cmds.as_secs_f64());
546    }
547}
548
549impl<Pool: TransactionPool, N: NetworkPrimitives> TransactionsManager<Pool, N> {
550    /// Processes a batch import results.
551    fn on_batch_import_result(&mut self, batch_results: Vec<PoolResult<AddedTransactionOutcome>>) {
552        for res in batch_results {
553            match res {
554                Ok(AddedTransactionOutcome { hash, .. }) => {
555                    self.on_good_import(hash);
556                }
557                Err(err) => {
558                    self.on_bad_import(err);
559                }
560            }
561        }
562    }
563
564    /// Request handler for an incoming `NewPooledTransactionHashes`
565    fn on_new_pooled_transaction_hashes(
566        &mut self,
567        peer_id: PeerId,
568        msg: NewPooledTransactionHashes,
569    ) {
570        // If the node is initially syncing, ignore transactions
571        if self.network.is_initially_syncing() {
572            return
573        }
574        if self.network.tx_gossip_disabled() {
575            return
576        }
577
578        // get handle to peer's session, if the session is still active
579        let Some(peer) = self.peers.get_mut(&peer_id) else {
580            trace!(
581                peer_id = format!("{peer_id:#}"),
582                ?msg,
583                "discarding announcement from inactive peer"
584            );
585
586            return
587        };
588        let client = peer.client_version.clone();
589
590        // keep track of the transactions the peer knows
591        let mut count_txns_already_seen_by_peer = 0;
592        for tx in msg.iter_hashes().copied() {
593            if !peer.seen_transactions.insert(tx) {
594                count_txns_already_seen_by_peer += 1;
595            }
596        }
597        if count_txns_already_seen_by_peer > 0 {
598            // this may occur if transactions are sent or announced to a peer, at the same time as
599            // the peer sends/announces those hashes to us. this is because, marking
600            // txns as seen by a peer is done optimistically upon sending them to the
601            // peer.
602            self.metrics.messages_with_hashes_already_seen_by_peer.increment(1);
603            self.metrics
604                .occurrences_hash_already_seen_by_peer
605                .increment(count_txns_already_seen_by_peer);
606
607            trace!(target: "net::tx",
608                %count_txns_already_seen_by_peer,
609                peer_id=format!("{peer_id:#}"),
610                ?client,
611                "Peer sent hashes that have already been marked as seen by peer"
612            );
613
614            self.report_already_seen(peer_id);
615        }
616
617        // 1. filter out spam
618        if msg.is_empty() {
619            self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement);
620            return;
621        }
622
623        let original_len = msg.len();
624        let mut partially_valid_msg = msg.dedup();
625
626        if partially_valid_msg.len() != original_len {
627            self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement);
628        }
629
630        // 2. filter out transactions pending import to pool
631        partially_valid_msg.retain_by_hash(|hash| !self.transactions_by_peers.contains_key(hash));
632
633        // 3. filter out known hashes
634        //
635        // known txns have already been successfully fetched or received over gossip.
636        //
637        // most hashes will be filtered out here since this the mempool protocol is a gossip
638        // protocol, healthy peers will send many of the same hashes.
639        //
640        let hashes_count_pre_pool_filter = partially_valid_msg.len();
641        self.pool.retain_unknown(&mut partially_valid_msg);
642        if hashes_count_pre_pool_filter > partially_valid_msg.len() {
643            let already_known_hashes_count =
644                hashes_count_pre_pool_filter - partially_valid_msg.len();
645            self.metrics
646                .occurrences_hashes_already_in_pool
647                .increment(already_known_hashes_count as u64);
648        }
649
650        if partially_valid_msg.is_empty() {
651            // nothing to request
652            return
653        }
654
655        // 4. filter out invalid entries (spam)
656        //
657        // validates messages with respect to the given network, e.g. allowed tx types
658        //
659        let mut should_report_peer = false;
660        let mut tx_types_counter = TxTypesCounter::default();
661
662        let is_eth68_message = partially_valid_msg
663            .msg_version()
664            .expect("partially valid announcement should have a version")
665            .is_eth68();
666
667        partially_valid_msg.retain(|tx_hash, metadata_ref_mut| {
668            let (ty_byte, size_val) = match *metadata_ref_mut {
669                Some((ty, size)) => {
670                    if !is_eth68_message {
671                        should_report_peer = true;
672                    }
673                    (ty, size)
674                }
675                None => {
676                    if is_eth68_message {
677                        should_report_peer = true;
678                        return false;
679                    }
680                    (0u8, 0)
681                }
682            };
683
684            if is_eth68_message &&
685                let Some((actual_ty_byte, _)) = *metadata_ref_mut &&
686                let Ok(parsed_tx_type) = TxType::try_from(actual_ty_byte)
687            {
688                tx_types_counter.increase_by_tx_type(parsed_tx_type);
689            }
690
691            let decision = self
692                .policies
693                .announcement_filter()
694                .decide_on_announcement(ty_byte, tx_hash, size_val);
695
696            match decision {
697                AnnouncementAcceptance::Accept => true,
698                AnnouncementAcceptance::Ignore => false,
699                AnnouncementAcceptance::Reject { penalize_peer } => {
700                    if penalize_peer {
701                        should_report_peer = true;
702                    }
703                    false
704                }
705            }
706        });
707
708        if is_eth68_message {
709            self.announced_tx_types_metrics.update_eth68_announcement_metrics(tx_types_counter);
710        }
711
712        if should_report_peer {
713            self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement);
714        }
715
716        let mut valid_announcement_data =
717            ValidAnnouncementData::from_partially_valid_data(partially_valid_msg);
718
719        if valid_announcement_data.is_empty() {
720            // no valid announcement data
721            return
722        }
723
724        // 5. filter out already seen unknown hashes
725        //
726        // seen hashes are already in the tx fetcher, pending fetch.
727        //
728        // for any seen hashes add the peer as fallback. unseen hashes are loaded into the tx
729        // fetcher, hence they should be valid at this point.
730        let bad_imports = &self.bad_imports;
731        self.transaction_fetcher.filter_unseen_and_pending_hashes(
732            &mut valid_announcement_data,
733            |hash| bad_imports.contains(hash),
734            &peer_id,
735            &client,
736        );
737
738        if valid_announcement_data.is_empty() {
739            // nothing to request
740            return
741        }
742
743        trace!(target: "net::tx::propagation",
744            peer_id=format!("{peer_id:#}"),
745            hashes_len=valid_announcement_data.len(),
746            hashes=?valid_announcement_data.keys().collect::<Vec<_>>(),
747            msg_version=%valid_announcement_data.msg_version(),
748            client_version=%client,
749            "received previously unseen and pending hashes in announcement from peer"
750        );
751
752        // only send request for hashes to idle peer, otherwise buffer hashes storing peer as
753        // fallback
754        if !self.transaction_fetcher.is_idle(&peer_id) {
755            // load message version before announcement data is destructed in packing
756            let msg_version = valid_announcement_data.msg_version();
757            let (hashes, _version) = valid_announcement_data.into_request_hashes();
758
759            trace!(target: "net::tx",
760                peer_id=format!("{peer_id:#}"),
761                hashes=?*hashes,
762                %msg_version,
763                %client,
764                "buffering hashes announced by busy peer"
765            );
766
767            self.transaction_fetcher.buffer_hashes(hashes, Some(peer_id));
768
769            return
770        }
771
772        let mut hashes_to_request =
773            RequestTxHashes::with_capacity(valid_announcement_data.len() / 4);
774        let surplus_hashes =
775            self.transaction_fetcher.pack_request(&mut hashes_to_request, valid_announcement_data);
776
777        if !surplus_hashes.is_empty() {
778            trace!(target: "net::tx",
779                peer_id=format!("{peer_id:#}"),
780                surplus_hashes=?*surplus_hashes,
781                %client,
782                "some hashes in announcement from peer didn't fit in `GetPooledTransactions` request, buffering surplus hashes"
783            );
784
785            self.transaction_fetcher.buffer_hashes(surplus_hashes, Some(peer_id));
786        }
787
788        trace!(target: "net::tx",
789            peer_id=format!("{peer_id:#}"),
790            hashes=?*hashes_to_request,
791            %client,
792            "sending hashes in `GetPooledTransactions` request to peer's session"
793        );
794
795        // request the missing transactions
796        //
797        // get handle to peer's session again, at this point we know it exists
798        let Some(peer) = self.peers.get_mut(&peer_id) else { return };
799        if let Some(failed_to_request_hashes) =
800            self.transaction_fetcher.request_transactions_from_peer(hashes_to_request, peer)
801        {
802            let conn_eth_version = peer.version;
803
804            trace!(target: "net::tx",
805                peer_id=format!("{peer_id:#}"),
806                failed_to_request_hashes=?*failed_to_request_hashes,
807                %conn_eth_version,
808                %client,
809                "sending `GetPooledTransactions` request to peer's session failed, buffering hashes"
810            );
811            self.transaction_fetcher.buffer_hashes(failed_to_request_hashes, Some(peer_id));
812        }
813    }
814}
815
816impl<Pool, N> TransactionsManager<Pool, N>
817where
818    Pool: TransactionPool + Unpin + 'static,
819    N: NetworkPrimitives<
820            BroadcastedTransaction: SignedTransaction,
821            PooledTransaction: SignedTransaction,
822        > + Unpin,
823    Pool::Transaction:
824        PoolTransaction<Consensus = N::BroadcastedTransaction, Pooled = N::PooledTransaction>,
825{
826    /// Invoked when transactions in the local mempool are considered __pending__.
827    ///
828    /// When a transaction in the local mempool is moved to the pending pool, we propagate them to
829    /// connected peers over network using the `Transactions` and `NewPooledTransactionHashes`
830    /// messages. The Transactions message relays complete transaction objects and is typically
831    /// sent to a small, random fraction of connected peers.
832    ///
833    /// All other peers receive a notification of the transaction hash and can request the
834    /// complete transaction object if it is unknown to them. The dissemination of complete
835    /// transactions to a fraction of peers usually ensures that all nodes receive the transaction
836    /// and won't need to request it.
837    fn on_new_pending_transactions(&mut self, hashes: Vec<TxHash>) {
838        // Nothing to propagate while initially syncing
839        if self.network.is_initially_syncing() {
840            return
841        }
842        if self.network.tx_gossip_disabled() {
843            return
844        }
845
846        trace!(target: "net::tx", num_hashes=?hashes.len(), "Start propagating transactions");
847
848        self.propagate_all(hashes);
849    }
850
851    /// Propagate the full transactions to a specific peer.
852    ///
853    /// Returns the propagated transactions.
854    fn propagate_full_transactions_to_peer(
855        &mut self,
856        txs: Vec<TxHash>,
857        peer_id: PeerId,
858        propagation_mode: PropagationMode,
859    ) -> Option<PropagatedTransactions> {
860        trace!(target: "net::tx", ?peer_id, "Propagating transactions to peer");
861
862        let peer = self.peers.get_mut(&peer_id)?;
863        let mut propagated = PropagatedTransactions::default();
864
865        // filter all transactions unknown to the peer
866        let mut full_transactions = FullTransactionsBuilder::new(peer.version);
867
868        let to_propagate = self.pool.get_all(txs).into_iter().map(PropagateTransaction::pool_tx);
869
870        if propagation_mode.is_forced() {
871            // skip cache check if forced
872            full_transactions.extend(to_propagate);
873        } else {
874            // Iterate through the transactions to propagate and fill the hashes and full
875            // transaction
876            for tx in to_propagate {
877                if !peer.seen_transactions.contains(tx.tx_hash()) {
878                    // Only include if the peer hasn't seen the transaction
879                    full_transactions.push(&tx);
880                }
881            }
882        }
883
884        if full_transactions.is_empty() {
885            // nothing to propagate
886            return None
887        }
888
889        let PropagateTransactions { pooled, full } = full_transactions.build();
890
891        // send hashes if any
892        if let Some(new_pooled_hashes) = pooled {
893            for hash in new_pooled_hashes.iter_hashes().copied() {
894                propagated.0.entry(hash).or_default().push(PropagateKind::Hash(peer_id));
895                // mark transaction as seen by peer
896                peer.seen_transactions.insert(hash);
897            }
898
899            // send hashes of transactions
900            self.network.send_transactions_hashes(peer_id, new_pooled_hashes);
901        }
902
903        // send full transactions, if any
904        if let Some(new_full_transactions) = full {
905            for tx in &new_full_transactions {
906                propagated.0.entry(*tx.tx_hash()).or_default().push(PropagateKind::Full(peer_id));
907                // mark transaction as seen by peer
908                peer.seen_transactions.insert(*tx.tx_hash());
909            }
910
911            // send full transactions
912            self.network.send_transactions(peer_id, new_full_transactions);
913        }
914
915        // Update propagated transactions metrics
916        self.metrics.propagated_transactions.increment(propagated.0.len() as u64);
917
918        Some(propagated)
919    }
920
921    /// Propagate the transaction hashes to the given peer
922    ///
923    /// Note: This will only send the hashes for transactions that exist in the pool.
924    fn propagate_hashes_to(
925        &mut self,
926        hashes: Vec<TxHash>,
927        peer_id: PeerId,
928        propagation_mode: PropagationMode,
929    ) {
930        trace!(target: "net::tx", "Start propagating transactions as hashes");
931
932        // This fetches a transactions from the pool, including the blob transactions, which are
933        // only ever sent as hashes.
934        let propagated = {
935            let Some(peer) = self.peers.get_mut(&peer_id) else {
936                // no such peer
937                return
938            };
939
940            let to_propagate = self
941                .pool
942                .get_all(hashes)
943                .into_iter()
944                .map(PropagateTransaction::pool_tx)
945                .collect::<Vec<_>>();
946
947            let mut propagated = PropagatedTransactions::default();
948
949            // check if transaction is known to peer
950            let mut hashes = PooledTransactionsHashesBuilder::new(peer.version);
951
952            if propagation_mode.is_forced() {
953                hashes.extend(to_propagate)
954            } else {
955                for tx in to_propagate {
956                    if !peer.seen_transactions.contains(tx.tx_hash()) {
957                        // Include if the peer hasn't seen it
958                        hashes.push(&tx);
959                    }
960                }
961            }
962
963            let new_pooled_hashes = hashes.build();
964
965            if new_pooled_hashes.is_empty() {
966                // nothing to propagate
967                return
968            }
969
970            for hash in new_pooled_hashes.iter_hashes().copied() {
971                propagated.0.entry(hash).or_default().push(PropagateKind::Hash(peer_id));
972            }
973
974            trace!(target: "net::tx::propagation", ?peer_id, ?new_pooled_hashes, "Propagating transactions to peer");
975
976            // send hashes of transactions
977            self.network.send_transactions_hashes(peer_id, new_pooled_hashes);
978
979            // Update propagated transactions metrics
980            self.metrics.propagated_transactions.increment(propagated.0.len() as u64);
981
982            propagated
983        };
984
985        // notify pool so events get fired
986        self.pool.on_propagated(propagated);
987    }
988
989    /// Propagate the transactions to all connected peers either as full objects or hashes.
990    ///
991    /// The message for new pooled hashes depends on the negotiated version of the stream.
992    /// See [`NewPooledTransactionHashes`]
993    ///
994    /// Note: EIP-4844 are disallowed from being broadcast in full and are only ever sent as hashes, see also <https://eips.ethereum.org/EIPS/eip-4844#networking>.
995    fn propagate_transactions(
996        &mut self,
997        to_propagate: Vec<PropagateTransaction<N::BroadcastedTransaction>>,
998        propagation_mode: PropagationMode,
999    ) -> PropagatedTransactions {
1000        let mut propagated = PropagatedTransactions::default();
1001        if self.network.tx_gossip_disabled() {
1002            return propagated
1003        }
1004
1005        // send full transactions to a set of the connected peers based on the configured mode
1006        let max_num_full = self.config.propagation_mode.full_peer_count(self.peers.len());
1007
1008        // Note: Assuming ~random~ order due to random state of the peers map hasher
1009        for (peer_idx, (peer_id, peer)) in self.peers.iter_mut().enumerate() {
1010            if !self.policies.propagation_policy().can_propagate(peer) {
1011                // skip peers we should not propagate to
1012                continue
1013            }
1014            // determine whether to send full tx objects or hashes.
1015            let mut builder = if peer_idx > max_num_full {
1016                PropagateTransactionsBuilder::pooled(peer.version)
1017            } else {
1018                PropagateTransactionsBuilder::full(peer.version)
1019            };
1020
1021            if propagation_mode.is_forced() {
1022                builder.extend(to_propagate.iter());
1023            } else {
1024                // Iterate through the transactions to propagate and fill the hashes and full
1025                // transaction lists, before deciding whether or not to send full transactions to
1026                // the peer.
1027                for tx in &to_propagate {
1028                    // Only proceed if the transaction is not in the peer's list of seen
1029                    // transactions
1030                    if !peer.seen_transactions.contains(tx.tx_hash()) {
1031                        builder.push(tx);
1032                    }
1033                }
1034            }
1035
1036            if builder.is_empty() {
1037                trace!(target: "net::tx", ?peer_id, "Nothing to propagate to peer; has seen all transactions");
1038                continue
1039            }
1040
1041            let PropagateTransactions { pooled, full } = builder.build();
1042
1043            // send hashes if any
1044            if let Some(mut new_pooled_hashes) = pooled {
1045                // enforce tx soft limit per message for the (unlikely) event the number of
1046                // hashes exceeds it
1047                new_pooled_hashes
1048                    .truncate(SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE);
1049
1050                for hash in new_pooled_hashes.iter_hashes().copied() {
1051                    propagated.0.entry(hash).or_default().push(PropagateKind::Hash(*peer_id));
1052                    // mark transaction as seen by peer
1053                    peer.seen_transactions.insert(hash);
1054                }
1055
1056                trace!(target: "net::tx", ?peer_id, num_txs=?new_pooled_hashes.len(), "Propagating tx hashes to peer");
1057
1058                // send hashes of transactions
1059                self.network.send_transactions_hashes(*peer_id, new_pooled_hashes);
1060            }
1061
1062            // send full transactions, if any
1063            if let Some(new_full_transactions) = full {
1064                for tx in &new_full_transactions {
1065                    propagated
1066                        .0
1067                        .entry(*tx.tx_hash())
1068                        .or_default()
1069                        .push(PropagateKind::Full(*peer_id));
1070                    // mark transaction as seen by peer
1071                    peer.seen_transactions.insert(*tx.tx_hash());
1072                }
1073
1074                trace!(target: "net::tx", ?peer_id, num_txs=?new_full_transactions.len(), "Propagating full transactions to peer");
1075
1076                // send full transactions
1077                self.network.send_transactions(*peer_id, new_full_transactions);
1078            }
1079        }
1080
1081        // Update propagated transactions metrics
1082        self.metrics.propagated_transactions.increment(propagated.0.len() as u64);
1083
1084        propagated
1085    }
1086
1087    /// Propagates the given transactions to the peers
1088    ///
1089    /// This fetches all transaction from the pool, including the 4844 blob transactions but
1090    /// __without__ their sidecar, because 4844 transactions are only ever announced as hashes.
1091    fn propagate_all(&mut self, hashes: Vec<TxHash>) {
1092        if self.peers.is_empty() {
1093            // nothing to propagate
1094            return
1095        }
1096        let propagated = self.propagate_transactions(
1097            self.pool.get_all(hashes).into_iter().map(PropagateTransaction::pool_tx).collect(),
1098            PropagationMode::Basic,
1099        );
1100
1101        // notify pool so events get fired
1102        self.pool.on_propagated(propagated);
1103    }
1104
1105    /// Request handler for an incoming request for transactions
1106    fn on_get_pooled_transactions(
1107        &mut self,
1108        peer_id: PeerId,
1109        request: GetPooledTransactions,
1110        response: oneshot::Sender<RequestResult<PooledTransactions<N::PooledTransaction>>>,
1111    ) {
1112        if let Some(peer) = self.peers.get_mut(&peer_id) {
1113            if self.network.tx_gossip_disabled() {
1114                let _ = response.send(Ok(PooledTransactions::default()));
1115                return
1116            }
1117            let transactions = self.pool.get_pooled_transaction_elements(
1118                request.0,
1119                GetPooledTransactionLimit::ResponseSizeSoftLimit(
1120                    self.transaction_fetcher.info.soft_limit_byte_size_pooled_transactions_response,
1121                ),
1122            );
1123            trace!(target: "net::tx::propagation", sent_txs=?transactions.iter().map(|tx| tx.tx_hash()), "Sending requested transactions to peer");
1124
1125            // we sent a response at which point we assume that the peer is aware of the
1126            // transactions
1127            peer.seen_transactions.extend(transactions.iter().map(|tx| *tx.tx_hash()));
1128
1129            let resp = PooledTransactions(transactions);
1130            let _ = response.send(Ok(resp));
1131        }
1132    }
1133
1134    /// Handles a command received from a detached [`TransactionsHandle`]
1135    fn on_command(&mut self, cmd: TransactionsCommand<N>) {
1136        match cmd {
1137            TransactionsCommand::PropagateHash(hash) => {
1138                self.on_new_pending_transactions(vec![hash])
1139            }
1140            TransactionsCommand::PropagateHashesTo(hashes, peer) => {
1141                self.propagate_hashes_to(hashes, peer, PropagationMode::Forced)
1142            }
1143            TransactionsCommand::GetActivePeers(tx) => {
1144                let peers = self.peers.keys().copied().collect::<HashSet<_>>();
1145                tx.send(peers).ok();
1146            }
1147            TransactionsCommand::PropagateTransactionsTo(txs, peer) => {
1148                if let Some(propagated) =
1149                    self.propagate_full_transactions_to_peer(txs, peer, PropagationMode::Forced)
1150                {
1151                    self.pool.on_propagated(propagated);
1152                }
1153            }
1154            TransactionsCommand::PropagateTransactions(txs) => self.propagate_all(txs),
1155            TransactionsCommand::BroadcastTransactions(txs) => {
1156                let propagated = self.propagate_transactions(txs, PropagationMode::Forced);
1157                self.pool.on_propagated(propagated);
1158            }
1159            TransactionsCommand::GetTransactionHashes { peers, tx } => {
1160                let mut res = HashMap::with_capacity(peers.len());
1161                for peer_id in peers {
1162                    let hashes = self
1163                        .peers
1164                        .get(&peer_id)
1165                        .map(|peer| peer.seen_transactions.iter().copied().collect::<HashSet<_>>())
1166                        .unwrap_or_default();
1167                    res.insert(peer_id, hashes);
1168                }
1169                tx.send(res).ok();
1170            }
1171            TransactionsCommand::GetPeerSender { peer_id, peer_request_sender } => {
1172                let sender = self.peers.get(&peer_id).map(|peer| peer.request_tx.clone());
1173                peer_request_sender.send(sender).ok();
1174            }
1175        }
1176    }
1177
1178    /// Handles session establishment and peer transactions initialization.
1179    ///
1180    /// This is invoked when a new session is established.
1181    fn handle_peer_session(
1182        &mut self,
1183        info: SessionInfo,
1184        messages: PeerRequestSender<PeerRequest<N>>,
1185    ) {
1186        let SessionInfo { peer_id, client_version, version, .. } = info;
1187
1188        // Insert a new peer into the peerset.
1189        let peer = PeerMetadata::<N>::new(
1190            messages,
1191            version,
1192            client_version,
1193            self.config.max_transactions_seen_by_peer_history,
1194            info.peer_kind,
1195        );
1196        let peer = match self.peers.entry(peer_id) {
1197            Entry::Occupied(mut entry) => {
1198                entry.insert(peer);
1199                entry.into_mut()
1200            }
1201            Entry::Vacant(entry) => entry.insert(peer),
1202        };
1203
1204        self.policies.propagation_policy_mut().on_session_established(peer);
1205
1206        // Send a `NewPooledTransactionHashes` to the peer with up to
1207        // `SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE`
1208        // transactions in the pool.
1209        if self.network.is_initially_syncing() || self.network.tx_gossip_disabled() {
1210            trace!(target: "net::tx", ?peer_id, "Skipping transaction broadcast: node syncing or gossip disabled");
1211            return
1212        }
1213
1214        // Get transactions to broadcast
1215        let pooled_txs = self.pool.pooled_transactions_max(
1216            SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE,
1217        );
1218        if pooled_txs.is_empty() {
1219            trace!(target: "net::tx", ?peer_id, "No transactions in the pool to broadcast");
1220            return;
1221        }
1222
1223        // Build and send transaction hashes message
1224        let mut msg_builder = PooledTransactionsHashesBuilder::new(version);
1225        for pooled_tx in pooled_txs {
1226            peer.seen_transactions.insert(*pooled_tx.hash());
1227            msg_builder.push_pooled(pooled_tx);
1228        }
1229
1230        debug!(target: "net::tx", ?peer_id, tx_count = msg_builder.is_empty(), "Broadcasting transaction hashes");
1231        let msg = msg_builder.build();
1232        self.network.send_transactions_hashes(peer_id, msg);
1233    }
1234
1235    /// Handles a received event related to common network events.
1236    fn on_network_event(&mut self, event_result: NetworkEvent<PeerRequest<N>>) {
1237        match event_result {
1238            NetworkEvent::Peer(PeerEvent::SessionClosed { peer_id, .. }) => {
1239                // remove the peer
1240
1241                let peer = self.peers.remove(&peer_id);
1242                if let Some(mut peer) = peer {
1243                    self.policies.propagation_policy_mut().on_session_closed(&mut peer);
1244                }
1245                self.transaction_fetcher.remove_peer(&peer_id);
1246            }
1247            NetworkEvent::ActivePeerSession { info, messages } => {
1248                // process active peer session and broadcast available transaction from the pool
1249                self.handle_peer_session(info, messages);
1250            }
1251            NetworkEvent::Peer(PeerEvent::SessionEstablished(info)) => {
1252                let peer_id = info.peer_id;
1253                // get messages from existing peer
1254                let messages = match self.peers.get(&peer_id) {
1255                    Some(p) => p.request_tx.clone(),
1256                    None => {
1257                        debug!(target: "net::tx", ?peer_id, "No peer request sender found");
1258                        return;
1259                    }
1260                };
1261                self.handle_peer_session(info, messages);
1262            }
1263            _ => {}
1264        }
1265    }
1266
1267    /// Returns true if the ingress policy allows processing messages from the given peer.
1268    fn accepts_incoming_from(&self, peer_id: &PeerId) -> bool {
1269        if self.config.ingress_policy.allows_all() {
1270            return true;
1271        }
1272        let Some(peer) = self.peers.get(peer_id) else {
1273            return false;
1274        };
1275        self.config.ingress_policy.allows(peer.peer_kind())
1276    }
1277
1278    /// Handles dedicated transaction events related to the `eth` protocol.
1279    fn on_network_tx_event(&mut self, event: NetworkTransactionEvent<N>) {
1280        match event {
1281            NetworkTransactionEvent::IncomingTransactions { peer_id, msg } => {
1282                if !self.accepts_incoming_from(&peer_id) {
1283                    trace!(target: "net::tx", peer_id=format!("{peer_id:#}"), policy=?self.config.ingress_policy, "Ignoring full transactions from peer blocked by ingress policy");
1284                    return;
1285                }
1286                // ensure we didn't receive any blob transactions as these are disallowed to be
1287                // broadcasted in full
1288
1289                let has_blob_txs = msg.has_eip4844();
1290
1291                let non_blob_txs = msg
1292                    .0
1293                    .into_iter()
1294                    .map(N::PooledTransaction::try_from)
1295                    .filter_map(Result::ok)
1296                    .collect();
1297
1298                self.import_transactions(peer_id, non_blob_txs, TransactionSource::Broadcast);
1299
1300                if has_blob_txs {
1301                    debug!(target: "net::tx", ?peer_id, "received bad full blob transaction broadcast");
1302                    self.report_peer_bad_transactions(peer_id);
1303                }
1304            }
1305            NetworkTransactionEvent::IncomingPooledTransactionHashes { peer_id, msg } => {
1306                if !self.accepts_incoming_from(&peer_id) {
1307                    trace!(target: "net::tx", peer_id=format!("{peer_id:#}"), policy=?self.config.ingress_policy, "Ignoring transaction hashes from peer blocked by ingress policy");
1308                    return;
1309                }
1310                self.on_new_pooled_transaction_hashes(peer_id, msg)
1311            }
1312            NetworkTransactionEvent::GetPooledTransactions { peer_id, request, response } => {
1313                self.on_get_pooled_transactions(peer_id, request, response)
1314            }
1315            NetworkTransactionEvent::GetTransactionsHandle(response) => {
1316                let _ = response.send(Some(self.handle()));
1317            }
1318        }
1319    }
1320
1321    /// Starts the import process for the given transactions.
1322    fn import_transactions(
1323        &mut self,
1324        peer_id: PeerId,
1325        transactions: PooledTransactions<N::PooledTransaction>,
1326        source: TransactionSource,
1327    ) {
1328        // If the node is pipeline syncing, ignore transactions
1329        if self.network.is_initially_syncing() {
1330            return
1331        }
1332        if self.network.tx_gossip_disabled() {
1333            return
1334        }
1335
1336        let Some(peer) = self.peers.get_mut(&peer_id) else { return };
1337        let mut transactions = transactions.0;
1338
1339        // mark the transactions as received
1340        self.transaction_fetcher
1341            .remove_hashes_from_transaction_fetcher(transactions.iter().map(|tx| tx.tx_hash()));
1342
1343        // track that the peer knows these transaction, but only if this is a new broadcast.
1344        // If we received the transactions as the response to our `GetPooledTransactions``
1345        // requests (based on received `NewPooledTransactionHashes`) then we already
1346        // recorded the hashes as seen by this peer in `Self::on_new_pooled_transaction_hashes`.
1347        let mut num_already_seen_by_peer = 0;
1348        for tx in &transactions {
1349            if source.is_broadcast() && !peer.seen_transactions.insert(*tx.tx_hash()) {
1350                num_already_seen_by_peer += 1;
1351            }
1352        }
1353
1354        // 1. filter out txns already inserted into pool
1355        let txns_count_pre_pool_filter = transactions.len();
1356        self.pool.retain_unknown(&mut transactions);
1357        if txns_count_pre_pool_filter > transactions.len() {
1358            let already_known_txns_count = txns_count_pre_pool_filter - transactions.len();
1359            self.metrics
1360                .occurrences_transactions_already_in_pool
1361                .increment(already_known_txns_count as u64);
1362        }
1363
1364        // tracks the quality of the given transactions
1365        let mut has_bad_transactions = false;
1366
1367        // 2. filter out transactions that are invalid or already pending import pre-size to avoid
1368        //    reallocations
1369        let mut new_txs = Vec::with_capacity(transactions.len());
1370        for tx in transactions {
1371            // recover transaction
1372            let tx = match tx.try_into_recovered() {
1373                Ok(tx) => tx,
1374                Err(badtx) => {
1375                    trace!(target: "net::tx",
1376                        peer_id=format!("{peer_id:#}"),
1377                        hash=%badtx.tx_hash(),
1378                        client_version=%peer.client_version,
1379                        "failed ecrecovery for transaction"
1380                    );
1381                    has_bad_transactions = true;
1382                    continue
1383                }
1384            };
1385
1386            match self.transactions_by_peers.entry(*tx.tx_hash()) {
1387                Entry::Occupied(mut entry) => {
1388                    // transaction was already inserted
1389                    entry.get_mut().insert(peer_id);
1390                }
1391                Entry::Vacant(entry) => {
1392                    if self.bad_imports.contains(tx.tx_hash()) {
1393                        trace!(target: "net::tx",
1394                            peer_id=format!("{peer_id:#}"),
1395                            hash=%tx.tx_hash(),
1396                            client_version=%peer.client_version,
1397                            "received a known bad transaction from peer"
1398                        );
1399                        has_bad_transactions = true;
1400                    } else {
1401                        // this is a new transaction that should be imported into the pool
1402
1403                        let pool_transaction = Pool::Transaction::from_pooled(tx);
1404                        new_txs.push(pool_transaction);
1405
1406                        entry.insert(HashSet::from([peer_id]));
1407                    }
1408                }
1409            }
1410        }
1411        new_txs.shrink_to_fit();
1412
1413        // 3. import new transactions as a batch to minimize lock contention on the underlying
1414        // pool
1415        if !new_txs.is_empty() {
1416            let pool = self.pool.clone();
1417            // update metrics
1418            let metric_pending_pool_imports = self.metrics.pending_pool_imports.clone();
1419            metric_pending_pool_imports.increment(new_txs.len() as f64);
1420
1421            // update self-monitoring info
1422            self.pending_pool_imports_info
1423                .pending_pool_imports
1424                .fetch_add(new_txs.len(), Ordering::Relaxed);
1425            let tx_manager_info_pending_pool_imports =
1426                self.pending_pool_imports_info.pending_pool_imports.clone();
1427
1428            trace!(target: "net::tx::propagation", new_txs_len=?new_txs.len(), "Importing new transactions");
1429            let import = Box::pin(async move {
1430                let added = new_txs.len();
1431                let res = pool.add_external_transactions(new_txs).await;
1432
1433                // update metrics
1434                metric_pending_pool_imports.decrement(added as f64);
1435                // update self-monitoring info
1436                tx_manager_info_pending_pool_imports.fetch_sub(added, Ordering::Relaxed);
1437
1438                res
1439            });
1440
1441            self.pool_imports.push(import);
1442        }
1443
1444        if num_already_seen_by_peer > 0 {
1445            self.metrics.messages_with_transactions_already_seen_by_peer.increment(1);
1446            self.metrics
1447                .occurrences_of_transaction_already_seen_by_peer
1448                .increment(num_already_seen_by_peer);
1449            trace!(target: "net::tx", num_txs=%num_already_seen_by_peer, ?peer_id, client=?peer.client_version, "Peer sent already seen transactions");
1450        }
1451
1452        if has_bad_transactions {
1453            // peer sent us invalid transactions
1454            self.report_peer_bad_transactions(peer_id)
1455        }
1456
1457        if num_already_seen_by_peer > 0 {
1458            self.report_already_seen(peer_id);
1459        }
1460    }
1461
1462    /// Processes a [`FetchEvent`].
1463    fn on_fetch_event(&mut self, fetch_event: FetchEvent<N::PooledTransaction>) {
1464        match fetch_event {
1465            FetchEvent::TransactionsFetched { peer_id, transactions, report_peer } => {
1466                self.import_transactions(peer_id, transactions, TransactionSource::Response);
1467                if report_peer {
1468                    self.report_peer(peer_id, ReputationChangeKind::BadTransactions);
1469                }
1470            }
1471            FetchEvent::FetchError { peer_id, error } => {
1472                trace!(target: "net::tx", ?peer_id, %error, "requesting transactions from peer failed");
1473                self.on_request_error(peer_id, error);
1474            }
1475            FetchEvent::EmptyResponse { peer_id } => {
1476                trace!(target: "net::tx", ?peer_id, "peer returned empty response");
1477            }
1478        }
1479    }
1480}
1481
/// An endless future. Preemption ensures that the future is non-blocking, nonetheless. See
/// [`crate::NetworkManager`] for more context on the design pattern.
///
/// This should be spawned or used as part of `tokio::select!`.
//
// spawned in `NodeConfig::start_network`(reth_node_core::NodeConfig) and
// `NetworkConfig::start_network`(reth_network::NetworkConfig)
impl<
        Pool: TransactionPool + Unpin + 'static,
        N: NetworkPrimitives<
                BroadcastedTransaction: SignedTransaction,
                PooledTransaction: SignedTransaction,
            > + Unpin,
    > Future for TransactionsManager<Pool, N>
where
    Pool::Transaction:
        PoolTransaction<Consensus = N::BroadcastedTransaction, Pooled = N::PooledTransaction>,
{
    /// The future never resolves; see [`Self::poll`].
    type Output = ();

    /// Advances every input of the manager once, each within its own budget.
    ///
    /// This method always returns [`Poll::Pending`]. If any of the polled sources reported that
    /// it may have more work, the task's waker is invoked before returning so that the manager
    /// is polled again right away. Poll metrics are only updated on the path where no source
    /// reported more work.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // start of this poll, handed to `update_poll_metrics` at the end
        let start = Instant::now();
        let mut poll_durations = TxManagerPollDurations::default();

        let this = self.get_mut();

        // All streams are polled until their corresponding budget is exhausted, then we manually
        // yield back control to tokio. See `NetworkManager` for more context on the design
        // pattern.

        // Advance network/peer related events (update peers map).
        let maybe_more_network_events = metered_poll_nested_stream_with_budget!(
            poll_durations.acc_network_events,
            "net::tx",
            "Network events stream",
            DEFAULT_BUDGET_TRY_DRAIN_STREAM,
            this.network_events.poll_next_unpin(cx),
            |event| this.on_network_event(event)
        );

        // Advances new __pending__ transactions, transactions that were successfully inserted into
        // pending set in pool (are valid), and propagates them (inform peers which
        // transactions we have seen).
        //
        // We try to drain this to batch the transactions in a single message.
        //
        // We don't expect this buffer to be large, since only pending transactions are
        // emitted here.
        let mut new_txs = Vec::new();
        let maybe_more_pending_txns = match this.pending_transactions.poll_recv_many(
            cx,
            &mut new_txs,
            SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE,
        ) {
            Poll::Ready(count) => {
                if count == SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE {
                    // we filled the entire buffer capacity and need to try again on the next poll
                    // immediately
                    true
                } else {
                    // try once more, because most likely the channel is now empty and the waker
                    // is registered if this is pending, if we filled additional hashes, we poll
                    // again on the next iteration
                    //
                    // NOTE(review): if `poll_recv_many` can return `Ready(0)` (tokio mpsc
                    // receivers do so once the channel is closed), this branch reports "more
                    // work" on every poll and the manager keeps waking itself. Confirm that the
                    // sending side outlives the manager.
                    let limit =
                        SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE -
                            new_txs.len();
                    this.pending_transactions.poll_recv_many(cx, &mut new_txs, limit).is_ready()
                }
            }
            Poll::Pending => false,
        };
        // everything received by the (up to two) calls above is handled as one batch
        if !new_txs.is_empty() {
            this.on_new_pending_transactions(new_txs);
        }

        // Advance incoming transaction events (stream new txns/announcements from
        // network manager and queue for import to pool/fetch txns).
        //
        // This will potentially remove hashes from hashes pending fetch, if the event
        // is an announcement (if same hashes are announced that didn't fit into a
        // previous request).
        //
        // The smallest decodable transaction is an empty legacy transaction, 10 bytes
        // (128 KiB / 10 bytes > 13k transactions).
        //
        // If this is an event with `Transactions` message, since transactions aren't
        // validated until they are inserted into the pool, this can potentially queue
        // >13k transactions for insertion to pool. More if the message size is bigger
        // than the soft limit on a `Transactions` broadcast message, which is 128 KiB.
        let maybe_more_tx_events = metered_poll_nested_stream_with_budget!(
            poll_durations.acc_tx_events,
            "net::tx",
            "Network transaction events stream",
            DEFAULT_BUDGET_TRY_DRAIN_NETWORK_TRANSACTION_EVENTS,
            this.transaction_events.poll_next_unpin(cx),
            |event| this.on_network_tx_event(event),
        );

        // Advance inflight fetch requests (flush transaction fetcher and queue for
        // import to pool).
        //
        // The smallest decodable transaction is an empty legacy transaction, 10 bytes
        // (2 MiB / 10 bytes > 200k transactions).
        //
        // Since transactions aren't validated until they are inserted into the pool,
        // this can potentially queue >200k transactions for insertion to pool. More
        // if the message size is bigger than the soft limit on a `PooledTransactions`
        // response which is 2 MiB.
        //
        // Mutable because the pending-hashes step further down may set it to `true`.
        let mut maybe_more_tx_fetch_events = metered_poll_nested_stream_with_budget!(
            poll_durations.acc_fetch_events,
            "net::tx",
            "Transaction fetch events stream",
            DEFAULT_BUDGET_TRY_DRAIN_STREAM,
            this.transaction_fetcher.poll_next_unpin(cx),
            |event| this.on_fetch_event(event),
        );

        // Advance pool imports (flush txns to pool).
        //
        // Note, this is done in batches. A batch is filled from one `Transactions`
        // broadcast messages or one `PooledTransactions` response at a time. The
        // minimum batch size is 1 transaction (and might often be the case with blob
        // transactions).
        //
        // The smallest decodable transaction is an empty legacy transaction, 10 bytes
        // (2 MiB / 10 bytes > 200k transactions).
        //
        // Since transactions aren't validated until they are inserted into the pool,
        // this can potentially validate >200k transactions. More if the message size
        // is bigger than the soft limit on a `PooledTransactions` response which is
        // 2 MiB (`Transactions` broadcast messages is smaller, 128 KiB).
        let maybe_more_pool_imports = metered_poll_nested_stream_with_budget!(
            poll_durations.acc_pending_imports,
            "net::tx",
            "Batched pool imports stream",
            DEFAULT_BUDGET_TRY_DRAIN_PENDING_POOL_IMPORTS,
            this.pool_imports.poll_next_unpin(cx),
            |batch_results| this.on_batch_import_result(batch_results)
        );

        // Tries to drain hashes pending fetch cache if the tx manager currently has
        // capacity for this (fetch txns).
        //
        // Sends at most one request.
        duration_metered_exec!(
            {
                if this.has_capacity_for_fetching_pending_hashes() {
                    this.on_fetch_hashes_pending_fetch();
                    // force a re-poll so the fetcher stream is polled again
                    maybe_more_tx_fetch_events = true;
                }
            },
            poll_durations.acc_pending_fetch
        );

        // Advance commands (propagate/fetch/serve txns).
        let maybe_more_commands = metered_poll_nested_stream_with_budget!(
            poll_durations.acc_cmds,
            "net::tx",
            "Commands channel",
            DEFAULT_BUDGET_TRY_DRAIN_STREAM,
            this.command_rx.poll_next_unpin(cx),
            |cmd| this.on_command(cmd)
        );

        this.transaction_fetcher.update_metrics();

        // If any source may still have work queued, schedule an immediate re-poll instead of
        // waiting for an external wake-up. Note that poll metrics are not updated on this path.
        if maybe_more_network_events ||
            maybe_more_commands ||
            maybe_more_tx_events ||
            maybe_more_tx_fetch_events ||
            maybe_more_pool_imports ||
            maybe_more_pending_txns
        {
            // make sure we're woken up again
            cx.waker().wake_by_ref();
            return Poll::Pending
        }

        // all channels are fully drained and import futures pending
        this.update_poll_metrics(start, poll_durations);

        Poll::Pending
    }
}
1666
/// Selects how transactions are propagated to peers in the network.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
enum PropagationMode {
    /// Default propagation mode.
    ///
    /// A transaction is only sent to peers that haven't seen it yet.
    Basic,
    /// Forced propagation mode.
    ///
    /// A transaction is sent to all peers, regardless of whether it has been sent to or received
    /// from them before.
    Forced,
}

impl PropagationMode {
    /// Returns `true` if the propagation mode is [`Self::Forced`].
    const fn is_forced(self) -> bool {
        match self {
            Self::Forced => true,
            Self::Basic => false,
        }
    }
}
1689
1690/// A transaction that's about to be propagated to multiple peers.
1691#[derive(Debug, Clone)]
1692struct PropagateTransaction<T = TransactionSigned> {
1693    size: usize,
1694    transaction: Arc<T>,
1695}
1696
1697impl<T: SignedTransaction> PropagateTransaction<T> {
1698    /// Create a new instance from a transaction.
1699    pub fn new(transaction: T) -> Self {
1700        let size = transaction.length();
1701        Self { size, transaction: Arc::new(transaction) }
1702    }
1703
1704    /// Create a new instance from a pooled transaction
1705    fn pool_tx<P>(tx: Arc<ValidPoolTransaction<P>>) -> Self
1706    where
1707        P: PoolTransaction<Consensus = T>,
1708    {
1709        let size = tx.encoded_length();
1710        let transaction = tx.transaction.clone_into_consensus();
1711        let transaction = Arc::new(transaction.into_inner());
1712        Self { size, transaction }
1713    }
1714
1715    fn tx_hash(&self) -> &TxHash {
1716        self.transaction.tx_hash()
1717    }
1718}
1719
1720/// Helper type to construct the appropriate message to send to the peer based on whether the peer
1721/// should receive them in full or as pooled
1722#[derive(Debug, Clone)]
1723enum PropagateTransactionsBuilder<T> {
1724    Pooled(PooledTransactionsHashesBuilder),
1725    Full(FullTransactionsBuilder<T>),
1726}
1727
1728impl<T> PropagateTransactionsBuilder<T> {
1729    /// Create a builder for pooled transactions
1730    fn pooled(version: EthVersion) -> Self {
1731        Self::Pooled(PooledTransactionsHashesBuilder::new(version))
1732    }
1733
1734    /// Create a builder that sends transactions in full and records transactions that don't fit.
1735    fn full(version: EthVersion) -> Self {
1736        Self::Full(FullTransactionsBuilder::new(version))
1737    }
1738
1739    /// Returns true if no transactions are recorded.
1740    fn is_empty(&self) -> bool {
1741        match self {
1742            Self::Pooled(builder) => builder.is_empty(),
1743            Self::Full(builder) => builder.is_empty(),
1744        }
1745    }
1746
1747    /// Consumes the type and returns the built messages that should be sent to the peer.
1748    fn build(self) -> PropagateTransactions<T> {
1749        match self {
1750            Self::Pooled(pooled) => {
1751                PropagateTransactions { pooled: Some(pooled.build()), full: None }
1752            }
1753            Self::Full(full) => full.build(),
1754        }
1755    }
1756}
1757
1758impl<T: SignedTransaction> PropagateTransactionsBuilder<T> {
1759    /// Appends all transactions
1760    fn extend<'a>(&mut self, txs: impl IntoIterator<Item = &'a PropagateTransaction<T>>) {
1761        for tx in txs {
1762            self.push(tx);
1763        }
1764    }
1765
1766    /// Appends a transaction to the list.
1767    fn push(&mut self, transaction: &PropagateTransaction<T>) {
1768        match self {
1769            Self::Pooled(builder) => builder.push(transaction),
1770            Self::Full(builder) => builder.push(transaction),
1771        }
1772    }
1773}
1774
/// Represents how the transactions should be sent to a peer if any.
///
/// Each field is optional: a field that is `None` means there is nothing to send in that form.
struct PropagateTransactions<T> {
    /// The pooled transaction hashes to send.
    pooled: Option<NewPooledTransactionHashes>,
    /// The transactions to send in full.
    full: Option<Vec<Arc<T>>>,
}
1782
1783/// Helper type for constructing the full transaction message that enforces the
1784/// [`DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE`] for full transaction broadcast
1785/// and enforces other propagation rules for EIP-4844 and tracks those transactions that can't be
1786/// broadcasted in full.
1787#[derive(Debug, Clone)]
1788struct FullTransactionsBuilder<T> {
1789    /// The soft limit to enforce for a single broadcast message of full transactions.
1790    total_size: usize,
1791    /// All transactions to be broadcasted.
1792    transactions: Vec<Arc<T>>,
1793    /// Transactions that didn't fit into the broadcast message
1794    pooled: PooledTransactionsHashesBuilder,
1795}
1796
1797impl<T> FullTransactionsBuilder<T> {
1798    /// Create a builder for the negotiated version of the peer's session
1799    fn new(version: EthVersion) -> Self {
1800        Self {
1801            total_size: 0,
1802            pooled: PooledTransactionsHashesBuilder::new(version),
1803            transactions: vec![],
1804        }
1805    }
1806
1807    /// Returns whether or not any transactions are in the [`FullTransactionsBuilder`].
1808    fn is_empty(&self) -> bool {
1809        self.transactions.is_empty() && self.pooled.is_empty()
1810    }
1811
1812    /// Returns the messages that should be propagated to the peer.
1813    fn build(self) -> PropagateTransactions<T> {
1814        let pooled = Some(self.pooled.build()).filter(|pooled| !pooled.is_empty());
1815        let full = Some(self.transactions).filter(|full| !full.is_empty());
1816        PropagateTransactions { pooled, full }
1817    }
1818}
1819
1820impl<T: SignedTransaction> FullTransactionsBuilder<T> {
1821    /// Appends all transactions.
1822    fn extend(&mut self, txs: impl IntoIterator<Item = PropagateTransaction<T>>) {
1823        for tx in txs {
1824            self.push(&tx)
1825        }
1826    }
1827
1828    /// Append a transaction to the list of full transaction if the total message bytes size doesn't
1829    /// exceed the soft maximum target byte size. The limit is soft, meaning if one single
1830    /// transaction goes over the limit, it will be broadcasted in its own [`Transactions`]
1831    /// message. The same pattern is followed in filling a [`GetPooledTransactions`] request in
1832    /// [`TransactionFetcher::fill_request_from_hashes_pending_fetch`].
1833    ///
1834    /// If the transaction is unsuitable for broadcast or would exceed the softlimit, it is appended
1835    /// to list of pooled transactions, (e.g. 4844 transactions).
1836    /// See also [`SignedTransaction::is_broadcastable_in_full`].
1837    fn push(&mut self, transaction: &PropagateTransaction<T>) {
1838        // Do not send full 4844 transaction hashes to peers.
1839        //
1840        //  Nodes MUST NOT automatically broadcast blob transactions to their peers.
1841        //  Instead, those transactions are only announced using
1842        //  `NewPooledTransactionHashes` messages, and can then be manually requested
1843        //  via `GetPooledTransactions`.
1844        //
1845        // From: <https://eips.ethereum.org/EIPS/eip-4844#networking>
1846        if !transaction.transaction.is_broadcastable_in_full() {
1847            self.pooled.push(transaction);
1848            return
1849        }
1850
1851        let new_size = self.total_size + transaction.size;
1852        if new_size > DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE &&
1853            self.total_size > 0
1854        {
1855            // transaction does not fit into the message
1856            self.pooled.push(transaction);
1857            return
1858        }
1859
1860        self.total_size = new_size;
1861        self.transactions.push(Arc::clone(&transaction.transaction));
1862    }
1863}
1864
1865/// A helper type to create the pooled transactions message based on the negotiated version of the
1866/// session with the peer
1867#[derive(Debug, Clone)]
1868enum PooledTransactionsHashesBuilder {
1869    Eth66(NewPooledTransactionHashes66),
1870    Eth68(NewPooledTransactionHashes68),
1871}
1872
1873// === impl PooledTransactionsHashesBuilder ===
1874
1875impl PooledTransactionsHashesBuilder {
1876    /// Push a transaction from the pool to the list.
1877    fn push_pooled<T: PoolTransaction>(&mut self, pooled_tx: Arc<ValidPoolTransaction<T>>) {
1878        match self {
1879            Self::Eth66(msg) => msg.0.push(*pooled_tx.hash()),
1880            Self::Eth68(msg) => {
1881                msg.hashes.push(*pooled_tx.hash());
1882                msg.sizes.push(pooled_tx.encoded_length());
1883                msg.types.push(pooled_tx.transaction.ty());
1884            }
1885        }
1886    }
1887
1888    /// Returns whether or not any transactions are in the [`PooledTransactionsHashesBuilder`].
1889    fn is_empty(&self) -> bool {
1890        match self {
1891            Self::Eth66(hashes) => hashes.is_empty(),
1892            Self::Eth68(hashes) => hashes.is_empty(),
1893        }
1894    }
1895
1896    /// Appends all hashes
1897    fn extend<T: SignedTransaction>(
1898        &mut self,
1899        txs: impl IntoIterator<Item = PropagateTransaction<T>>,
1900    ) {
1901        for tx in txs {
1902            self.push(&tx);
1903        }
1904    }
1905
1906    fn push<T: SignedTransaction>(&mut self, tx: &PropagateTransaction<T>) {
1907        match self {
1908            Self::Eth66(msg) => msg.0.push(*tx.tx_hash()),
1909            Self::Eth68(msg) => {
1910                msg.hashes.push(*tx.tx_hash());
1911                msg.sizes.push(tx.size);
1912                msg.types.push(tx.transaction.ty());
1913            }
1914        }
1915    }
1916
1917    /// Create a builder for the negotiated version of the peer's session
1918    fn new(version: EthVersion) -> Self {
1919        match version {
1920            EthVersion::Eth66 | EthVersion::Eth67 => Self::Eth66(Default::default()),
1921            EthVersion::Eth68 | EthVersion::Eth69 => Self::Eth68(Default::default()),
1922        }
1923    }
1924
1925    fn build(self) -> NewPooledTransactionHashes {
1926        match self {
1927            Self::Eth66(msg) => msg.into(),
1928            Self::Eth68(msg) => msg.into(),
1929        }
1930    }
1931}
1932
/// Describes the way a batch of transactions reached us.
enum TransactionSource {
    /// Transactions were broadcast to us via [`Transactions`] message.
    Broadcast,
    /// Transactions were sent as the response of [`fetcher::GetPooledTxRequest`] issued by us.
    Response,
}

// === impl TransactionSource ===

impl TransactionSource {
    /// Returns `true` if the transactions were received as a broadcast.
    const fn is_broadcast(&self) -> bool {
        match self {
            Self::Broadcast => true,
            Self::Response => false,
        }
    }
}
1949
/// Tracks a single peer in the context of [`TransactionsManager`].
#[derive(Debug)]
pub struct PeerMetadata<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// Optimistically keeps track of transactions that we know the peer has seen. Optimistic, in
    /// the sense that transactions are preemptively marked as seen by peer when they are sent to
    /// the peer.
    seen_transactions: LruCache<TxHash>,
    /// A communication channel directly to the peer's session task.
    request_tx: PeerRequestSender<PeerRequest<N>>,
    /// negotiated version of the session.
    version: EthVersion,
    /// The peer's client version.
    client_version: Arc<str>,
    /// The kind of peer.
    peer_kind: PeerKind,
}

impl<N: NetworkPrimitives> PeerMetadata<N> {
    /// Returns a new instance of [`PeerMetadata`].
    ///
    /// `max_transactions_seen_by_peer` is passed to [`LruCache::new`] when creating the cache of
    /// transactions seen by the peer; all other arguments are stored as given.
    pub fn new(
        request_tx: PeerRequestSender<PeerRequest<N>>,
        version: EthVersion,
        client_version: Arc<str>,
        max_transactions_seen_by_peer: u32,
        peer_kind: PeerKind,
    ) -> Self {
        Self {
            seen_transactions: LruCache::new(max_transactions_seen_by_peer),
            request_tx,
            version,
            client_version,
            peer_kind,
        }
    }

    /// Returns a reference to the peer's request sender channel.
    pub const fn request_tx(&self) -> &PeerRequestSender<PeerRequest<N>> {
        &self.request_tx
    }

    /// Returns a mutable reference to the cache of transaction hashes that this peer is
    /// considered to have seen.
    pub const fn seen_transactions_mut(&mut self) -> &mut LruCache<TxHash> {
        &mut self.seen_transactions
    }

    /// Returns the negotiated `EthVersion` of the session.
    pub const fn version(&self) -> EthVersion {
        self.version
    }

    /// Returns a reference to the peer's client version string.
    pub fn client_version(&self) -> &str {
        &self.client_version
    }

    /// Returns the peer's kind.
    pub const fn peer_kind(&self) -> PeerKind {
        self.peer_kind
    }
}
2010
/// Commands to send to the [`TransactionsManager`]
///
/// Variants that expect an answer carry a `oneshot::Sender` through which the reply is
/// delivered.
#[derive(Debug)]
enum TransactionsCommand<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// Propagate a transaction hash to the network.
    PropagateHash(B256),
    /// Propagate transaction hashes to a specific peer.
    PropagateHashesTo(Vec<B256>, PeerId),
    /// Request the list of active peer IDs from the [`TransactionsManager`].
    GetActivePeers(oneshot::Sender<HashSet<PeerId>>),
    /// Propagate a collection of full transactions to a specific peer.
    ///
    /// The transactions are identified by their hashes.
    PropagateTransactionsTo(Vec<TxHash>, PeerId),
    /// Propagate a collection of hashes to all peers.
    PropagateTransactions(Vec<TxHash>),
    /// Propagate a collection of broadcastable transactions in full to all peers.
    BroadcastTransactions(Vec<PropagateTransaction<N::BroadcastedTransaction>>),
    /// Request transaction hashes known by specific peers from the [`TransactionsManager`].
    GetTransactionHashes {
        /// The peers to look up.
        peers: Vec<PeerId>,
        /// Channel for the reply: a set of transaction hashes per peer.
        tx: oneshot::Sender<HashMap<PeerId, HashSet<TxHash>>>,
    },
    /// Requests a clone of the sender channel to the peer.
    GetPeerSender {
        /// The peer whose request sender is wanted.
        peer_id: PeerId,
        /// Channel for the reply; the reply type is an `Option` of the peer's request sender.
        peer_request_sender: oneshot::Sender<Option<PeerRequestSender<PeerRequest<N>>>>,
    },
}
2037
/// All events related to transactions emitted by the network.
///
/// The transaction types carried by the variants are taken from the network primitives `N`.
#[derive(Debug)]
pub enum NetworkTransactionEvent<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// Represents the event of receiving a list of transactions from a peer.
    ///
    /// This indicates transactions that were broadcasted to us from the peer.
    IncomingTransactions {
        /// The ID of the peer from which the transactions were received.
        peer_id: PeerId,
        /// The received transactions.
        msg: Transactions<N::BroadcastedTransaction>,
    },
    /// Represents the event of receiving a list of transaction hashes from a peer.
    IncomingPooledTransactionHashes {
        /// The ID of the peer from which the transaction hashes were received.
        peer_id: PeerId,
        /// The received new pooled transaction hashes.
        msg: NewPooledTransactionHashes,
    },
    /// Represents the event of receiving a `GetPooledTransactions` request from a peer.
    GetPooledTransactions {
        /// The ID of the peer from which the request was received.
        peer_id: PeerId,
        /// The received `GetPooledTransactions` request.
        request: GetPooledTransactions,
        /// The sender for responding to the request with a result of `PooledTransactions`.
        response: oneshot::Sender<RequestResult<PooledTransactions<N::PooledTransaction>>>,
    },
    /// Represents the event of receiving a `GetTransactionsHandle` request.
    ///
    /// The reply, an optional [`TransactionsHandle`], is delivered through the contained sender.
    GetTransactionsHandle(oneshot::Sender<Option<TransactionsHandle<N>>>),
}
2069
/// Tracks stats about the [`TransactionsManager`].
#[derive(Debug)]
pub struct PendingPoolImportsInfo {
    /// Number of transactions about to be inserted into the pool.
    pending_pool_imports: Arc<AtomicUsize>,
    /// Max number of transactions allowed to be imported concurrently.
    max_pending_pool_imports: usize,
}

impl PendingPoolImportsInfo {
    /// Returns a new [`PendingPoolImportsInfo`] with a pending-imports counter of zero and the
    /// given maximum.
    pub fn new(max_pending_pool_imports: usize) -> Self {
        let pending_pool_imports = Arc::new(AtomicUsize::new(0));
        Self { pending_pool_imports, max_pending_pool_imports }
    }

    /// Returns `true` if the number of pool imports is under a given tolerated max.
    ///
    /// The counter is compared against the `max_pending_pool_imports` argument, not against
    /// the maximum stored in `self`.
    pub fn has_capacity(&self, max_pending_pool_imports: usize) -> bool {
        let in_flight = self.pending_pool_imports.load(Ordering::Relaxed);
        in_flight < max_pending_pool_imports
    }
}
2090
2091impl Default for PendingPoolImportsInfo {
2092    fn default() -> Self {
2093        Self::new(DEFAULT_MAX_COUNT_PENDING_POOL_IMPORTS)
2094    }
2095}
2096
/// Durations collected during a single call to `TransactionsManager::poll`.
///
/// A fresh, zeroed instance is created at the start of every poll. The fields are handed to the
/// metering macros used in `poll`, and the whole struct is passed to `update_poll_metrics` when
/// the poll finishes without requesting an immediate re-poll.
#[derive(Debug, Default)]
struct TxManagerPollDurations {
    /// Accumulator for polling the network events stream.
    acc_network_events: Duration,
    /// Accumulator for polling the batched pool imports stream.
    acc_pending_imports: Duration,
    /// Accumulator for polling the network transaction events stream.
    acc_tx_events: Duration,
    /// NOTE(review): not written by `poll` itself; presumably related to handling imported
    /// transactions, confirm where this is populated.
    acc_imported_txns: Duration,
    /// Accumulator for polling the transaction fetch events stream.
    acc_fetch_events: Duration,
    /// Accumulator for the step that requests hashes pending fetch.
    acc_pending_fetch: Duration,
    /// Accumulator for polling the commands channel.
    acc_cmds: Duration,
}
2107
2108#[cfg(test)]
2109mod tests {
2110    use super::*;
2111    use crate::{
2112        test_utils::{
2113            transactions::{buffer_hash_to_tx_fetcher, new_mock_session, new_tx_manager},
2114            Testnet,
2115        },
2116        transactions::config::RelaxedEthAnnouncementFilter,
2117        NetworkConfigBuilder, NetworkManager,
2118    };
2119    use alloy_consensus::{TxEip1559, TxLegacy};
2120    use alloy_primitives::{hex, Signature, TxKind, U256};
2121    use alloy_rlp::Decodable;
2122    use futures::FutureExt;
2123    use reth_chainspec::MIN_TRANSACTION_GAS;
2124    use reth_ethereum_primitives::{PooledTransactionVariant, Transaction, TransactionSigned};
2125    use reth_network_api::{NetworkInfo, PeerKind};
2126    use reth_network_p2p::{
2127        error::{RequestError, RequestResult},
2128        sync::{NetworkSyncUpdater, SyncState},
2129    };
2130    use reth_storage_api::noop::NoopProvider;
2131    use reth_transaction_pool::test_utils::{
2132        testing_pool, MockTransaction, MockTransactionFactory, TestPool,
2133    };
2134    use secp256k1::SecretKey;
2135    use std::{
2136        future::poll_fn,
2137        net::{IpAddr, Ipv4Addr, SocketAddr},
2138        str::FromStr,
2139    };
2140    use tracing::error;
2141
    /// Checks that a `Transactions` broadcast received while the node reports that it is
    /// initially syncing does not end up in the pool.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_ignored_tx_broadcasts_while_initially_syncing() {
        reth_tracing::init_test_tracing();
        let net = Testnet::create(3).await;

        let mut handles = net.handles();
        let handle0 = handles.next().unwrap();
        let handle1 = handles.next().unwrap();

        drop(handles);
        let handle = net.spawn();

        let listener0 = handle0.event_listener();
        handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
        let secret_key = SecretKey::new(&mut rand_08::thread_rng());

        // separate network + transactions manager under test, with its own (empty) pool
        let client = NoopProvider::default();
        let pool = testing_pool();
        let config = NetworkConfigBuilder::eth(secret_key)
            .disable_discovery()
            .listener_port(0)
            .build(client);
        let transactions_manager_config = config.transactions_manager_config.clone();
        let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
            .await
            .unwrap()
            .into_builder()
            .transactions(pool.clone(), transactions_manager_config)
            .split_with_handle();

        tokio::task::spawn(network);

        // go to syncing (pipeline sync)
        network_handle.update_sync_state(SyncState::Syncing);
        assert!(NetworkInfo::is_syncing(&network_handle));
        assert!(NetworkInfo::is_initially_syncing(&network_handle));

        // wait for all initiator connections
        let mut established = listener0.take(2);
        while let Some(ev) = established.next().await {
            match ev {
                NetworkEvent::Peer(PeerEvent::SessionEstablished(info)) => {
                    // to insert a new peer in transactions peerset
                    transactions
                        .on_network_event(NetworkEvent::Peer(PeerEvent::SessionEstablished(info)))
                }
                NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => {}
                ev => {
                    error!("unexpected event {ev:?}")
                }
            }
        }
        // random tx: <https://etherscan.io/getRawTx?tx=0x9448608d36e721ef403c53b00546068a6474d6cbab6816c3926de449898e7bce>
        let input = hex!(
            "02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76"
        );
        let signed_tx = TransactionSigned::decode(&mut &input[..]).unwrap();
        // feed the broadcast directly into the manager, as if it came from `handle1`
        transactions.on_network_tx_event(NetworkTransactionEvent::IncomingTransactions {
            peer_id: *handle1.peer_id(),
            msg: Transactions(vec![signed_tx.clone()]),
        });
        // poll the manager exactly once; its result is irrelevant here
        poll_fn(|cx| {
            let _ = transactions.poll_unpin(cx);
            Poll::Ready(())
        })
        .await;
        // the broadcast must not have been imported
        assert!(pool.is_empty());
        handle.terminate().await;
    }
2211
2212    #[tokio::test(flavor = "multi_thread")]
2213    async fn test_tx_broadcasts_through_two_syncs() {
2214        reth_tracing::init_test_tracing();
2215        let net = Testnet::create(3).await;
2216
2217        let mut handles = net.handles();
2218        let handle0 = handles.next().unwrap();
2219        let handle1 = handles.next().unwrap();
2220
2221        drop(handles);
2222        let handle = net.spawn();
2223
2224        let listener0 = handle0.event_listener();
2225        handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
2226        let secret_key = SecretKey::new(&mut rand_08::thread_rng());
2227
2228        let client = NoopProvider::default();
2229        let pool = testing_pool();
2230        let config = NetworkConfigBuilder::new(secret_key)
2231            .disable_discovery()
2232            .listener_port(0)
2233            .build(client);
2234        let transactions_manager_config = config.transactions_manager_config.clone();
2235        let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
2236            .await
2237            .unwrap()
2238            .into_builder()
2239            .transactions(pool.clone(), transactions_manager_config)
2240            .split_with_handle();
2241
2242        tokio::task::spawn(network);
2243
2244        // go to syncing (pipeline sync) to idle and then to syncing (live)
2245        network_handle.update_sync_state(SyncState::Syncing);
2246        assert!(NetworkInfo::is_syncing(&network_handle));
2247        network_handle.update_sync_state(SyncState::Idle);
2248        assert!(!NetworkInfo::is_syncing(&network_handle));
2249        network_handle.update_sync_state(SyncState::Syncing);
2250        assert!(NetworkInfo::is_syncing(&network_handle));
2251
2252        // wait for all initiator connections
2253        let mut established = listener0.take(2);
2254        while let Some(ev) = established.next().await {
2255            match ev {
2256                NetworkEvent::ActivePeerSession { .. } |
2257                NetworkEvent::Peer(PeerEvent::SessionEstablished(_)) => {
2258                    // to insert a new peer in transactions peerset
2259                    transactions.on_network_event(ev);
2260                }
2261                NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => {}
2262                _ => {
2263                    error!("unexpected event {ev:?}")
2264                }
2265            }
2266        }
2267        // random tx: <https://etherscan.io/getRawTx?tx=0x9448608d36e721ef403c53b00546068a6474d6cbab6816c3926de449898e7bce>
2268        let input = hex!(
2269            "02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76"
2270        );
2271        let signed_tx = TransactionSigned::decode(&mut &input[..]).unwrap();
2272        transactions.on_network_tx_event(NetworkTransactionEvent::IncomingTransactions {
2273            peer_id: *handle1.peer_id(),
2274            msg: Transactions(vec![signed_tx.clone()]),
2275        });
2276        poll_fn(|cx| {
2277            let _ = transactions.poll_unpin(cx);
2278            Poll::Ready(())
2279        })
2280        .await;
2281        assert!(!NetworkInfo::is_initially_syncing(&network_handle));
2282        assert!(NetworkInfo::is_syncing(&network_handle));
2283        assert!(!pool.is_empty());
2284        handle.terminate().await;
2285    }
2286
2287    // Ensure that the transaction manager correctly handles the `IncomingPooledTransactionHashes`
2288    // event and is able to retrieve the corresponding transactions.
2289    #[tokio::test(flavor = "multi_thread")]
2290    async fn test_handle_incoming_transactions_hashes() {
2291        reth_tracing::init_test_tracing();
2292
2293        let secret_key = SecretKey::new(&mut rand_08::thread_rng());
2294        let client = NoopProvider::default();
2295
2296        let config = NetworkConfigBuilder::new(secret_key)
2297            // let OS choose port
2298            .listener_port(0)
2299            .disable_discovery()
2300            .build(client);
2301
2302        let pool = testing_pool();
2303
2304        let transactions_manager_config = config.transactions_manager_config.clone();
2305        let (_network_handle, _network, mut tx_manager, _) = NetworkManager::new(config)
2306            .await
2307            .unwrap()
2308            .into_builder()
2309            .transactions(pool.clone(), transactions_manager_config)
2310            .split_with_handle();
2311
2312        let peer_id_1 = PeerId::new([1; 64]);
2313        let eth_version = EthVersion::Eth66;
2314
2315        let txs = vec![TransactionSigned::new_unhashed(
2316            Transaction::Legacy(TxLegacy {
2317                chain_id: Some(4),
2318                nonce: 15u64,
2319                gas_price: 2200000000,
2320                gas_limit: 34811,
2321                to: TxKind::Call(hex!("cf7f9e66af820a19257a2108375b180b0ec49167").into()),
2322                value: U256::from(1234u64),
2323                input: Default::default(),
2324            }),
2325            Signature::new(
2326                U256::from_str(
2327                    "0x35b7bfeb9ad9ece2cbafaaf8e202e706b4cfaeb233f46198f00b44d4a566a981",
2328                )
2329                .unwrap(),
2330                U256::from_str(
2331                    "0x612638fb29427ca33b9a3be2a0a561beecfe0269655be160d35e72d366a6a860",
2332                )
2333                .unwrap(),
2334                true,
2335            ),
2336        )];
2337
2338        let txs_hashes: Vec<B256> = txs.iter().map(|tx| *tx.hash()).collect();
2339
2340        let (peer_1, mut to_mock_session_rx) = new_mock_session(peer_id_1, eth_version);
2341        tx_manager.peers.insert(peer_id_1, peer_1);
2342
2343        assert!(pool.is_empty());
2344
2345        tx_manager.on_network_tx_event(NetworkTransactionEvent::IncomingPooledTransactionHashes {
2346            peer_id: peer_id_1,
2347            msg: NewPooledTransactionHashes::from(NewPooledTransactionHashes66::from(
2348                txs_hashes.clone(),
2349            )),
2350        });
2351
2352        // mock session of peer_1 receives request
2353        let req = to_mock_session_rx
2354            .recv()
2355            .await
2356            .expect("peer_1 session should receive request with buffered hashes");
2357        let PeerRequest::GetPooledTransactions { request, response } = req else { unreachable!() };
2358        assert_eq!(request, GetPooledTransactions::from(txs_hashes.clone()));
2359
2360        let message: Vec<PooledTransactionVariant> = txs
2361            .into_iter()
2362            .map(|tx| {
2363                PooledTransactionVariant::try_from(tx)
2364                    .expect("Failed to convert MockTransaction to PooledTransaction")
2365            })
2366            .collect();
2367
2368        // return the transactions corresponding to the transaction hashes.
2369        response
2370            .send(Ok(PooledTransactions(message)))
2371            .expect("should send peer_1 response to tx manager");
2372
2373        // adance the transaction manager future
2374        poll_fn(|cx| {
2375            let _ = tx_manager.poll_unpin(cx);
2376            Poll::Ready(())
2377        })
2378        .await;
2379
2380        // ensure that the transactions corresponding to the transaction hashes have been
2381        // successfully retrieved and stored in the Pool.
2382        assert_eq!(pool.get_all(txs_hashes.clone()).len(), txs_hashes.len());
2383    }
2384
2385    #[tokio::test(flavor = "multi_thread")]
2386    async fn test_handle_incoming_transactions() {
2387        reth_tracing::init_test_tracing();
2388        let net = Testnet::create(3).await;
2389
2390        let mut handles = net.handles();
2391        let handle0 = handles.next().unwrap();
2392        let handle1 = handles.next().unwrap();
2393
2394        drop(handles);
2395        let handle = net.spawn();
2396
2397        let listener0 = handle0.event_listener();
2398
2399        handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
2400        let secret_key = SecretKey::new(&mut rand_08::thread_rng());
2401
2402        let client = NoopProvider::default();
2403        let pool = testing_pool();
2404        let config = NetworkConfigBuilder::new(secret_key)
2405            .disable_discovery()
2406            .listener_port(0)
2407            .build(client);
2408        let transactions_manager_config = config.transactions_manager_config.clone();
2409        let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
2410            .await
2411            .unwrap()
2412            .into_builder()
2413            .transactions(pool.clone(), transactions_manager_config)
2414            .split_with_handle();
2415        tokio::task::spawn(network);
2416
2417        network_handle.update_sync_state(SyncState::Idle);
2418
2419        assert!(!NetworkInfo::is_syncing(&network_handle));
2420
2421        // wait for all initiator connections
2422        let mut established = listener0.take(2);
2423        while let Some(ev) = established.next().await {
2424            match ev {
2425                NetworkEvent::ActivePeerSession { .. } |
2426                NetworkEvent::Peer(PeerEvent::SessionEstablished(_)) => {
2427                    // to insert a new peer in transactions peerset
2428                    transactions.on_network_event(ev);
2429                }
2430                NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => {}
2431                ev => {
2432                    error!("unexpected event {ev:?}")
2433                }
2434            }
2435        }
2436        // random tx: <https://etherscan.io/getRawTx?tx=0x9448608d36e721ef403c53b00546068a6474d6cbab6816c3926de449898e7bce>
2437        let input = hex!(
2438            "02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76"
2439        );
2440        let signed_tx = TransactionSigned::decode(&mut &input[..]).unwrap();
2441        transactions.on_network_tx_event(NetworkTransactionEvent::IncomingTransactions {
2442            peer_id: *handle1.peer_id(),
2443            msg: Transactions(vec![signed_tx.clone()]),
2444        });
2445        assert!(transactions
2446            .transactions_by_peers
2447            .get(signed_tx.tx_hash())
2448            .unwrap()
2449            .contains(handle1.peer_id()));
2450
2451        // advance the transaction manager future
2452        poll_fn(|cx| {
2453            let _ = transactions.poll_unpin(cx);
2454            Poll::Ready(())
2455        })
2456        .await;
2457
2458        assert!(!pool.is_empty());
2459        assert!(pool.get(signed_tx.tx_hash()).is_some());
2460        handle.terminate().await;
2461    }
2462
2463    #[tokio::test(flavor = "multi_thread")]
2464    async fn test_on_get_pooled_transactions_network() {
2465        reth_tracing::init_test_tracing();
2466        let net = Testnet::create(2).await;
2467
2468        let mut handles = net.handles();
2469        let handle0 = handles.next().unwrap();
2470        let handle1 = handles.next().unwrap();
2471
2472        drop(handles);
2473        let handle = net.spawn();
2474
2475        let listener0 = handle0.event_listener();
2476
2477        handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
2478        let secret_key = SecretKey::new(&mut rand_08::thread_rng());
2479
2480        let client = NoopProvider::default();
2481        let pool = testing_pool();
2482        let config = NetworkConfigBuilder::new(secret_key)
2483            .disable_discovery()
2484            .listener_port(0)
2485            .build(client);
2486        let transactions_manager_config = config.transactions_manager_config.clone();
2487        let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
2488            .await
2489            .unwrap()
2490            .into_builder()
2491            .transactions(pool.clone(), transactions_manager_config)
2492            .split_with_handle();
2493        tokio::task::spawn(network);
2494
2495        network_handle.update_sync_state(SyncState::Idle);
2496
2497        assert!(!NetworkInfo::is_syncing(&network_handle));
2498
2499        // wait for all initiator connections
2500        let mut established = listener0.take(2);
2501        while let Some(ev) = established.next().await {
2502            match ev {
2503                NetworkEvent::ActivePeerSession { .. } |
2504                NetworkEvent::Peer(PeerEvent::SessionEstablished(_)) => {
2505                    transactions.on_network_event(ev);
2506                }
2507                NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => {}
2508                ev => {
2509                    error!("unexpected event {ev:?}")
2510                }
2511            }
2512        }
2513        handle.terminate().await;
2514
2515        let tx = MockTransaction::eip1559();
2516        let _ = transactions
2517            .pool
2518            .add_transaction(reth_transaction_pool::TransactionOrigin::External, tx.clone())
2519            .await;
2520
2521        let request = GetPooledTransactions(vec![*tx.get_hash()]);
2522
2523        let (send, receive) =
2524            oneshot::channel::<RequestResult<PooledTransactions<PooledTransactionVariant>>>();
2525
2526        transactions.on_network_tx_event(NetworkTransactionEvent::GetPooledTransactions {
2527            peer_id: *handle1.peer_id(),
2528            request,
2529            response: send,
2530        });
2531
2532        match receive.await.unwrap() {
2533            Ok(PooledTransactions(transactions)) => {
2534                assert_eq!(transactions.len(), 1);
2535            }
2536            Err(e) => {
2537                panic!("error: {e:?}");
2538            }
2539        }
2540    }
2541
2542    // Ensure that when the remote peer only returns part of the requested transactions, the
2543    // replied transactions are removed from the `tx_fetcher`, while the unresponsive ones are
2544    // re-buffered.
2545    #[tokio::test]
2546    async fn test_partially_tx_response() {
2547        reth_tracing::init_test_tracing();
2548
2549        let mut tx_manager = new_tx_manager().await.0;
2550        let tx_fetcher = &mut tx_manager.transaction_fetcher;
2551
2552        let peer_id_1 = PeerId::new([1; 64]);
2553        let eth_version = EthVersion::Eth66;
2554
2555        let txs = vec![
2556            TransactionSigned::new_unhashed(
2557                Transaction::Legacy(TxLegacy {
2558                    chain_id: Some(4),
2559                    nonce: 15u64,
2560                    gas_price: 2200000000,
2561                    gas_limit: 34811,
2562                    to: TxKind::Call(hex!("cf7f9e66af820a19257a2108375b180b0ec49167").into()),
2563                    value: U256::from(1234u64),
2564                    input: Default::default(),
2565                }),
2566                Signature::new(
2567                    U256::from_str(
2568                        "0x35b7bfeb9ad9ece2cbafaaf8e202e706b4cfaeb233f46198f00b44d4a566a981",
2569                    )
2570                    .unwrap(),
2571                    U256::from_str(
2572                        "0x612638fb29427ca33b9a3be2a0a561beecfe0269655be160d35e72d366a6a860",
2573                    )
2574                    .unwrap(),
2575                    true,
2576                ),
2577            ),
2578            TransactionSigned::new_unhashed(
2579                Transaction::Eip1559(TxEip1559 {
2580                    chain_id: 4,
2581                    nonce: 26u64,
2582                    max_priority_fee_per_gas: 1500000000,
2583                    max_fee_per_gas: 1500000013,
2584                    gas_limit: MIN_TRANSACTION_GAS,
2585                    to: TxKind::Call(hex!("61815774383099e24810ab832a5b2a5425c154d5").into()),
2586                    value: U256::from(3000000000000000000u64),
2587                    input: Default::default(),
2588                    access_list: Default::default(),
2589                }),
2590                Signature::new(
2591                    U256::from_str(
2592                        "0x59e6b67f48fb32e7e570dfb11e042b5ad2e55e3ce3ce9cd989c7e06e07feeafd",
2593                    )
2594                    .unwrap(),
2595                    U256::from_str(
2596                        "0x016b83f4f980694ed2eee4d10667242b1f40dc406901b34125b008d334d47469",
2597                    )
2598                    .unwrap(),
2599                    true,
2600                ),
2601            ),
2602        ];
2603
2604        let txs_hashes: Vec<B256> = txs.iter().map(|tx| *tx.hash()).collect();
2605
2606        let (mut peer_1, mut to_mock_session_rx) = new_mock_session(peer_id_1, eth_version);
2607        // mark hashes as seen by peer so it can fish them out from the cache for hashes pending
2608        // fetch
2609        peer_1.seen_transactions.insert(txs_hashes[0]);
2610        peer_1.seen_transactions.insert(txs_hashes[1]);
2611        tx_manager.peers.insert(peer_id_1, peer_1);
2612
2613        buffer_hash_to_tx_fetcher(tx_fetcher, txs_hashes[0], peer_id_1, 0, None);
2614        buffer_hash_to_tx_fetcher(tx_fetcher, txs_hashes[1], peer_id_1, 0, None);
2615
2616        // peer_1 is idle
2617        assert!(tx_fetcher.is_idle(&peer_id_1));
2618        assert_eq!(tx_fetcher.active_peers.len(), 0);
2619
2620        // sends requests for buffered hashes to peer_1
2621        tx_fetcher.on_fetch_pending_hashes(&tx_manager.peers, |_| true);
2622
2623        assert_eq!(tx_fetcher.num_pending_hashes(), 0);
2624        // as long as request is in flight peer_1 is not idle
2625        assert!(!tx_fetcher.is_idle(&peer_id_1));
2626        assert_eq!(tx_fetcher.active_peers.len(), 1);
2627
2628        // mock session of peer_1 receives request
2629        let req = to_mock_session_rx
2630            .recv()
2631            .await
2632            .expect("peer_1 session should receive request with buffered hashes");
2633        let PeerRequest::GetPooledTransactions { response, .. } = req else { unreachable!() };
2634
2635        let message: Vec<PooledTransactionVariant> = txs
2636            .into_iter()
2637            .take(1)
2638            .map(|tx| {
2639                PooledTransactionVariant::try_from(tx)
2640                    .expect("Failed to convert MockTransaction to PooledTransaction")
2641            })
2642            .collect();
2643        // response partial request
2644        response
2645            .send(Ok(PooledTransactions(message)))
2646            .expect("should send peer_1 response to tx manager");
2647        let Some(FetchEvent::TransactionsFetched { peer_id, .. }) = tx_fetcher.next().await else {
2648            unreachable!()
2649        };
2650
2651        // request has resolved, peer_1 is idle again
2652        assert!(tx_fetcher.is_idle(&peer_id));
2653        assert_eq!(tx_fetcher.active_peers.len(), 0);
2654        // failing peer_1's request buffers requested hashes for retry.
2655        assert_eq!(tx_fetcher.num_pending_hashes(), 1);
2656    }
2657
2658    #[tokio::test]
2659    async fn test_max_retries_tx_request() {
2660        reth_tracing::init_test_tracing();
2661
2662        let mut tx_manager = new_tx_manager().await.0;
2663        let tx_fetcher = &mut tx_manager.transaction_fetcher;
2664
2665        let peer_id_1 = PeerId::new([1; 64]);
2666        let peer_id_2 = PeerId::new([2; 64]);
2667        let eth_version = EthVersion::Eth66;
2668        let seen_hashes = [B256::from_slice(&[1; 32]), B256::from_slice(&[2; 32])];
2669
2670        let (mut peer_1, mut to_mock_session_rx) = new_mock_session(peer_id_1, eth_version);
2671        // mark hashes as seen by peer so it can fish them out from the cache for hashes pending
2672        // fetch
2673        peer_1.seen_transactions.insert(seen_hashes[0]);
2674        peer_1.seen_transactions.insert(seen_hashes[1]);
2675        tx_manager.peers.insert(peer_id_1, peer_1);
2676
2677        // hashes are seen and currently not inflight, with one fallback peer, and are buffered
2678        // for first retry in reverse order to make index 0 lru
2679        let retries = 1;
2680        buffer_hash_to_tx_fetcher(tx_fetcher, seen_hashes[1], peer_id_1, retries, None);
2681        buffer_hash_to_tx_fetcher(tx_fetcher, seen_hashes[0], peer_id_1, retries, None);
2682
2683        // peer_1 is idle
2684        assert!(tx_fetcher.is_idle(&peer_id_1));
2685        assert_eq!(tx_fetcher.active_peers.len(), 0);
2686
2687        // sends request for buffered hashes to peer_1
2688        tx_fetcher.on_fetch_pending_hashes(&tx_manager.peers, |_| true);
2689
2690        let tx_fetcher = &mut tx_manager.transaction_fetcher;
2691
2692        assert_eq!(tx_fetcher.num_pending_hashes(), 0);
2693        // as long as request is in inflight peer_1 is not idle
2694        assert!(!tx_fetcher.is_idle(&peer_id_1));
2695        assert_eq!(tx_fetcher.active_peers.len(), 1);
2696
2697        // mock session of peer_1 receives request
2698        let req = to_mock_session_rx
2699            .recv()
2700            .await
2701            .expect("peer_1 session should receive request with buffered hashes");
2702        let PeerRequest::GetPooledTransactions { request, response } = req else { unreachable!() };
2703        let GetPooledTransactions(hashes) = request;
2704
2705        let hashes = hashes.into_iter().collect::<HashSet<_>>();
2706
2707        assert_eq!(hashes, seen_hashes.into_iter().collect::<HashSet<_>>());
2708
2709        // fail request to peer_1
2710        response
2711            .send(Err(RequestError::BadResponse))
2712            .expect("should send peer_1 response to tx manager");
2713        let Some(FetchEvent::FetchError { peer_id, .. }) = tx_fetcher.next().await else {
2714            unreachable!()
2715        };
2716
2717        // request has resolved, peer_1 is idle again
2718        assert!(tx_fetcher.is_idle(&peer_id));
2719        assert_eq!(tx_fetcher.active_peers.len(), 0);
2720        // failing peer_1's request buffers requested hashes for retry
2721        assert_eq!(tx_fetcher.num_pending_hashes(), 2);
2722
2723        let (peer_2, mut to_mock_session_rx) = new_mock_session(peer_id_2, eth_version);
2724        tx_manager.peers.insert(peer_id_2, peer_2);
2725
2726        // peer_2 announces same hashes as peer_1
2727        let msg =
2728            NewPooledTransactionHashes::Eth66(NewPooledTransactionHashes66(seen_hashes.to_vec()));
2729        tx_manager.on_new_pooled_transaction_hashes(peer_id_2, msg);
2730
2731        let tx_fetcher = &mut tx_manager.transaction_fetcher;
2732
2733        // peer_2 should be in active_peers.
2734        assert_eq!(tx_fetcher.active_peers.len(), 1);
2735
2736        // since hashes are already seen, no changes to length of unknown hashes
2737        assert_eq!(tx_fetcher.num_all_hashes(), 2);
2738        // but hashes are taken out of buffer and packed into request to peer_2
2739        assert_eq!(tx_fetcher.num_pending_hashes(), 0);
2740
2741        // mock session of peer_2 receives request
2742        let req = to_mock_session_rx
2743            .recv()
2744            .await
2745            .expect("peer_2 session should receive request with buffered hashes");
2746        let PeerRequest::GetPooledTransactions { response, .. } = req else { unreachable!() };
2747
2748        // report failed request to tx manager
2749        response
2750            .send(Err(RequestError::BadResponse))
2751            .expect("should send peer_2 response to tx manager");
2752        let Some(FetchEvent::FetchError { .. }) = tx_fetcher.next().await else { unreachable!() };
2753
2754        // `MAX_REQUEST_RETRIES_PER_TX_HASH`, 2, for hashes reached so this time won't be buffered
2755        // for retry
2756        assert_eq!(tx_fetcher.num_pending_hashes(), 0);
2757        assert_eq!(tx_fetcher.active_peers.len(), 0);
2758    }
2759
2760    #[test]
2761    fn test_transaction_builder_empty() {
2762        let mut builder =
2763            PropagateTransactionsBuilder::<TransactionSigned>::pooled(EthVersion::Eth68);
2764        assert!(builder.is_empty());
2765
2766        let mut factory = MockTransactionFactory::default();
2767        let tx = PropagateTransaction::pool_tx(Arc::new(factory.create_eip1559()));
2768        builder.push(&tx);
2769        assert!(!builder.is_empty());
2770
2771        let txs = builder.build();
2772        assert!(txs.full.is_none());
2773        let txs = txs.pooled.unwrap();
2774        assert_eq!(txs.len(), 1);
2775    }
2776
2777    #[test]
2778    fn test_transaction_builder_large() {
2779        let mut builder =
2780            PropagateTransactionsBuilder::<TransactionSigned>::full(EthVersion::Eth68);
2781        assert!(builder.is_empty());
2782
2783        let mut factory = MockTransactionFactory::default();
2784        let mut tx = factory.create_eip1559();
2785        // create a transaction that still fits
2786        tx.transaction.set_size(DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE + 1);
2787        let tx = Arc::new(tx);
2788        let tx = PropagateTransaction::pool_tx(tx);
2789        builder.push(&tx);
2790        assert!(!builder.is_empty());
2791
2792        let txs = builder.clone().build();
2793        assert!(txs.pooled.is_none());
2794        let txs = txs.full.unwrap();
2795        assert_eq!(txs.len(), 1);
2796
2797        builder.push(&tx);
2798
2799        let txs = builder.clone().build();
2800        let pooled = txs.pooled.unwrap();
2801        assert_eq!(pooled.len(), 1);
2802        let txs = txs.full.unwrap();
2803        assert_eq!(txs.len(), 1);
2804    }
2805
2806    #[test]
2807    fn test_transaction_builder_eip4844() {
2808        let mut builder =
2809            PropagateTransactionsBuilder::<TransactionSigned>::full(EthVersion::Eth68);
2810        assert!(builder.is_empty());
2811
2812        let mut factory = MockTransactionFactory::default();
2813        let tx = PropagateTransaction::pool_tx(Arc::new(factory.create_eip4844()));
2814        builder.push(&tx);
2815        assert!(!builder.is_empty());
2816
2817        let txs = builder.clone().build();
2818        assert!(txs.full.is_none());
2819        let txs = txs.pooled.unwrap();
2820        assert_eq!(txs.len(), 1);
2821
2822        let tx = PropagateTransaction::pool_tx(Arc::new(factory.create_eip1559()));
2823        builder.push(&tx);
2824
2825        let txs = builder.clone().build();
2826        let pooled = txs.pooled.unwrap();
2827        assert_eq!(pooled.len(), 1);
2828        let txs = txs.full.unwrap();
2829        assert_eq!(txs.len(), 1);
2830    }
2831
2832    #[tokio::test]
2833    async fn test_propagate_full() {
2834        reth_tracing::init_test_tracing();
2835
2836        let (mut tx_manager, network) = new_tx_manager().await;
2837        let peer_id = PeerId::random();
2838
2839        // ensure not syncing
2840        network.handle().update_sync_state(SyncState::Idle);
2841
2842        // mock a peer
2843        let (tx, _rx) = mpsc::channel::<PeerRequest>(1);
2844
2845        let session_info = SessionInfo {
2846            peer_id,
2847            remote_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0),
2848            client_version: Arc::from(""),
2849            capabilities: Arc::new(vec![].into()),
2850            status: Arc::new(Default::default()),
2851            version: EthVersion::Eth68,
2852            peer_kind: PeerKind::Basic,
2853        };
2854        let messages: PeerRequestSender<PeerRequest> = PeerRequestSender::new(peer_id, tx);
2855        tx_manager
2856            .on_network_event(NetworkEvent::ActivePeerSession { info: session_info, messages });
2857        let mut propagate = vec![];
2858        let mut factory = MockTransactionFactory::default();
2859        let eip1559_tx = Arc::new(factory.create_eip1559());
2860        propagate.push(PropagateTransaction::pool_tx(eip1559_tx.clone()));
2861        let eip4844_tx = Arc::new(factory.create_eip4844());
2862        propagate.push(PropagateTransaction::pool_tx(eip4844_tx.clone()));
2863
2864        let propagated =
2865            tx_manager.propagate_transactions(propagate.clone(), PropagationMode::Basic);
2866        assert_eq!(propagated.0.len(), 2);
2867        let prop_txs = propagated.0.get(eip1559_tx.transaction.hash()).unwrap();
2868        assert_eq!(prop_txs.len(), 1);
2869        assert!(prop_txs[0].is_full());
2870
2871        let prop_txs = propagated.0.get(eip4844_tx.transaction.hash()).unwrap();
2872        assert_eq!(prop_txs.len(), 1);
2873        assert!(prop_txs[0].is_hash());
2874
2875        let peer = tx_manager.peers.get(&peer_id).unwrap();
2876        assert!(peer.seen_transactions.contains(eip1559_tx.transaction.hash()));
2877        assert!(peer.seen_transactions.contains(eip1559_tx.transaction.hash()));
2878        peer.seen_transactions.contains(eip4844_tx.transaction.hash());
2879
2880        // propagate again
2881        let propagated = tx_manager.propagate_transactions(propagate, PropagationMode::Basic);
2882        assert!(propagated.0.is_empty());
2883    }
2884
2885    #[tokio::test]
2886    async fn test_relaxed_filter_ignores_unknown_tx_types() {
2887        reth_tracing::init_test_tracing();
2888
2889        let transactions_manager_config = TransactionsManagerConfig::default();
2890
2891        let propagation_policy = TransactionPropagationKind::default();
2892        let announcement_policy = RelaxedEthAnnouncementFilter::default();
2893
2894        let policy_bundle = NetworkPolicies::new(propagation_policy, announcement_policy);
2895
2896        let pool = testing_pool();
2897        let secret_key = SecretKey::new(&mut rand_08::thread_rng());
2898        let client = NoopProvider::default();
2899
2900        let network_config = NetworkConfigBuilder::new(secret_key)
2901            .listener_port(0)
2902            .disable_discovery()
2903            .build(client.clone());
2904
2905        let mut network_manager = NetworkManager::new(network_config).await.unwrap();
2906        let (to_tx_manager_tx, from_network_rx) =
2907            mpsc::unbounded_channel::<NetworkTransactionEvent<EthNetworkPrimitives>>();
2908        network_manager.set_transactions(to_tx_manager_tx);
2909        let network_handle = network_manager.handle().clone();
2910        let network_service_handle = tokio::spawn(network_manager);
2911
2912        let mut tx_manager = TransactionsManager::<TestPool, EthNetworkPrimitives>::with_policy(
2913            network_handle.clone(),
2914            pool.clone(),
2915            from_network_rx,
2916            transactions_manager_config,
2917            policy_bundle,
2918        );
2919
2920        let peer_id = PeerId::random();
2921        let eth_version = EthVersion::Eth68;
2922        let (mock_peer_metadata, mut mock_session_rx) = new_mock_session(peer_id, eth_version);
2923        tx_manager.peers.insert(peer_id, mock_peer_metadata);
2924
2925        let mut tx_factory = MockTransactionFactory::default();
2926
2927        let valid_known_tx = tx_factory.create_eip1559();
2928        let known_tx_signed: Arc<ValidPoolTransaction<MockTransaction>> = Arc::new(valid_known_tx);
2929
2930        let known_tx_hash = *known_tx_signed.hash();
2931        let known_tx_type_byte = known_tx_signed.transaction.tx_type();
2932        let known_tx_size = known_tx_signed.encoded_length();
2933
2934        let unknown_tx_hash = B256::random();
2935        let unknown_tx_type_byte = 0xff_u8;
2936        let unknown_tx_size = 150;
2937
2938        let announcement_msg = NewPooledTransactionHashes::Eth68(NewPooledTransactionHashes68 {
2939            types: vec![known_tx_type_byte, unknown_tx_type_byte],
2940            sizes: vec![known_tx_size, unknown_tx_size],
2941            hashes: vec![known_tx_hash, unknown_tx_hash],
2942        });
2943
2944        tx_manager.on_new_pooled_transaction_hashes(peer_id, announcement_msg);
2945
2946        poll_fn(|cx| {
2947            let _ = tx_manager.poll_unpin(cx);
2948            Poll::Ready(())
2949        })
2950        .await;
2951
2952        let mut requested_hashes_in_getpooled = HashSet::new();
2953        let mut unexpected_request_received = false;
2954
2955        match tokio::time::timeout(std::time::Duration::from_millis(200), mock_session_rx.recv())
2956            .await
2957        {
2958            Ok(Some(PeerRequest::GetPooledTransactions { request, response: tx_response_ch })) => {
2959                let GetPooledTransactions(hashes) = request;
2960                for hash in hashes {
2961                    requested_hashes_in_getpooled.insert(hash);
2962                }
2963                let _ = tx_response_ch.send(Ok(PooledTransactions(vec![])));
2964            }
2965            Ok(Some(other_request)) => {
2966                tracing::error!(?other_request, "Received unexpected PeerRequest type");
2967                unexpected_request_received = true;
2968            }
2969            Ok(None) => tracing::info!("Mock session channel closed or no request received."),
2970            Err(_timeout_err) => {
2971                tracing::info!("Timeout: No GetPooledTransactions request received.")
2972            }
2973        }
2974
2975        assert!(
2976            requested_hashes_in_getpooled.contains(&known_tx_hash),
2977            "Should have requested the known EIP-1559 transaction. Requested: {requested_hashes_in_getpooled:?}"
2978        );
2979        assert!(
2980            !requested_hashes_in_getpooled.contains(&unknown_tx_hash),
2981            "Should NOT have requested the unknown transaction type. Requested: {requested_hashes_in_getpooled:?}"
2982        );
2983        assert!(
2984            !unexpected_request_received,
2985            "An unexpected P2P request was received by the mock peer."
2986        );
2987
2988        network_service_handle.abort();
2989    }
2990}