reth_network/transactions/
mod.rs

1//! Transactions management for the p2p network.
2
3use alloy_consensus::transaction::TxHashRef;
4
5/// Aggregation on configurable parameters for [`TransactionsManager`].
6pub mod config;
7/// Default and spec'd bounds.
8pub mod constants;
9/// Component responsible for fetching transactions from [`NewPooledTransactionHashes`].
10pub mod fetcher;
11/// Defines the [`TransactionPolicies`] trait for aggregating transaction-related policies.
12pub mod policy;
13
14pub use self::constants::{
15    tx_fetcher::DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESP_ON_PACK_GET_POOLED_TRANSACTIONS_REQ,
16    SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE,
17};
18use config::{AnnouncementAcceptance, StrictEthAnnouncementFilter, TransactionPropagationKind};
19pub use config::{
20    AnnouncementFilteringPolicy, TransactionFetcherConfig, TransactionPropagationMode,
21    TransactionPropagationPolicy, TransactionsManagerConfig,
22};
23use policy::{NetworkPolicies, TransactionPolicies};
24
25pub(crate) use fetcher::{FetchEvent, TransactionFetcher};
26
27use self::constants::{tx_manager::*, DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE};
28use crate::{
29    budget::{
30        DEFAULT_BUDGET_TRY_DRAIN_NETWORK_TRANSACTION_EVENTS,
31        DEFAULT_BUDGET_TRY_DRAIN_PENDING_POOL_IMPORTS, DEFAULT_BUDGET_TRY_DRAIN_POOL_IMPORTS,
32        DEFAULT_BUDGET_TRY_DRAIN_STREAM,
33    },
34    cache::LruCache,
35    duration_metered_exec, metered_poll_nested_stream_with_budget,
36    metrics::{
37        AnnouncedTxTypesMetrics, TransactionsManagerMetrics, NETWORK_POOL_TRANSACTIONS_SCOPE,
38    },
39    NetworkHandle, TxTypesCounter,
40};
41use alloy_primitives::{TxHash, B256};
42use constants::SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE;
43use futures::{stream::FuturesUnordered, Future, StreamExt};
44use reth_eth_wire::{
45    DedupPayload, EthNetworkPrimitives, EthVersion, GetPooledTransactions, HandleMempoolData,
46    HandleVersionedMempoolData, NetworkPrimitives, NewPooledTransactionHashes,
47    NewPooledTransactionHashes66, NewPooledTransactionHashes68, PooledTransactions,
48    RequestTxHashes, Transactions, ValidAnnouncementData,
49};
50use reth_ethereum_primitives::{TransactionSigned, TxType};
51use reth_metrics::common::mpsc::UnboundedMeteredReceiver;
52use reth_network_api::{
53    events::{PeerEvent, SessionInfo},
54    NetworkEvent, NetworkEventListenerProvider, PeerKind, PeerRequest, PeerRequestSender, Peers,
55};
56use reth_network_p2p::{
57    error::{RequestError, RequestResult},
58    sync::SyncStateProvider,
59};
60use reth_network_peers::PeerId;
61use reth_network_types::ReputationChangeKind;
62use reth_primitives_traits::SignedTransaction;
63use reth_tokio_util::EventStream;
64use reth_transaction_pool::{
65    error::{PoolError, PoolResult},
66    AddedTransactionOutcome, GetPooledTransactionLimit, PoolTransaction, PropagateKind,
67    PropagatedTransactions, TransactionPool, ValidPoolTransaction,
68};
69use std::{
70    collections::{hash_map::Entry, HashMap, HashSet},
71    pin::Pin,
72    sync::{
73        atomic::{AtomicUsize, Ordering},
74        Arc,
75    },
76    task::{Context, Poll},
77    time::{Duration, Instant},
78};
79use tokio::sync::{mpsc, oneshot, oneshot::error::RecvError};
80use tokio_stream::wrappers::{ReceiverStream, UnboundedReceiverStream};
81use tracing::{debug, trace};
82
83/// The future for importing transactions into the pool.
84///
85/// Resolves with the result of each transaction import.
86pub type PoolImportFuture =
87    Pin<Box<dyn Future<Output = Vec<PoolResult<AddedTransactionOutcome>>> + Send + 'static>>;
88
89/// Api to interact with [`TransactionsManager`] task.
90///
91/// This can be obtained via [`TransactionsManager::handle`] and can be used to manually interact
92/// with the [`TransactionsManager`] task once it is spawned.
93///
94/// For example [`TransactionsHandle::get_peer_transaction_hashes`] returns the transaction hashes
95/// known by a specific peer.
96#[derive(Debug, Clone)]
97pub struct TransactionsHandle<N: NetworkPrimitives = EthNetworkPrimitives> {
98    /// Command channel to the [`TransactionsManager`]
99    manager_tx: mpsc::UnboundedSender<TransactionsCommand<N>>,
100}
101
102/// Implementation of the `TransactionsHandle` API for use in testnet via type
103/// [`PeerHandle`](crate::test_utils::PeerHandle).
104impl<N: NetworkPrimitives> TransactionsHandle<N> {
105    fn send(&self, cmd: TransactionsCommand<N>) {
106        let _ = self.manager_tx.send(cmd);
107    }
108
109    /// Fetch the [`PeerRequestSender`] for the given peer.
110    async fn peer_handle(
111        &self,
112        peer_id: PeerId,
113    ) -> Result<Option<PeerRequestSender<PeerRequest<N>>>, RecvError> {
114        let (tx, rx) = oneshot::channel();
115        self.send(TransactionsCommand::GetPeerSender { peer_id, peer_request_sender: tx });
116        rx.await
117    }
118
119    /// Manually propagate the transaction that belongs to the hash.
120    pub fn propagate(&self, hash: TxHash) {
121        self.send(TransactionsCommand::PropagateHash(hash))
122    }
123
124    /// Manually propagate the transaction hash to a specific peer.
125    ///
126    /// Note: this only propagates if the pool contains the transaction.
127    pub fn propagate_hash_to(&self, hash: TxHash, peer: PeerId) {
128        self.propagate_hashes_to(Some(hash), peer)
129    }
130
131    /// Manually propagate the transaction hashes to a specific peer.
132    ///
133    /// Note: this only propagates the transactions that are known to the pool.
134    pub fn propagate_hashes_to(&self, hash: impl IntoIterator<Item = TxHash>, peer: PeerId) {
135        let hashes = hash.into_iter().collect::<Vec<_>>();
136        if hashes.is_empty() {
137            return
138        }
139        self.send(TransactionsCommand::PropagateHashesTo(hashes, peer))
140    }
141
142    /// Request the active peer IDs from the [`TransactionsManager`].
143    pub async fn get_active_peers(&self) -> Result<HashSet<PeerId>, RecvError> {
144        let (tx, rx) = oneshot::channel();
145        self.send(TransactionsCommand::GetActivePeers(tx));
146        rx.await
147    }
148
149    /// Manually propagate full transaction hashes to a specific peer.
150    ///
151    /// Do nothing if transactions are empty.
152    pub fn propagate_transactions_to(&self, transactions: Vec<TxHash>, peer: PeerId) {
153        if transactions.is_empty() {
154            return
155        }
156        self.send(TransactionsCommand::PropagateTransactionsTo(transactions, peer))
157    }
158
159    /// Manually propagate the given transaction hashes to all peers.
160    ///
161    /// It's up to the [`TransactionsManager`] whether the transactions are sent as hashes or in
162    /// full.
163    pub fn propagate_transactions(&self, transactions: Vec<TxHash>) {
164        if transactions.is_empty() {
165            return
166        }
167        self.send(TransactionsCommand::PropagateTransactions(transactions))
168    }
169
170    /// Manually propagate the given transactions to all peers.
171    ///
172    /// It's up to the [`TransactionsManager`] whether the transactions are sent as hashes or in
173    /// full.
174    pub fn broadcast_transactions(
175        &self,
176        transactions: impl IntoIterator<Item = N::BroadcastedTransaction>,
177    ) {
178        let transactions =
179            transactions.into_iter().map(PropagateTransaction::new).collect::<Vec<_>>();
180        if transactions.is_empty() {
181            return
182        }
183        self.send(TransactionsCommand::BroadcastTransactions(transactions))
184    }
185
186    /// Request the transaction hashes known by specific peers.
187    pub async fn get_transaction_hashes(
188        &self,
189        peers: Vec<PeerId>,
190    ) -> Result<HashMap<PeerId, HashSet<TxHash>>, RecvError> {
191        if peers.is_empty() {
192            return Ok(Default::default())
193        }
194        let (tx, rx) = oneshot::channel();
195        self.send(TransactionsCommand::GetTransactionHashes { peers, tx });
196        rx.await
197    }
198
199    /// Request the transaction hashes known by a specific peer.
200    pub async fn get_peer_transaction_hashes(
201        &self,
202        peer: PeerId,
203    ) -> Result<HashSet<TxHash>, RecvError> {
204        let res = self.get_transaction_hashes(vec![peer]).await?;
205        Ok(res.into_values().next().unwrap_or_default())
206    }
207
208    /// Requests the transactions directly from the given peer.
209    ///
210    /// Returns `None` if the peer is not connected.
211    ///
212    /// **Note**: this returns the response from the peer as received.
213    pub async fn get_pooled_transactions_from(
214        &self,
215        peer_id: PeerId,
216        hashes: Vec<B256>,
217    ) -> Result<Option<Vec<N::PooledTransaction>>, RequestError> {
218        let Some(peer) = self.peer_handle(peer_id).await? else { return Ok(None) };
219
220        let (tx, rx) = oneshot::channel();
221        let request = PeerRequest::GetPooledTransactions { request: hashes.into(), response: tx };
222        peer.try_send(request).ok();
223
224        rx.await?.map(|res| Some(res.0))
225    }
226}
227
228/// Manages transactions on top of the p2p network.
229///
230/// This can be spawned to another task and is supposed to be run as background service.
231/// [`TransactionsHandle`] can be used as frontend to programmatically send commands to it and
232/// interact with it.
233///
234/// The [`TransactionsManager`] is responsible for:
235///    - handling incoming eth messages for transactions.
236///    - serving transaction requests.
237///    - propagate transactions
238///
239/// This type communicates with the [`NetworkManager`](crate::NetworkManager) in both directions.
240///   - receives incoming network messages.
241///   - sends messages to dispatch (responses, propagate tx)
242///
243/// It is directly connected to the [`TransactionPool`] to retrieve requested transactions and
244/// propagate new transactions over the network.
245///
246/// It can be configured with different policies for transaction propagation and announcement
247/// filtering. See [`NetworkPolicies`] and [`TransactionPolicies`] for more details.
248///
249/// ## Network Transaction Processing
250///
251/// ### Message Types
252///
253/// - **`Transactions`**: Full transaction broadcasts (rejects blob transactions)
254/// - **`NewPooledTransactionHashes`**: Hash announcements
255///
256/// ### Peer Tracking
257///
258/// - Maintains per-peer transaction cache (default: 10,240 entries)
259/// - Prevents duplicate imports and enables efficient propagation
260///
261/// ### Bad Transaction Handling
262///
263/// Caches and rejects transactions with consensus violations (gas, signature, chain ID).
264/// Penalizes peers sending invalid transactions.
265///
266/// ### Import Management
267///
268/// Limits concurrent pool imports and backs off when approaching capacity.
269///
270/// ### Transaction Fetching
271///
272/// For announced transactions: filters known → queues unknown → fetches → imports
273///
274/// ### Propagation Rules
275///
276/// Based on: origin (Local/External/Private), peer capabilities, and network state.
277/// Disabled during initial sync.
278///
279/// ### Security
280///
281/// Rate limiting via reputation, bad transaction isolation, peer scoring.
282#[derive(Debug)]
283#[must_use = "Manager does nothing unless polled."]
284pub struct TransactionsManager<
285    Pool,
286    N: NetworkPrimitives = EthNetworkPrimitives,
287    PBundle: TransactionPolicies = NetworkPolicies<
288        TransactionPropagationKind,
289        StrictEthAnnouncementFilter,
290    >,
291> {
292    /// Access to the transaction pool.
293    pool: Pool,
294    /// Network access.
295    network: NetworkHandle<N>,
296    /// Subscriptions to all network related events.
297    ///
298    /// From which we get all new incoming transaction related messages.
299    network_events: EventStream<NetworkEvent<PeerRequest<N>>>,
300    /// Transaction fetcher to handle inflight and missing transaction requests.
301    transaction_fetcher: TransactionFetcher<N>,
302    /// All currently pending transactions grouped by peers.
303    ///
304    /// This way we can track incoming transactions and prevent multiple pool imports for the same
305    /// transaction
306    transactions_by_peers: HashMap<TxHash, HashSet<PeerId>>,
307    /// Transactions that are currently imported into the `Pool`.
308    ///
309    /// The import process includes:
310    ///  - validation of the transactions, e.g. transaction is well formed: valid tx type, fees are
311    ///    valid, or for 4844 transaction the blobs are valid. See also
312    ///    [`EthTransactionValidator`](reth_transaction_pool::validate::EthTransactionValidator)
313    /// - if the transaction is valid, it is added into the pool.
314    ///
315    /// Once the new transaction reaches the __pending__ state it will be emitted by the pool via
316    /// [`TransactionPool::pending_transactions_listener`] and arrive at the `pending_transactions`
317    /// receiver.
318    pool_imports: FuturesUnordered<PoolImportFuture>,
319    /// Stats on pending pool imports that help the node self-monitor.
320    pending_pool_imports_info: PendingPoolImportsInfo,
321    /// Bad imports.
322    bad_imports: LruCache<TxHash>,
323    /// All the connected peers.
324    peers: HashMap<PeerId, PeerMetadata<N>>,
325    /// Send half for the command channel.
326    ///
327    /// This is kept so that a new [`TransactionsHandle`] can be created at any time.
328    command_tx: mpsc::UnboundedSender<TransactionsCommand<N>>,
329    /// Incoming commands from [`TransactionsHandle`].
330    ///
331    /// This will only receive commands if a user manually sends a command to the manager through
332    /// the [`TransactionsHandle`] to interact with this type directly.
333    command_rx: UnboundedReceiverStream<TransactionsCommand<N>>,
334    /// A stream that yields new __pending__ transactions.
335    ///
336    /// A transaction is considered __pending__ if it is executable on the current state of the
337    /// chain. In other words, this only yields transactions that satisfy all consensus
338    /// requirements, these include:
339    ///   - no nonce gaps
340    ///   - all dynamic fee requirements are (currently) met
341    ///   - account has enough balance to cover the transaction's gas
342    pending_transactions: ReceiverStream<TxHash>,
343    /// Incoming events from the [`NetworkManager`](crate::NetworkManager).
344    transaction_events: UnboundedMeteredReceiver<NetworkTransactionEvent<N>>,
345    /// How the `TransactionsManager` is configured.
346    config: TransactionsManagerConfig,
347    /// Network Policies
348    policies: PBundle,
349    /// `TransactionsManager` metrics
350    metrics: TransactionsManagerMetrics,
351    /// `AnnouncedTxTypes` metrics
352    announced_tx_types_metrics: AnnouncedTxTypesMetrics,
353}
354
355impl<Pool: TransactionPool, N: NetworkPrimitives>
356    TransactionsManager<
357        Pool,
358        N,
359        NetworkPolicies<TransactionPropagationKind, StrictEthAnnouncementFilter>,
360    >
361{
362    /// Sets up a new instance.
363    ///
364    /// Note: This expects an existing [`NetworkManager`](crate::NetworkManager) instance.
365    pub fn new(
366        network: NetworkHandle<N>,
367        pool: Pool,
368        from_network: mpsc::UnboundedReceiver<NetworkTransactionEvent<N>>,
369        transactions_manager_config: TransactionsManagerConfig,
370    ) -> Self {
371        Self::with_policy(
372            network,
373            pool,
374            from_network,
375            transactions_manager_config,
376            NetworkPolicies::default(),
377        )
378    }
379}
380
381impl<Pool: TransactionPool, N: NetworkPrimitives, PBundle: TransactionPolicies>
382    TransactionsManager<Pool, N, PBundle>
383{
384    /// Sets up a new instance with given the settings.
385    ///
386    /// Note: This expects an existing [`NetworkManager`](crate::NetworkManager) instance.
387    pub fn with_policy(
388        network: NetworkHandle<N>,
389        pool: Pool,
390        from_network: mpsc::UnboundedReceiver<NetworkTransactionEvent<N>>,
391        transactions_manager_config: TransactionsManagerConfig,
392        policies: PBundle,
393    ) -> Self {
394        let network_events = network.event_listener();
395
396        let (command_tx, command_rx) = mpsc::unbounded_channel();
397
398        let transaction_fetcher = TransactionFetcher::with_transaction_fetcher_config(
399            &transactions_manager_config.transaction_fetcher_config,
400        );
401
402        // install a listener for new __pending__ transactions that are allowed to be propagated
403        // over the network
404        let pending = pool.pending_transactions_listener();
405        let pending_pool_imports_info = PendingPoolImportsInfo::default();
406        let metrics = TransactionsManagerMetrics::default();
407        metrics
408            .capacity_pending_pool_imports
409            .increment(pending_pool_imports_info.max_pending_pool_imports as u64);
410
411        Self {
412            pool,
413            network,
414            network_events,
415            transaction_fetcher,
416            transactions_by_peers: Default::default(),
417            pool_imports: Default::default(),
418            pending_pool_imports_info: PendingPoolImportsInfo::new(
419                DEFAULT_MAX_COUNT_PENDING_POOL_IMPORTS,
420            ),
421            bad_imports: LruCache::new(DEFAULT_MAX_COUNT_BAD_IMPORTS),
422            peers: Default::default(),
423            command_tx,
424            command_rx: UnboundedReceiverStream::new(command_rx),
425            pending_transactions: ReceiverStream::new(pending),
426            transaction_events: UnboundedMeteredReceiver::new(
427                from_network,
428                NETWORK_POOL_TRANSACTIONS_SCOPE,
429            ),
430            config: transactions_manager_config,
431            policies,
432            metrics,
433            announced_tx_types_metrics: AnnouncedTxTypesMetrics::default(),
434        }
435    }
436
437    /// Returns a new handle that can send commands to this type.
438    pub fn handle(&self) -> TransactionsHandle<N> {
439        TransactionsHandle { manager_tx: self.command_tx.clone() }
440    }
441
442    /// Returns `true` if [`TransactionsManager`] has capacity to request pending hashes. Returns
443    /// `false` if [`TransactionsManager`] is operating close to full capacity.
444    fn has_capacity_for_fetching_pending_hashes(&self) -> bool {
445        self.pending_pool_imports_info
446            .has_capacity(self.pending_pool_imports_info.max_pending_pool_imports) &&
447            self.transaction_fetcher.has_capacity_for_fetching_pending_hashes()
448    }
449
450    fn report_peer_bad_transactions(&self, peer_id: PeerId) {
451        self.report_peer(peer_id, ReputationChangeKind::BadTransactions);
452        self.metrics.reported_bad_transactions.increment(1);
453    }
454
455    fn report_peer(&self, peer_id: PeerId, kind: ReputationChangeKind) {
456        trace!(target: "net::tx", ?peer_id, ?kind, "reporting reputation change");
457        self.network.reputation_change(peer_id, kind);
458    }
459
460    fn report_already_seen(&self, peer_id: PeerId) {
461        trace!(target: "net::tx", ?peer_id, "Penalizing peer for already seen transaction");
462        self.network.reputation_change(peer_id, ReputationChangeKind::AlreadySeenTransaction);
463    }
464
465    /// Clear the transaction
466    fn on_good_import(&mut self, hash: TxHash) {
467        self.transactions_by_peers.remove(&hash);
468    }
469
470    /// Penalize the peers that intentionally sent the bad transaction, and cache it to avoid
471    /// fetching or importing it again.
472    ///
473    /// Errors that count as bad transactions are:
474    ///
475    /// - intrinsic gas too low
476    /// - exceeds gas limit
477    /// - gas uint overflow
478    /// - exceeds max init code size
479    /// - oversized data
480    /// - signer account has bytecode
481    /// - chain id mismatch
482    /// - old legacy chain id
483    /// - tx type not supported
484    ///
485    /// (and additionally for blobs txns...)
486    ///
487    /// - no blobs
488    /// - too many blobs
489    /// - invalid kzg proof
490    /// - kzg error
491    /// - not blob transaction (tx type mismatch)
492    /// - wrong versioned kzg commitment hash
493    fn on_bad_import(&mut self, err: PoolError) {
494        let peers = self.transactions_by_peers.remove(&err.hash);
495
496        // if we're _currently_ syncing, we ignore a bad transaction
497        if !err.is_bad_transaction() || self.network.is_syncing() {
498            return
499        }
500        // otherwise we penalize the peer that sent the bad transaction, with the assumption that
501        // the peer should have known that this transaction is bad (e.g. violating consensus rules)
502        if let Some(peers) = peers {
503            for peer_id in peers {
504                self.report_peer_bad_transactions(peer_id);
505            }
506        }
507        self.metrics.bad_imports.increment(1);
508        self.bad_imports.insert(err.hash);
509    }
510
511    /// Runs an operation to fetch hashes that are cached in [`TransactionFetcher`].
512    fn on_fetch_hashes_pending_fetch(&mut self) {
513        // try drain transaction hashes pending fetch
514        let info = &self.pending_pool_imports_info;
515        let max_pending_pool_imports = info.max_pending_pool_imports;
516        let has_capacity_wrt_pending_pool_imports =
517            |divisor| info.has_capacity(max_pending_pool_imports / divisor);
518
519        self.transaction_fetcher
520            .on_fetch_pending_hashes(&self.peers, has_capacity_wrt_pending_pool_imports);
521    }
522
523    fn on_request_error(&self, peer_id: PeerId, req_err: RequestError) {
524        let kind = match req_err {
525            RequestError::UnsupportedCapability => ReputationChangeKind::BadProtocol,
526            RequestError::Timeout => ReputationChangeKind::Timeout,
527            RequestError::ChannelClosed | RequestError::ConnectionDropped => {
528                // peer is already disconnected
529                return
530            }
531            RequestError::BadResponse => return self.report_peer_bad_transactions(peer_id),
532        };
533        self.report_peer(peer_id, kind);
534    }
535
536    #[inline]
537    fn update_poll_metrics(&self, start: Instant, poll_durations: TxManagerPollDurations) {
538        let metrics = &self.metrics;
539
540        let TxManagerPollDurations {
541            acc_network_events,
542            acc_pending_imports,
543            acc_tx_events,
544            acc_imported_txns,
545            acc_fetch_events,
546            acc_pending_fetch,
547            acc_cmds,
548        } = poll_durations;
549
550        // update metrics for whole poll function
551        metrics.duration_poll_tx_manager.set(start.elapsed().as_secs_f64());
552        // update metrics for nested expressions
553        metrics.acc_duration_poll_network_events.set(acc_network_events.as_secs_f64());
554        metrics.acc_duration_poll_pending_pool_imports.set(acc_pending_imports.as_secs_f64());
555        metrics.acc_duration_poll_transaction_events.set(acc_tx_events.as_secs_f64());
556        metrics.acc_duration_poll_imported_transactions.set(acc_imported_txns.as_secs_f64());
557        metrics.acc_duration_poll_fetch_events.set(acc_fetch_events.as_secs_f64());
558        metrics.acc_duration_fetch_pending_hashes.set(acc_pending_fetch.as_secs_f64());
559        metrics.acc_duration_poll_commands.set(acc_cmds.as_secs_f64());
560    }
561}
562
563impl<Pool: TransactionPool, N: NetworkPrimitives, PBundle: TransactionPolicies>
564    TransactionsManager<Pool, N, PBundle>
565{
566    /// Processes a batch import results.
567    fn on_batch_import_result(&mut self, batch_results: Vec<PoolResult<AddedTransactionOutcome>>) {
568        for res in batch_results {
569            match res {
570                Ok(AddedTransactionOutcome { hash, .. }) => {
571                    self.on_good_import(hash);
572                }
573                Err(err) => {
574                    self.on_bad_import(err);
575                }
576            }
577        }
578    }
579
580    /// Request handler for an incoming `NewPooledTransactionHashes`
581    fn on_new_pooled_transaction_hashes(
582        &mut self,
583        peer_id: PeerId,
584        msg: NewPooledTransactionHashes,
585    ) {
586        // If the node is initially syncing, ignore transactions
587        if self.network.is_initially_syncing() {
588            return
589        }
590        if self.network.tx_gossip_disabled() {
591            return
592        }
593
594        // get handle to peer's session, if the session is still active
595        let Some(peer) = self.peers.get_mut(&peer_id) else {
596            trace!(
597                peer_id = format!("{peer_id:#}"),
598                ?msg,
599                "discarding announcement from inactive peer"
600            );
601
602            return
603        };
604        let client = peer.client_version.clone();
605
606        // keep track of the transactions the peer knows
607        let mut count_txns_already_seen_by_peer = 0;
608        for tx in msg.iter_hashes().copied() {
609            if !peer.seen_transactions.insert(tx) {
610                count_txns_already_seen_by_peer += 1;
611            }
612        }
613        if count_txns_already_seen_by_peer > 0 {
614            // this may occur if transactions are sent or announced to a peer, at the same time as
615            // the peer sends/announces those hashes to us. this is because, marking
616            // txns as seen by a peer is done optimistically upon sending them to the
617            // peer.
618            self.metrics.messages_with_hashes_already_seen_by_peer.increment(1);
619            self.metrics
620                .occurrences_hash_already_seen_by_peer
621                .increment(count_txns_already_seen_by_peer);
622
623            trace!(target: "net::tx",
624                %count_txns_already_seen_by_peer,
625                peer_id=format!("{peer_id:#}"),
626                ?client,
627                "Peer sent hashes that have already been marked as seen by peer"
628            );
629
630            self.report_already_seen(peer_id);
631        }
632
633        // 1. filter out spam
634        if msg.is_empty() {
635            self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement);
636            return;
637        }
638
639        let original_len = msg.len();
640        let mut partially_valid_msg = msg.dedup();
641
642        if partially_valid_msg.len() != original_len {
643            self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement);
644        }
645
646        // 2. filter out transactions pending import to pool
647        partially_valid_msg.retain_by_hash(|hash| !self.transactions_by_peers.contains_key(hash));
648
649        // 3. filter out known hashes
650        //
651        // known txns have already been successfully fetched or received over gossip.
652        //
653        // most hashes will be filtered out here since this the mempool protocol is a gossip
654        // protocol, healthy peers will send many of the same hashes.
655        //
656        let hashes_count_pre_pool_filter = partially_valid_msg.len();
657        self.pool.retain_unknown(&mut partially_valid_msg);
658        if hashes_count_pre_pool_filter > partially_valid_msg.len() {
659            let already_known_hashes_count =
660                hashes_count_pre_pool_filter - partially_valid_msg.len();
661            self.metrics
662                .occurrences_hashes_already_in_pool
663                .increment(already_known_hashes_count as u64);
664        }
665
666        if partially_valid_msg.is_empty() {
667            // nothing to request
668            return
669        }
670
671        // 4. filter out invalid entries (spam)
672        //
673        // validates messages with respect to the given network, e.g. allowed tx types
674        //
675        let mut should_report_peer = false;
676        let mut tx_types_counter = TxTypesCounter::default();
677
678        let is_eth68_message = partially_valid_msg
679            .msg_version()
680            .expect("partially valid announcement should have a version")
681            .is_eth68();
682
683        partially_valid_msg.retain(|tx_hash, metadata_ref_mut| {
684            let (ty_byte, size_val) = match *metadata_ref_mut {
685                Some((ty, size)) => {
686                    if !is_eth68_message {
687                        should_report_peer = true;
688                    }
689                    (ty, size)
690                }
691                None => {
692                    if is_eth68_message {
693                        should_report_peer = true;
694                        return false;
695                    }
696                    (0u8, 0)
697                }
698            };
699
700            if is_eth68_message &&
701                let Some((actual_ty_byte, _)) = *metadata_ref_mut &&
702                let Ok(parsed_tx_type) = TxType::try_from(actual_ty_byte)
703            {
704                tx_types_counter.increase_by_tx_type(parsed_tx_type);
705            }
706
707            let decision = self
708                .policies
709                .announcement_filter()
710                .decide_on_announcement(ty_byte, tx_hash, size_val);
711
712            match decision {
713                AnnouncementAcceptance::Accept => true,
714                AnnouncementAcceptance::Ignore => false,
715                AnnouncementAcceptance::Reject { penalize_peer } => {
716                    if penalize_peer {
717                        should_report_peer = true;
718                    }
719                    false
720                }
721            }
722        });
723
724        if is_eth68_message {
725            self.announced_tx_types_metrics.update_eth68_announcement_metrics(tx_types_counter);
726        }
727
728        if should_report_peer {
729            self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement);
730        }
731
732        let mut valid_announcement_data =
733            ValidAnnouncementData::from_partially_valid_data(partially_valid_msg);
734
735        if valid_announcement_data.is_empty() {
736            // no valid announcement data
737            return
738        }
739
740        // 5. filter out already seen unknown hashes
741        //
742        // seen hashes are already in the tx fetcher, pending fetch.
743        //
744        // for any seen hashes add the peer as fallback. unseen hashes are loaded into the tx
745        // fetcher, hence they should be valid at this point.
746        let bad_imports = &self.bad_imports;
747        self.transaction_fetcher.filter_unseen_and_pending_hashes(
748            &mut valid_announcement_data,
749            |hash| bad_imports.contains(hash),
750            &peer_id,
751            &client,
752        );
753
754        if valid_announcement_data.is_empty() {
755            // nothing to request
756            return
757        }
758
759        trace!(target: "net::tx::propagation",
760            peer_id=format!("{peer_id:#}"),
761            hashes_len=valid_announcement_data.len(),
762            hashes=?valid_announcement_data.keys().collect::<Vec<_>>(),
763            msg_version=%valid_announcement_data.msg_version(),
764            client_version=%client,
765            "received previously unseen and pending hashes in announcement from peer"
766        );
767
768        // only send request for hashes to idle peer, otherwise buffer hashes storing peer as
769        // fallback
770        if !self.transaction_fetcher.is_idle(&peer_id) {
771            // load message version before announcement data is destructed in packing
772            let msg_version = valid_announcement_data.msg_version();
773            let (hashes, _version) = valid_announcement_data.into_request_hashes();
774
775            trace!(target: "net::tx",
776                peer_id=format!("{peer_id:#}"),
777                hashes=?*hashes,
778                %msg_version,
779                %client,
780                "buffering hashes announced by busy peer"
781            );
782
783            self.transaction_fetcher.buffer_hashes(hashes, Some(peer_id));
784
785            return
786        }
787
788        let mut hashes_to_request =
789            RequestTxHashes::with_capacity(valid_announcement_data.len() / 4);
790        let surplus_hashes =
791            self.transaction_fetcher.pack_request(&mut hashes_to_request, valid_announcement_data);
792
793        if !surplus_hashes.is_empty() {
794            trace!(target: "net::tx",
795                peer_id=format!("{peer_id:#}"),
796                surplus_hashes=?*surplus_hashes,
797                %client,
798                "some hashes in announcement from peer didn't fit in `GetPooledTransactions` request, buffering surplus hashes"
799            );
800
801            self.transaction_fetcher.buffer_hashes(surplus_hashes, Some(peer_id));
802        }
803
804        trace!(target: "net::tx",
805            peer_id=format!("{peer_id:#}"),
806            hashes=?*hashes_to_request,
807            %client,
808            "sending hashes in `GetPooledTransactions` request to peer's session"
809        );
810
811        // request the missing transactions
812        //
813        // get handle to peer's session again, at this point we know it exists
814        let Some(peer) = self.peers.get_mut(&peer_id) else { return };
815        if let Some(failed_to_request_hashes) =
816            self.transaction_fetcher.request_transactions_from_peer(hashes_to_request, peer)
817        {
818            let conn_eth_version = peer.version;
819
820            trace!(target: "net::tx",
821                peer_id=format!("{peer_id:#}"),
822                failed_to_request_hashes=?*failed_to_request_hashes,
823                %conn_eth_version,
824                %client,
825                "sending `GetPooledTransactions` request to peer's session failed, buffering hashes"
826            );
827            self.transaction_fetcher.buffer_hashes(failed_to_request_hashes, Some(peer_id));
828        }
829    }
830}
831
832impl<Pool, N, PBundle> TransactionsManager<Pool, N, PBundle>
833where
834    Pool: TransactionPool + Unpin + 'static,
835
836    N: NetworkPrimitives<
837            BroadcastedTransaction: SignedTransaction,
838            PooledTransaction: SignedTransaction,
839        > + Unpin,
840
841    PBundle: TransactionPolicies,
842    Pool::Transaction:
843        PoolTransaction<Consensus = N::BroadcastedTransaction, Pooled = N::PooledTransaction>,
844{
845    /// Invoked when transactions in the local mempool are considered __pending__.
846    ///
847    /// When a transaction in the local mempool is moved to the pending pool, we propagate them to
848    /// connected peers over network using the `Transactions` and `NewPooledTransactionHashes`
849    /// messages. The Transactions message relays complete transaction objects and is typically
850    /// sent to a small, random fraction of connected peers.
851    ///
852    /// All other peers receive a notification of the transaction hash and can request the
853    /// complete transaction object if it is unknown to them. The dissemination of complete
854    /// transactions to a fraction of peers usually ensures that all nodes receive the transaction
855    /// and won't need to request it.
856    fn on_new_pending_transactions(&mut self, hashes: Vec<TxHash>) {
857        // Nothing to propagate while initially syncing
858        if self.network.is_initially_syncing() {
859            return
860        }
861        if self.network.tx_gossip_disabled() {
862            return
863        }
864
865        trace!(target: "net::tx", num_hashes=?hashes.len(), "Start propagating transactions");
866
867        self.propagate_all(hashes);
868    }
869
870    /// Propagate the full transactions to a specific peer.
871    ///
872    /// Returns the propagated transactions.
873    fn propagate_full_transactions_to_peer(
874        &mut self,
875        txs: Vec<TxHash>,
876        peer_id: PeerId,
877        propagation_mode: PropagationMode,
878    ) -> Option<PropagatedTransactions> {
879        trace!(target: "net::tx", ?peer_id, "Propagating transactions to peer");
880
881        let peer = self.peers.get_mut(&peer_id)?;
882        let mut propagated = PropagatedTransactions::default();
883
884        // filter all transactions unknown to the peer
885        let mut full_transactions = FullTransactionsBuilder::new(peer.version);
886
887        let to_propagate = self.pool.get_all(txs).into_iter().map(PropagateTransaction::pool_tx);
888
889        if propagation_mode.is_forced() {
890            // skip cache check if forced
891            full_transactions.extend(to_propagate);
892        } else {
893            // Iterate through the transactions to propagate and fill the hashes and full
894            // transaction
895            for tx in to_propagate {
896                if !peer.seen_transactions.contains(tx.tx_hash()) {
897                    // Only include if the peer hasn't seen the transaction
898                    full_transactions.push(&tx);
899                }
900            }
901        }
902
903        if full_transactions.is_empty() {
904            // nothing to propagate
905            return None
906        }
907
908        let PropagateTransactions { pooled, full } = full_transactions.build();
909
910        // send hashes if any
911        if let Some(new_pooled_hashes) = pooled {
912            for hash in new_pooled_hashes.iter_hashes().copied() {
913                propagated.0.entry(hash).or_default().push(PropagateKind::Hash(peer_id));
914                // mark transaction as seen by peer
915                peer.seen_transactions.insert(hash);
916            }
917
918            // send hashes of transactions
919            self.network.send_transactions_hashes(peer_id, new_pooled_hashes);
920        }
921
922        // send full transactions, if any
923        if let Some(new_full_transactions) = full {
924            for tx in &new_full_transactions {
925                propagated.0.entry(*tx.tx_hash()).or_default().push(PropagateKind::Full(peer_id));
926                // mark transaction as seen by peer
927                peer.seen_transactions.insert(*tx.tx_hash());
928            }
929
930            // send full transactions
931            self.network.send_transactions(peer_id, new_full_transactions);
932        }
933
934        // Update propagated transactions metrics
935        self.metrics.propagated_transactions.increment(propagated.0.len() as u64);
936
937        Some(propagated)
938    }
939
940    /// Propagate the transaction hashes to the given peer
941    ///
942    /// Note: This will only send the hashes for transactions that exist in the pool.
943    fn propagate_hashes_to(
944        &mut self,
945        hashes: Vec<TxHash>,
946        peer_id: PeerId,
947        propagation_mode: PropagationMode,
948    ) {
949        trace!(target: "net::tx", "Start propagating transactions as hashes");
950
951        // This fetches a transactions from the pool, including the blob transactions, which are
952        // only ever sent as hashes.
953        let propagated = {
954            let Some(peer) = self.peers.get_mut(&peer_id) else {
955                // no such peer
956                return
957            };
958
959            let to_propagate = self
960                .pool
961                .get_all(hashes)
962                .into_iter()
963                .map(PropagateTransaction::pool_tx)
964                .collect::<Vec<_>>();
965
966            let mut propagated = PropagatedTransactions::default();
967
968            // check if transaction is known to peer
969            let mut hashes = PooledTransactionsHashesBuilder::new(peer.version);
970
971            if propagation_mode.is_forced() {
972                hashes.extend(to_propagate)
973            } else {
974                for tx in to_propagate {
975                    if !peer.seen_transactions.contains(tx.tx_hash()) {
976                        // Include if the peer hasn't seen it
977                        hashes.push(&tx);
978                    }
979                }
980            }
981
982            let new_pooled_hashes = hashes.build();
983
984            if new_pooled_hashes.is_empty() {
985                // nothing to propagate
986                return
987            }
988
989            for hash in new_pooled_hashes.iter_hashes().copied() {
990                propagated.0.entry(hash).or_default().push(PropagateKind::Hash(peer_id));
991            }
992
993            trace!(target: "net::tx::propagation", ?peer_id, ?new_pooled_hashes, "Propagating transactions to peer");
994
995            // send hashes of transactions
996            self.network.send_transactions_hashes(peer_id, new_pooled_hashes);
997
998            // Update propagated transactions metrics
999            self.metrics.propagated_transactions.increment(propagated.0.len() as u64);
1000
1001            propagated
1002        };
1003
1004        // notify pool so events get fired
1005        self.pool.on_propagated(propagated);
1006    }
1007
1008    /// Propagate the transactions to all connected peers either as full objects or hashes.
1009    ///
1010    /// The message for new pooled hashes depends on the negotiated version of the stream.
1011    /// See [`NewPooledTransactionHashes`]
1012    ///
1013    /// Note: EIP-4844 are disallowed from being broadcast in full and are only ever sent as hashes, see also <https://eips.ethereum.org/EIPS/eip-4844#networking>.
1014    fn propagate_transactions(
1015        &mut self,
1016        to_propagate: Vec<PropagateTransaction<N::BroadcastedTransaction>>,
1017        propagation_mode: PropagationMode,
1018    ) -> PropagatedTransactions {
1019        let mut propagated = PropagatedTransactions::default();
1020        if self.network.tx_gossip_disabled() {
1021            return propagated
1022        }
1023
1024        // send full transactions to a set of the connected peers based on the configured mode
1025        let max_num_full = self.config.propagation_mode.full_peer_count(self.peers.len());
1026
1027        // Note: Assuming ~random~ order due to random state of the peers map hasher
1028        for (peer_idx, (peer_id, peer)) in self.peers.iter_mut().enumerate() {
1029            if !self.policies.propagation_policy().can_propagate(peer) {
1030                // skip peers we should not propagate to
1031                continue
1032            }
1033            // determine whether to send full tx objects or hashes.
1034            let mut builder = if peer_idx > max_num_full {
1035                PropagateTransactionsBuilder::pooled(peer.version)
1036            } else {
1037                PropagateTransactionsBuilder::full(peer.version)
1038            };
1039
1040            if propagation_mode.is_forced() {
1041                builder.extend(to_propagate.iter());
1042            } else {
1043                // Iterate through the transactions to propagate and fill the hashes and full
1044                // transaction lists, before deciding whether or not to send full transactions to
1045                // the peer.
1046                for tx in &to_propagate {
1047                    // Only proceed if the transaction is not in the peer's list of seen
1048                    // transactions
1049                    if !peer.seen_transactions.contains(tx.tx_hash()) {
1050                        builder.push(tx);
1051                    }
1052                }
1053            }
1054
1055            if builder.is_empty() {
1056                trace!(target: "net::tx", ?peer_id, "Nothing to propagate to peer; has seen all transactions");
1057                continue
1058            }
1059
1060            let PropagateTransactions { pooled, full } = builder.build();
1061
1062            // send hashes if any
1063            if let Some(mut new_pooled_hashes) = pooled {
1064                // enforce tx soft limit per message for the (unlikely) event the number of
1065                // hashes exceeds it
1066                new_pooled_hashes
1067                    .truncate(SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE);
1068
1069                for hash in new_pooled_hashes.iter_hashes().copied() {
1070                    propagated.0.entry(hash).or_default().push(PropagateKind::Hash(*peer_id));
1071                    // mark transaction as seen by peer
1072                    peer.seen_transactions.insert(hash);
1073                }
1074
1075                trace!(target: "net::tx", ?peer_id, num_txs=?new_pooled_hashes.len(), "Propagating tx hashes to peer");
1076
1077                // send hashes of transactions
1078                self.network.send_transactions_hashes(*peer_id, new_pooled_hashes);
1079            }
1080
1081            // send full transactions, if any
1082            if let Some(new_full_transactions) = full {
1083                for tx in &new_full_transactions {
1084                    propagated
1085                        .0
1086                        .entry(*tx.tx_hash())
1087                        .or_default()
1088                        .push(PropagateKind::Full(*peer_id));
1089                    // mark transaction as seen by peer
1090                    peer.seen_transactions.insert(*tx.tx_hash());
1091                }
1092
1093                trace!(target: "net::tx", ?peer_id, num_txs=?new_full_transactions.len(), "Propagating full transactions to peer");
1094
1095                // send full transactions
1096                self.network.send_transactions(*peer_id, new_full_transactions);
1097            }
1098        }
1099
1100        // Update propagated transactions metrics
1101        self.metrics.propagated_transactions.increment(propagated.0.len() as u64);
1102
1103        propagated
1104    }
1105
1106    /// Propagates the given transactions to the peers
1107    ///
1108    /// This fetches all transaction from the pool, including the 4844 blob transactions but
1109    /// __without__ their sidecar, because 4844 transactions are only ever announced as hashes.
1110    fn propagate_all(&mut self, hashes: Vec<TxHash>) {
1111        if self.peers.is_empty() {
1112            // nothing to propagate
1113            return
1114        }
1115        let propagated = self.propagate_transactions(
1116            self.pool.get_all(hashes).into_iter().map(PropagateTransaction::pool_tx).collect(),
1117            PropagationMode::Basic,
1118        );
1119
1120        // notify pool so events get fired
1121        self.pool.on_propagated(propagated);
1122    }
1123
1124    /// Request handler for an incoming request for transactions
1125    fn on_get_pooled_transactions(
1126        &mut self,
1127        peer_id: PeerId,
1128        request: GetPooledTransactions,
1129        response: oneshot::Sender<RequestResult<PooledTransactions<N::PooledTransaction>>>,
1130    ) {
1131        if let Some(peer) = self.peers.get_mut(&peer_id) {
1132            if self.network.tx_gossip_disabled() {
1133                let _ = response.send(Ok(PooledTransactions::default()));
1134                return
1135            }
1136            let transactions = self.pool.get_pooled_transaction_elements(
1137                request.0,
1138                GetPooledTransactionLimit::ResponseSizeSoftLimit(
1139                    self.transaction_fetcher.info.soft_limit_byte_size_pooled_transactions_response,
1140                ),
1141            );
1142            trace!(target: "net::tx::propagation", sent_txs=?transactions.iter().map(|tx| tx.tx_hash()), "Sending requested transactions to peer");
1143
1144            // we sent a response at which point we assume that the peer is aware of the
1145            // transactions
1146            peer.seen_transactions.extend(transactions.iter().map(|tx| *tx.tx_hash()));
1147
1148            let resp = PooledTransactions(transactions);
1149            let _ = response.send(Ok(resp));
1150        }
1151    }
1152
1153    /// Handles a command received from a detached [`TransactionsHandle`]
1154    fn on_command(&mut self, cmd: TransactionsCommand<N>) {
1155        match cmd {
1156            TransactionsCommand::PropagateHash(hash) => {
1157                self.on_new_pending_transactions(vec![hash])
1158            }
1159            TransactionsCommand::PropagateHashesTo(hashes, peer) => {
1160                self.propagate_hashes_to(hashes, peer, PropagationMode::Forced)
1161            }
1162            TransactionsCommand::GetActivePeers(tx) => {
1163                let peers = self.peers.keys().copied().collect::<HashSet<_>>();
1164                tx.send(peers).ok();
1165            }
1166            TransactionsCommand::PropagateTransactionsTo(txs, peer) => {
1167                if let Some(propagated) =
1168                    self.propagate_full_transactions_to_peer(txs, peer, PropagationMode::Forced)
1169                {
1170                    self.pool.on_propagated(propagated);
1171                }
1172            }
1173            TransactionsCommand::PropagateTransactions(txs) => self.propagate_all(txs),
1174            TransactionsCommand::BroadcastTransactions(txs) => {
1175                let propagated = self.propagate_transactions(txs, PropagationMode::Forced);
1176                self.pool.on_propagated(propagated);
1177            }
1178            TransactionsCommand::GetTransactionHashes { peers, tx } => {
1179                let mut res = HashMap::with_capacity(peers.len());
1180                for peer_id in peers {
1181                    let hashes = self
1182                        .peers
1183                        .get(&peer_id)
1184                        .map(|peer| peer.seen_transactions.iter().copied().collect::<HashSet<_>>())
1185                        .unwrap_or_default();
1186                    res.insert(peer_id, hashes);
1187                }
1188                tx.send(res).ok();
1189            }
1190            TransactionsCommand::GetPeerSender { peer_id, peer_request_sender } => {
1191                let sender = self.peers.get(&peer_id).map(|peer| peer.request_tx.clone());
1192                peer_request_sender.send(sender).ok();
1193            }
1194        }
1195    }
1196
1197    /// Handles session establishment and peer transactions initialization.
1198    ///
1199    /// This is invoked when a new session is established.
1200    fn handle_peer_session(
1201        &mut self,
1202        info: SessionInfo,
1203        messages: PeerRequestSender<PeerRequest<N>>,
1204    ) {
1205        let SessionInfo { peer_id, client_version, version, .. } = info;
1206
1207        // Insert a new peer into the peerset.
1208        let peer = PeerMetadata::<N>::new(
1209            messages,
1210            version,
1211            client_version,
1212            self.config.max_transactions_seen_by_peer_history,
1213            info.peer_kind,
1214        );
1215        let peer = match self.peers.entry(peer_id) {
1216            Entry::Occupied(mut entry) => {
1217                entry.insert(peer);
1218                entry.into_mut()
1219            }
1220            Entry::Vacant(entry) => entry.insert(peer),
1221        };
1222
1223        self.policies.propagation_policy_mut().on_session_established(peer);
1224
1225        // Send a `NewPooledTransactionHashes` to the peer with up to
1226        // `SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE`
1227        // transactions in the pool.
1228        if self.network.is_initially_syncing() || self.network.tx_gossip_disabled() {
1229            trace!(target: "net::tx", ?peer_id, "Skipping transaction broadcast: node syncing or gossip disabled");
1230            return
1231        }
1232
1233        // Get transactions to broadcast
1234        let pooled_txs = self.pool.pooled_transactions_max(
1235            SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE,
1236        );
1237        if pooled_txs.is_empty() {
1238            trace!(target: "net::tx", ?peer_id, "No transactions in the pool to broadcast");
1239            return;
1240        }
1241
1242        // Build and send transaction hashes message
1243        let mut msg_builder = PooledTransactionsHashesBuilder::new(version);
1244        for pooled_tx in pooled_txs {
1245            peer.seen_transactions.insert(*pooled_tx.hash());
1246            msg_builder.push_pooled(pooled_tx);
1247        }
1248
1249        debug!(target: "net::tx", ?peer_id, tx_count = msg_builder.is_empty(), "Broadcasting transaction hashes");
1250        let msg = msg_builder.build();
1251        self.network.send_transactions_hashes(peer_id, msg);
1252    }
1253
1254    /// Handles a received event related to common network events.
1255    fn on_network_event(&mut self, event_result: NetworkEvent<PeerRequest<N>>) {
1256        match event_result {
1257            NetworkEvent::Peer(PeerEvent::SessionClosed { peer_id, .. }) => {
1258                // remove the peer
1259
1260                let peer = self.peers.remove(&peer_id);
1261                if let Some(mut peer) = peer {
1262                    self.policies.propagation_policy_mut().on_session_closed(&mut peer);
1263                }
1264                self.transaction_fetcher.remove_peer(&peer_id);
1265            }
1266            NetworkEvent::ActivePeerSession { info, messages } => {
1267                // process active peer session and broadcast available transaction from the pool
1268                self.handle_peer_session(info, messages);
1269            }
1270            NetworkEvent::Peer(PeerEvent::SessionEstablished(info)) => {
1271                let peer_id = info.peer_id;
1272                // get messages from existing peer
1273                let messages = match self.peers.get(&peer_id) {
1274                    Some(p) => p.request_tx.clone(),
1275                    None => {
1276                        debug!(target: "net::tx", ?peer_id, "No peer request sender found");
1277                        return;
1278                    }
1279                };
1280                self.handle_peer_session(info, messages);
1281            }
1282            _ => {}
1283        }
1284    }
1285
1286    /// Handles dedicated transaction events related to the `eth` protocol.
1287    fn on_network_tx_event(&mut self, event: NetworkTransactionEvent<N>) {
1288        match event {
1289            NetworkTransactionEvent::IncomingTransactions { peer_id, msg } => {
1290                // ensure we didn't receive any blob transactions as these are disallowed to be
1291                // broadcasted in full
1292
1293                let has_blob_txs = msg.has_eip4844();
1294
1295                let non_blob_txs = msg
1296                    .0
1297                    .into_iter()
1298                    .map(N::PooledTransaction::try_from)
1299                    .filter_map(Result::ok)
1300                    .collect();
1301
1302                self.import_transactions(peer_id, non_blob_txs, TransactionSource::Broadcast);
1303
1304                if has_blob_txs {
1305                    debug!(target: "net::tx", ?peer_id, "received bad full blob transaction broadcast");
1306                    self.report_peer_bad_transactions(peer_id);
1307                }
1308            }
1309            NetworkTransactionEvent::IncomingPooledTransactionHashes { peer_id, msg } => {
1310                self.on_new_pooled_transaction_hashes(peer_id, msg)
1311            }
1312            NetworkTransactionEvent::GetPooledTransactions { peer_id, request, response } => {
1313                self.on_get_pooled_transactions(peer_id, request, response)
1314            }
1315            NetworkTransactionEvent::GetTransactionsHandle(response) => {
1316                let _ = response.send(Some(self.handle()));
1317            }
1318        }
1319    }
1320
1321    /// Starts the import process for the given transactions.
1322    fn import_transactions(
1323        &mut self,
1324        peer_id: PeerId,
1325        transactions: PooledTransactions<N::PooledTransaction>,
1326        source: TransactionSource,
1327    ) {
1328        // If the node is pipeline syncing, ignore transactions
1329        if self.network.is_initially_syncing() {
1330            return
1331        }
1332        if self.network.tx_gossip_disabled() {
1333            return
1334        }
1335
1336        let Some(peer) = self.peers.get_mut(&peer_id) else { return };
1337        let mut transactions = transactions.0;
1338
1339        // mark the transactions as received
1340        self.transaction_fetcher
1341            .remove_hashes_from_transaction_fetcher(transactions.iter().map(|tx| tx.tx_hash()));
1342
1343        // track that the peer knows these transaction, but only if this is a new broadcast.
1344        // If we received the transactions as the response to our `GetPooledTransactions``
1345        // requests (based on received `NewPooledTransactionHashes`) then we already
1346        // recorded the hashes as seen by this peer in `Self::on_new_pooled_transaction_hashes`.
1347        let mut num_already_seen_by_peer = 0;
1348        for tx in &transactions {
1349            if source.is_broadcast() && !peer.seen_transactions.insert(*tx.tx_hash()) {
1350                num_already_seen_by_peer += 1;
1351            }
1352        }
1353
1354        // 1. filter out txns already inserted into pool
1355        let txns_count_pre_pool_filter = transactions.len();
1356        self.pool.retain_unknown(&mut transactions);
1357        if txns_count_pre_pool_filter > transactions.len() {
1358            let already_known_txns_count = txns_count_pre_pool_filter - transactions.len();
1359            self.metrics
1360                .occurrences_transactions_already_in_pool
1361                .increment(already_known_txns_count as u64);
1362        }
1363
1364        // tracks the quality of the given transactions
1365        let mut has_bad_transactions = false;
1366
1367        // 2. filter out transactions that are invalid or already pending import pre-size to avoid
1368        //    reallocations
1369        let mut new_txs = Vec::with_capacity(transactions.len());
1370        for tx in transactions {
1371            // recover transaction
1372            let tx = match tx.try_into_recovered() {
1373                Ok(tx) => tx,
1374                Err(badtx) => {
1375                    trace!(target: "net::tx",
1376                        peer_id=format!("{peer_id:#}"),
1377                        hash=%badtx.tx_hash(),
1378                        client_version=%peer.client_version,
1379                        "failed ecrecovery for transaction"
1380                    );
1381                    has_bad_transactions = true;
1382                    continue
1383                }
1384            };
1385
1386            match self.transactions_by_peers.entry(*tx.tx_hash()) {
1387                Entry::Occupied(mut entry) => {
1388                    // transaction was already inserted
1389                    entry.get_mut().insert(peer_id);
1390                }
1391                Entry::Vacant(entry) => {
1392                    if self.bad_imports.contains(tx.tx_hash()) {
1393                        trace!(target: "net::tx",
1394                            peer_id=format!("{peer_id:#}"),
1395                            hash=%tx.tx_hash(),
1396                            client_version=%peer.client_version,
1397                            "received a known bad transaction from peer"
1398                        );
1399                        has_bad_transactions = true;
1400                    } else {
1401                        // this is a new transaction that should be imported into the pool
1402
1403                        let pool_transaction = Pool::Transaction::from_pooled(tx);
1404                        new_txs.push(pool_transaction);
1405
1406                        entry.insert(HashSet::from([peer_id]));
1407                    }
1408                }
1409            }
1410        }
1411        new_txs.shrink_to_fit();
1412
1413        // 3. import new transactions as a batch to minimize lock contention on the underlying
1414        // pool
1415        if !new_txs.is_empty() {
1416            let pool = self.pool.clone();
1417            // update metrics
1418            let metric_pending_pool_imports = self.metrics.pending_pool_imports.clone();
1419            metric_pending_pool_imports.increment(new_txs.len() as f64);
1420
1421            // update self-monitoring info
1422            self.pending_pool_imports_info
1423                .pending_pool_imports
1424                .fetch_add(new_txs.len(), Ordering::Relaxed);
1425            let tx_manager_info_pending_pool_imports =
1426                self.pending_pool_imports_info.pending_pool_imports.clone();
1427
1428            trace!(target: "net::tx::propagation", new_txs_len=?new_txs.len(), "Importing new transactions");
1429            let import = Box::pin(async move {
1430                let added = new_txs.len();
1431                let res = pool.add_external_transactions(new_txs).await;
1432
1433                // update metrics
1434                metric_pending_pool_imports.decrement(added as f64);
1435                // update self-monitoring info
1436                tx_manager_info_pending_pool_imports.fetch_sub(added, Ordering::Relaxed);
1437
1438                res
1439            });
1440
1441            self.pool_imports.push(import);
1442        }
1443
1444        if num_already_seen_by_peer > 0 {
1445            self.metrics.messages_with_transactions_already_seen_by_peer.increment(1);
1446            self.metrics
1447                .occurrences_of_transaction_already_seen_by_peer
1448                .increment(num_already_seen_by_peer);
1449            trace!(target: "net::tx", num_txs=%num_already_seen_by_peer, ?peer_id, client=?peer.client_version, "Peer sent already seen transactions");
1450        }
1451
1452        if has_bad_transactions {
1453            // peer sent us invalid transactions
1454            self.report_peer_bad_transactions(peer_id)
1455        }
1456
1457        if num_already_seen_by_peer > 0 {
1458            self.report_already_seen(peer_id);
1459        }
1460    }
1461
1462    /// Processes a [`FetchEvent`].
1463    fn on_fetch_event(&mut self, fetch_event: FetchEvent<N::PooledTransaction>) {
1464        match fetch_event {
1465            FetchEvent::TransactionsFetched { peer_id, transactions, report_peer } => {
1466                self.import_transactions(peer_id, transactions, TransactionSource::Response);
1467                if report_peer {
1468                    self.report_peer(peer_id, ReputationChangeKind::BadTransactions);
1469                }
1470            }
1471            FetchEvent::FetchError { peer_id, error } => {
1472                trace!(target: "net::tx", ?peer_id, %error, "requesting transactions from peer failed");
1473                self.on_request_error(peer_id, error);
1474            }
1475            FetchEvent::EmptyResponse { peer_id } => {
1476                trace!(target: "net::tx", ?peer_id, "peer returned empty response");
1477            }
1478        }
1479    }
1480}
1481
1482/// An endless future. Preemption ensure that future is non-blocking, nonetheless. See
1483/// [`crate::NetworkManager`] for more context on the design pattern.
1484///
1485/// This should be spawned or used as part of `tokio::select!`.
1486//
1487// spawned in `NodeConfig::start_network`(reth_node_core::NodeConfig) and
1488// `NetworkConfig::start_network`(reth_network::NetworkConfig)
1489impl<
1490        Pool: TransactionPool + Unpin + 'static,
1491        N: NetworkPrimitives<
1492                BroadcastedTransaction: SignedTransaction,
1493                PooledTransaction: SignedTransaction,
1494            > + Unpin,
1495        PBundle: TransactionPolicies + Unpin,
1496    > Future for TransactionsManager<Pool, N, PBundle>
1497where
1498    Pool::Transaction:
1499        PoolTransaction<Consensus = N::BroadcastedTransaction, Pooled = N::PooledTransaction>,
1500{
1501    type Output = ();
1502
1503    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1504        let start = Instant::now();
1505        let mut poll_durations = TxManagerPollDurations::default();
1506
1507        let this = self.get_mut();
1508
1509        // All streams are polled until their corresponding budget is exhausted, then we manually
1510        // yield back control to tokio. See `NetworkManager` for more context on the design
1511        // pattern.
1512
1513        // Advance network/peer related events (update peers map).
1514        let maybe_more_network_events = metered_poll_nested_stream_with_budget!(
1515            poll_durations.acc_network_events,
1516            "net::tx",
1517            "Network events stream",
1518            DEFAULT_BUDGET_TRY_DRAIN_STREAM,
1519            this.network_events.poll_next_unpin(cx),
1520            |event| this.on_network_event(event)
1521        );
1522
1523        // Advances new __pending__ transactions, transactions that were successfully inserted into
1524        // pending set in pool (are valid), and propagates them (inform peers which
1525        // transactions we have seen).
1526        //
1527        // We try to drain this to batch the transactions in a single message.
1528        //
1529        // We don't expect this buffer to be large, since only pending transactions are
1530        // emitted here.
1531        let mut new_txs = Vec::new();
1532        let maybe_more_pending_txns = metered_poll_nested_stream_with_budget!(
1533            poll_durations.acc_imported_txns,
1534            "net::tx",
1535            "Pending transactions stream",
1536            DEFAULT_BUDGET_TRY_DRAIN_POOL_IMPORTS,
1537            this.pending_transactions.poll_next_unpin(cx),
1538            |hash| new_txs.push(hash)
1539        );
1540        if !new_txs.is_empty() {
1541            this.on_new_pending_transactions(new_txs);
1542        }
1543
1544        // Advance inflight fetch requests (flush transaction fetcher and queue for
1545        // import to pool).
1546        //
1547        // The smallest decodable transaction is an empty legacy transaction, 10 bytes
1548        // (2 MiB / 10 bytes > 200k transactions).
1549        //
1550        // Since transactions aren't validated until they are inserted into the pool,
1551        // this can potentially queue >200k transactions for insertion to pool. More
1552        // if the message size is bigger than the soft limit on a `PooledTransactions`
1553        // response which is 2 MiB.
1554        let maybe_more_tx_fetch_events = metered_poll_nested_stream_with_budget!(
1555            poll_durations.acc_fetch_events,
1556            "net::tx",
1557            "Transaction fetch events stream",
1558            DEFAULT_BUDGET_TRY_DRAIN_STREAM,
1559            this.transaction_fetcher.poll_next_unpin(cx),
1560            |event| this.on_fetch_event(event),
1561        );
1562
1563        // Advance incoming transaction events (stream new txns/announcements from
1564        // network manager and queue for import to pool/fetch txns).
1565        //
1566        // This will potentially remove hashes from hashes pending fetch, it the event
1567        // is an announcement (if same hashes are announced that didn't fit into a
1568        // previous request).
1569        //
1570        // The smallest decodable transaction is an empty legacy transaction, 10 bytes
1571        // (128 KiB / 10 bytes > 13k transactions).
1572        //
1573        // If this is an event with `Transactions` message, since transactions aren't
1574        // validated until they are inserted into the pool, this can potentially queue
1575        // >13k transactions for insertion to pool. More if the message size is bigger
1576        // than the soft limit on a `Transactions` broadcast message, which is 128 KiB.
1577        let maybe_more_tx_events = metered_poll_nested_stream_with_budget!(
1578            poll_durations.acc_tx_events,
1579            "net::tx",
1580            "Network transaction events stream",
1581            DEFAULT_BUDGET_TRY_DRAIN_NETWORK_TRANSACTION_EVENTS,
1582            this.transaction_events.poll_next_unpin(cx),
1583            |event| this.on_network_tx_event(event),
1584        );
1585
1586        // Advance pool imports (flush txns to pool).
1587        //
1588        // Note, this is done in batches. A batch is filled from one `Transactions`
1589        // broadcast messages or one `PooledTransactions` response at a time. The
1590        // minimum batch size is 1 transaction (and might often be the case with blob
1591        // transactions).
1592        //
1593        // The smallest decodable transaction is an empty legacy transaction, 10 bytes
1594        // (2 MiB / 10 bytes > 200k transactions).
1595        //
1596        // Since transactions aren't validated until they are inserted into the pool,
1597        // this can potentially validate >200k transactions. More if the message size
1598        // is bigger than the soft limit on a `PooledTransactions` response which is
1599        // 2 MiB (`Transactions` broadcast messages is smaller, 128 KiB).
1600        let maybe_more_pool_imports = metered_poll_nested_stream_with_budget!(
1601            poll_durations.acc_pending_imports,
1602            "net::tx",
1603            "Batched pool imports stream",
1604            DEFAULT_BUDGET_TRY_DRAIN_PENDING_POOL_IMPORTS,
1605            this.pool_imports.poll_next_unpin(cx),
1606            |batch_results| this.on_batch_import_result(batch_results)
1607        );
1608
1609        // Tries to drain hashes pending fetch cache if the tx manager currently has
1610        // capacity for this (fetch txns).
1611        //
1612        // Sends at most one request.
1613        duration_metered_exec!(
1614            {
1615                if this.has_capacity_for_fetching_pending_hashes() {
1616                    this.on_fetch_hashes_pending_fetch();
1617                }
1618            },
1619            poll_durations.acc_pending_fetch
1620        );
1621
1622        // Advance commands (propagate/fetch/serve txns).
1623        let maybe_more_commands = metered_poll_nested_stream_with_budget!(
1624            poll_durations.acc_cmds,
1625            "net::tx",
1626            "Commands channel",
1627            DEFAULT_BUDGET_TRY_DRAIN_STREAM,
1628            this.command_rx.poll_next_unpin(cx),
1629            |cmd| this.on_command(cmd)
1630        );
1631
1632        this.transaction_fetcher.update_metrics();
1633
1634        // all channels are fully drained and import futures pending
1635        if maybe_more_network_events ||
1636            maybe_more_commands ||
1637            maybe_more_tx_events ||
1638            maybe_more_tx_fetch_events ||
1639            maybe_more_pool_imports ||
1640            maybe_more_pending_txns
1641        {
1642            // make sure we're woken up again
1643            cx.waker().wake_by_ref();
1644            return Poll::Pending
1645        }
1646
1647        this.update_poll_metrics(start, poll_durations);
1648
1649        Poll::Pending
1650    }
1651}
1652
/// Selects how transactions are propagated to peers in the network.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum PropagationMode {
    /// The default mode.
    ///
    /// A transaction is only sent to peers that haven't seen it yet.
    Basic,
    /// The forced mode.
    ///
    /// A transaction is sent to all peers, regardless of whether it has been sent to or received
    /// from them before.
    Forced,
}

impl PropagationMode {
    /// Returns `true` for the `Forced` mode and `false` for `Basic`.
    const fn is_forced(self) -> bool {
        match self {
            Self::Forced => true,
            Self::Basic => false,
        }
    }
}
1675
1676/// A transaction that's about to be propagated to multiple peers.
1677#[derive(Debug, Clone)]
1678struct PropagateTransaction<T = TransactionSigned> {
1679    size: usize,
1680    transaction: Arc<T>,
1681}
1682
1683impl<T: SignedTransaction> PropagateTransaction<T> {
1684    /// Create a new instance from a transaction.
1685    pub fn new(transaction: T) -> Self {
1686        let size = transaction.length();
1687        Self { size, transaction: Arc::new(transaction) }
1688    }
1689
1690    /// Create a new instance from a pooled transaction
1691    fn pool_tx<P>(tx: Arc<ValidPoolTransaction<P>>) -> Self
1692    where
1693        P: PoolTransaction<Consensus = T>,
1694    {
1695        let size = tx.encoded_length();
1696        let transaction = tx.transaction.clone_into_consensus();
1697        let transaction = Arc::new(transaction.into_inner());
1698        Self { size, transaction }
1699    }
1700
1701    fn tx_hash(&self) -> &TxHash {
1702        self.transaction.tx_hash()
1703    }
1704}
1705
1706/// Helper type to construct the appropriate message to send to the peer based on whether the peer
1707/// should receive them in full or as pooled
1708#[derive(Debug, Clone)]
1709enum PropagateTransactionsBuilder<T> {
1710    Pooled(PooledTransactionsHashesBuilder),
1711    Full(FullTransactionsBuilder<T>),
1712}
1713
1714impl<T> PropagateTransactionsBuilder<T> {
1715    /// Create a builder for pooled transactions
1716    fn pooled(version: EthVersion) -> Self {
1717        Self::Pooled(PooledTransactionsHashesBuilder::new(version))
1718    }
1719
1720    /// Create a builder that sends transactions in full and records transactions that don't fit.
1721    fn full(version: EthVersion) -> Self {
1722        Self::Full(FullTransactionsBuilder::new(version))
1723    }
1724
1725    /// Returns true if no transactions are recorded.
1726    fn is_empty(&self) -> bool {
1727        match self {
1728            Self::Pooled(builder) => builder.is_empty(),
1729            Self::Full(builder) => builder.is_empty(),
1730        }
1731    }
1732
1733    /// Consumes the type and returns the built messages that should be sent to the peer.
1734    fn build(self) -> PropagateTransactions<T> {
1735        match self {
1736            Self::Pooled(pooled) => {
1737                PropagateTransactions { pooled: Some(pooled.build()), full: None }
1738            }
1739            Self::Full(full) => full.build(),
1740        }
1741    }
1742}
1743
1744impl<T: SignedTransaction> PropagateTransactionsBuilder<T> {
1745    /// Appends all transactions
1746    fn extend<'a>(&mut self, txs: impl IntoIterator<Item = &'a PropagateTransaction<T>>) {
1747        for tx in txs {
1748            self.push(tx);
1749        }
1750    }
1751
1752    /// Appends a transaction to the list.
1753    fn push(&mut self, transaction: &PropagateTransaction<T>) {
1754        match self {
1755            Self::Pooled(builder) => builder.push(transaction),
1756            Self::Full(builder) => builder.push(transaction),
1757        }
1758    }
1759}
1760
1761/// Represents how the transactions should be sent to a peer if any.
1762struct PropagateTransactions<T> {
1763    /// The pooled transaction hashes to send.
1764    pooled: Option<NewPooledTransactionHashes>,
1765    /// The transactions to send in full.
1766    full: Option<Vec<Arc<T>>>,
1767}
1768
1769/// Helper type for constructing the full transaction message that enforces the
1770/// [`DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE`] for full transaction broadcast
1771/// and enforces other propagation rules for EIP-4844 and tracks those transactions that can't be
1772/// broadcasted in full.
1773#[derive(Debug, Clone)]
1774struct FullTransactionsBuilder<T> {
1775    /// The soft limit to enforce for a single broadcast message of full transactions.
1776    total_size: usize,
1777    /// All transactions to be broadcasted.
1778    transactions: Vec<Arc<T>>,
1779    /// Transactions that didn't fit into the broadcast message
1780    pooled: PooledTransactionsHashesBuilder,
1781}
1782
1783impl<T> FullTransactionsBuilder<T> {
1784    /// Create a builder for the negotiated version of the peer's session
1785    fn new(version: EthVersion) -> Self {
1786        Self {
1787            total_size: 0,
1788            pooled: PooledTransactionsHashesBuilder::new(version),
1789            transactions: vec![],
1790        }
1791    }
1792
1793    /// Returns whether or not any transactions are in the [`FullTransactionsBuilder`].
1794    fn is_empty(&self) -> bool {
1795        self.transactions.is_empty() && self.pooled.is_empty()
1796    }
1797
1798    /// Returns the messages that should be propagated to the peer.
1799    fn build(self) -> PropagateTransactions<T> {
1800        let pooled = Some(self.pooled.build()).filter(|pooled| !pooled.is_empty());
1801        let full = Some(self.transactions).filter(|full| !full.is_empty());
1802        PropagateTransactions { pooled, full }
1803    }
1804}
1805
1806impl<T: SignedTransaction> FullTransactionsBuilder<T> {
1807    /// Appends all transactions.
1808    fn extend(&mut self, txs: impl IntoIterator<Item = PropagateTransaction<T>>) {
1809        for tx in txs {
1810            self.push(&tx)
1811        }
1812    }
1813
1814    /// Append a transaction to the list of full transaction if the total message bytes size doesn't
1815    /// exceed the soft maximum target byte size. The limit is soft, meaning if one single
1816    /// transaction goes over the limit, it will be broadcasted in its own [`Transactions`]
1817    /// message. The same pattern is followed in filling a [`GetPooledTransactions`] request in
1818    /// [`TransactionFetcher::fill_request_from_hashes_pending_fetch`].
1819    ///
1820    /// If the transaction is unsuitable for broadcast or would exceed the softlimit, it is appended
1821    /// to list of pooled transactions, (e.g. 4844 transactions).
1822    /// See also [`SignedTransaction::is_broadcastable_in_full`].
1823    fn push(&mut self, transaction: &PropagateTransaction<T>) {
1824        // Do not send full 4844 transaction hashes to peers.
1825        //
1826        //  Nodes MUST NOT automatically broadcast blob transactions to their peers.
1827        //  Instead, those transactions are only announced using
1828        //  `NewPooledTransactionHashes` messages, and can then be manually requested
1829        //  via `GetPooledTransactions`.
1830        //
1831        // From: <https://eips.ethereum.org/EIPS/eip-4844#networking>
1832        if !transaction.transaction.is_broadcastable_in_full() {
1833            self.pooled.push(transaction);
1834            return
1835        }
1836
1837        let new_size = self.total_size + transaction.size;
1838        if new_size > DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE &&
1839            self.total_size > 0
1840        {
1841            // transaction does not fit into the message
1842            self.pooled.push(transaction);
1843            return
1844        }
1845
1846        self.total_size = new_size;
1847        self.transactions.push(Arc::clone(&transaction.transaction));
1848    }
1849}
1850
1851/// A helper type to create the pooled transactions message based on the negotiated version of the
1852/// session with the peer
1853#[derive(Debug, Clone)]
1854enum PooledTransactionsHashesBuilder {
1855    Eth66(NewPooledTransactionHashes66),
1856    Eth68(NewPooledTransactionHashes68),
1857}
1858
1859// === impl PooledTransactionsHashesBuilder ===
1860
1861impl PooledTransactionsHashesBuilder {
1862    /// Push a transaction from the pool to the list.
1863    fn push_pooled<T: PoolTransaction>(&mut self, pooled_tx: Arc<ValidPoolTransaction<T>>) {
1864        match self {
1865            Self::Eth66(msg) => msg.0.push(*pooled_tx.hash()),
1866            Self::Eth68(msg) => {
1867                msg.hashes.push(*pooled_tx.hash());
1868                msg.sizes.push(pooled_tx.encoded_length());
1869                msg.types.push(pooled_tx.transaction.ty());
1870            }
1871        }
1872    }
1873
1874    /// Returns whether or not any transactions are in the [`PooledTransactionsHashesBuilder`].
1875    fn is_empty(&self) -> bool {
1876        match self {
1877            Self::Eth66(hashes) => hashes.is_empty(),
1878            Self::Eth68(hashes) => hashes.is_empty(),
1879        }
1880    }
1881
1882    /// Appends all hashes
1883    fn extend<T: SignedTransaction>(
1884        &mut self,
1885        txs: impl IntoIterator<Item = PropagateTransaction<T>>,
1886    ) {
1887        for tx in txs {
1888            self.push(&tx);
1889        }
1890    }
1891
1892    fn push<T: SignedTransaction>(&mut self, tx: &PropagateTransaction<T>) {
1893        match self {
1894            Self::Eth66(msg) => msg.0.push(*tx.tx_hash()),
1895            Self::Eth68(msg) => {
1896                msg.hashes.push(*tx.tx_hash());
1897                msg.sizes.push(tx.size);
1898                msg.types.push(tx.transaction.ty());
1899            }
1900        }
1901    }
1902
1903    /// Create a builder for the negotiated version of the peer's session
1904    fn new(version: EthVersion) -> Self {
1905        match version {
1906            EthVersion::Eth66 | EthVersion::Eth67 => Self::Eth66(Default::default()),
1907            EthVersion::Eth68 | EthVersion::Eth69 => Self::Eth68(Default::default()),
1908        }
1909    }
1910
1911    fn build(self) -> NewPooledTransactionHashes {
1912        match self {
1913            Self::Eth66(msg) => msg.into(),
1914            Self::Eth68(msg) => msg.into(),
1915        }
1916    }
1917}
1918
/// The way transactions reached us.
enum TransactionSource {
    /// The peer broadcast the transactions to us in a [`Transactions`] message.
    Broadcast,
    /// The transactions arrived as the response to a [`fetcher::GetPooledTxRequest`] that we
    /// issued.
    Response,
}

// === impl TransactionSource ===

impl TransactionSource {
    /// Returns `true` if the transactions were received as a broadcast.
    const fn is_broadcast(&self) -> bool {
        match self {
            Self::Broadcast => true,
            Self::Response => false,
        }
    }
}
1935
1936/// Tracks a single peer in the context of [`TransactionsManager`].
1937#[derive(Debug)]
1938pub struct PeerMetadata<N: NetworkPrimitives = EthNetworkPrimitives> {
1939    /// Optimistically keeps track of transactions that we know the peer has seen. Optimistic, in
1940    /// the sense that transactions are preemptively marked as seen by peer when they are sent to
1941    /// the peer.
1942    seen_transactions: LruCache<TxHash>,
1943    /// A communication channel directly to the peer's session task.
1944    request_tx: PeerRequestSender<PeerRequest<N>>,
1945    /// negotiated version of the session.
1946    version: EthVersion,
1947    /// The peer's client version.
1948    client_version: Arc<str>,
1949    /// The kind of peer.
1950    peer_kind: PeerKind,
1951}
1952
1953impl<N: NetworkPrimitives> PeerMetadata<N> {
1954    /// Returns a new instance of [`PeerMetadata`].
1955    pub fn new(
1956        request_tx: PeerRequestSender<PeerRequest<N>>,
1957        version: EthVersion,
1958        client_version: Arc<str>,
1959        max_transactions_seen_by_peer: u32,
1960        peer_kind: PeerKind,
1961    ) -> Self {
1962        Self {
1963            seen_transactions: LruCache::new(max_transactions_seen_by_peer),
1964            request_tx,
1965            version,
1966            client_version,
1967            peer_kind,
1968        }
1969    }
1970
1971    /// Returns a reference to the peer's request sender channel.
1972    pub const fn request_tx(&self) -> &PeerRequestSender<PeerRequest<N>> {
1973        &self.request_tx
1974    }
1975
1976    /// Return a
1977    pub const fn seen_transactions_mut(&mut self) -> &mut LruCache<TxHash> {
1978        &mut self.seen_transactions
1979    }
1980
1981    /// Returns the negotiated `EthVersion` of the session.
1982    pub const fn version(&self) -> EthVersion {
1983        self.version
1984    }
1985
1986    /// Returns a reference to the peer's client version string.
1987    pub fn client_version(&self) -> &str {
1988        &self.client_version
1989    }
1990
1991    /// Returns the peer's kind.
1992    pub const fn peer_kind(&self) -> PeerKind {
1993        self.peer_kind
1994    }
1995}
1996
1997/// Commands to send to the [`TransactionsManager`]
1998#[derive(Debug)]
1999enum TransactionsCommand<N: NetworkPrimitives = EthNetworkPrimitives> {
2000    /// Propagate a transaction hash to the network.
2001    PropagateHash(B256),
2002    /// Propagate transaction hashes to a specific peer.
2003    PropagateHashesTo(Vec<B256>, PeerId),
2004    /// Request the list of active peer IDs from the [`TransactionsManager`].
2005    GetActivePeers(oneshot::Sender<HashSet<PeerId>>),
2006    /// Propagate a collection of full transactions to a specific peer.
2007    PropagateTransactionsTo(Vec<TxHash>, PeerId),
2008    /// Propagate a collection of hashes to all peers.
2009    PropagateTransactions(Vec<TxHash>),
2010    /// Propagate a collection of broadcastable transactions in full to all peers.
2011    BroadcastTransactions(Vec<PropagateTransaction<N::BroadcastedTransaction>>),
2012    /// Request transaction hashes known by specific peers from the [`TransactionsManager`].
2013    GetTransactionHashes {
2014        peers: Vec<PeerId>,
2015        tx: oneshot::Sender<HashMap<PeerId, HashSet<TxHash>>>,
2016    },
2017    /// Requests a clone of the sender channel to the peer.
2018    GetPeerSender {
2019        peer_id: PeerId,
2020        peer_request_sender: oneshot::Sender<Option<PeerRequestSender<PeerRequest<N>>>>,
2021    },
2022}
2023
2024/// All events related to transactions emitted by the network.
2025#[derive(Debug)]
2026pub enum NetworkTransactionEvent<N: NetworkPrimitives = EthNetworkPrimitives> {
2027    /// Represents the event of receiving a list of transactions from a peer.
2028    ///
2029    /// This indicates transactions that were broadcasted to us from the peer.
2030    IncomingTransactions {
2031        /// The ID of the peer from which the transactions were received.
2032        peer_id: PeerId,
2033        /// The received transactions.
2034        msg: Transactions<N::BroadcastedTransaction>,
2035    },
2036    /// Represents the event of receiving a list of transaction hashes from a peer.
2037    IncomingPooledTransactionHashes {
2038        /// The ID of the peer from which the transaction hashes were received.
2039        peer_id: PeerId,
2040        /// The received new pooled transaction hashes.
2041        msg: NewPooledTransactionHashes,
2042    },
2043    /// Represents the event of receiving a `GetPooledTransactions` request from a peer.
2044    GetPooledTransactions {
2045        /// The ID of the peer from which the request was received.
2046        peer_id: PeerId,
2047        /// The received `GetPooledTransactions` request.
2048        request: GetPooledTransactions,
2049        /// The sender for responding to the request with a result of `PooledTransactions`.
2050        response: oneshot::Sender<RequestResult<PooledTransactions<N::PooledTransaction>>>,
2051    },
2052    /// Represents the event of receiving a `GetTransactionsHandle` request.
2053    GetTransactionsHandle(oneshot::Sender<Option<TransactionsHandle<N>>>),
2054}
2055
/// Stats tracked about the [`TransactionsManager`].
#[derive(Debug)]
pub struct PendingPoolImportsInfo {
    /// Shared counter of the transactions that are about to be inserted into the pool.
    pending_pool_imports: Arc<AtomicUsize>,
    /// Max number of transactions allowed to be imported concurrently.
    max_pending_pool_imports: usize,
}

impl PendingPoolImportsInfo {
    /// Returns a new [`PendingPoolImportsInfo`] whose counter starts at zero.
    pub fn new(max_pending_pool_imports: usize) -> Self {
        let pending_pool_imports = Arc::new(AtomicUsize::new(0));
        Self { pending_pool_imports, max_pending_pool_imports }
    }

    /// Returns `true` if the current number of pending pool imports is strictly below the max
    /// passed in by the caller. The stored `max_pending_pool_imports` is not consulted here.
    pub fn has_capacity(&self, max_pending_pool_imports: usize) -> bool {
        let currently_pending = self.pending_pool_imports.load(Ordering::Relaxed);
        currently_pending < max_pending_pool_imports
    }
}
2076
2077impl Default for PendingPoolImportsInfo {
2078    fn default() -> Self {
2079        Self::new(DEFAULT_MAX_COUNT_PENDING_POOL_IMPORTS)
2080    }
2081}
2082
/// Duration accumulators for the individual sections of one `TransactionsManager::poll` call.
/// All of them default to zero.
#[derive(Debug, Default)]
struct TxManagerPollDurations {
    /// Accumulator handed to the metered poll of the network events stream.
    acc_network_events: Duration,
    /// Accumulator handed to the metered poll of the batched pool imports stream.
    acc_pending_imports: Duration,
    /// Accumulator handed to the metered poll of the network transaction events stream.
    acc_tx_events: Duration,
    /// Accumulator handed to the metered poll of the pending transactions stream.
    acc_imported_txns: Duration,
    /// Accumulator handed to the metered poll of the transaction fetch events stream.
    acc_fetch_events: Duration,
    /// Accumulator handed to the metered step that fetches hashes pending fetch.
    acc_pending_fetch: Duration,
    /// Accumulator handed to the metered poll of the commands channel.
    acc_cmds: Duration,
}
2093
2094#[cfg(test)]
2095mod tests {
2096    use super::*;
2097    use crate::{
2098        test_utils::{
2099            transactions::{buffer_hash_to_tx_fetcher, new_mock_session, new_tx_manager},
2100            Testnet,
2101        },
2102        transactions::config::RelaxedEthAnnouncementFilter,
2103        NetworkConfigBuilder, NetworkManager,
2104    };
2105    use alloy_consensus::{TxEip1559, TxLegacy};
2106    use alloy_primitives::{hex, Signature, TxKind, U256};
2107    use alloy_rlp::Decodable;
2108    use futures::FutureExt;
2109    use reth_chainspec::MIN_TRANSACTION_GAS;
2110    use reth_ethereum_primitives::{PooledTransactionVariant, Transaction, TransactionSigned};
2111    use reth_network_api::{NetworkInfo, PeerKind};
2112    use reth_network_p2p::{
2113        error::{RequestError, RequestResult},
2114        sync::{NetworkSyncUpdater, SyncState},
2115    };
2116    use reth_storage_api::noop::NoopProvider;
2117    use reth_transaction_pool::test_utils::{
2118        testing_pool, MockTransaction, MockTransactionFactory, TestPool,
2119    };
2120    use secp256k1::SecretKey;
2121    use std::{
2122        future::poll_fn,
2123        net::{IpAddr, Ipv4Addr, SocketAddr},
2124        str::FromStr,
2125    };
2126    use tracing::error;
2127
2128    #[tokio::test(flavor = "multi_thread")]
2129    async fn test_ignored_tx_broadcasts_while_initially_syncing() {
2130        reth_tracing::init_test_tracing();
2131        let net = Testnet::create(3).await;
2132
2133        let mut handles = net.handles();
2134        let handle0 = handles.next().unwrap();
2135        let handle1 = handles.next().unwrap();
2136
2137        drop(handles);
2138        let handle = net.spawn();
2139
2140        let listener0 = handle0.event_listener();
2141        handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
2142        let secret_key = SecretKey::new(&mut rand_08::thread_rng());
2143
2144        let client = NoopProvider::default();
2145        let pool = testing_pool();
2146        let config = NetworkConfigBuilder::eth(secret_key)
2147            .disable_discovery()
2148            .listener_port(0)
2149            .build(client);
2150        let transactions_manager_config = config.transactions_manager_config.clone();
2151        let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
2152            .await
2153            .unwrap()
2154            .into_builder()
2155            .transactions(pool.clone(), transactions_manager_config)
2156            .split_with_handle();
2157
2158        tokio::task::spawn(network);
2159
2160        // go to syncing (pipeline sync)
2161        network_handle.update_sync_state(SyncState::Syncing);
2162        assert!(NetworkInfo::is_syncing(&network_handle));
2163        assert!(NetworkInfo::is_initially_syncing(&network_handle));
2164
2165        // wait for all initiator connections
2166        let mut established = listener0.take(2);
2167        while let Some(ev) = established.next().await {
2168            match ev {
2169                NetworkEvent::Peer(PeerEvent::SessionEstablished(info)) => {
2170                    // to insert a new peer in transactions peerset
2171                    transactions
2172                        .on_network_event(NetworkEvent::Peer(PeerEvent::SessionEstablished(info)))
2173                }
2174                NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => {}
2175                ev => {
2176                    error!("unexpected event {ev:?}")
2177                }
2178            }
2179        }
2180        // random tx: <https://etherscan.io/getRawTx?tx=0x9448608d36e721ef403c53b00546068a6474d6cbab6816c3926de449898e7bce>
2181        let input = hex!(
2182            "02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76"
2183        );
2184        let signed_tx = TransactionSigned::decode(&mut &input[..]).unwrap();
2185        transactions.on_network_tx_event(NetworkTransactionEvent::IncomingTransactions {
2186            peer_id: *handle1.peer_id(),
2187            msg: Transactions(vec![signed_tx.clone()]),
2188        });
2189        poll_fn(|cx| {
2190            let _ = transactions.poll_unpin(cx);
2191            Poll::Ready(())
2192        })
2193        .await;
2194        assert!(pool.is_empty());
2195        handle.terminate().await;
2196    }
2197
2198    #[tokio::test(flavor = "multi_thread")]
2199    async fn test_tx_broadcasts_through_two_syncs() {
2200        reth_tracing::init_test_tracing();
2201        let net = Testnet::create(3).await;
2202
2203        let mut handles = net.handles();
2204        let handle0 = handles.next().unwrap();
2205        let handle1 = handles.next().unwrap();
2206
2207        drop(handles);
2208        let handle = net.spawn();
2209
2210        let listener0 = handle0.event_listener();
2211        handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
2212        let secret_key = SecretKey::new(&mut rand_08::thread_rng());
2213
2214        let client = NoopProvider::default();
2215        let pool = testing_pool();
2216        let config = NetworkConfigBuilder::new(secret_key)
2217            .disable_discovery()
2218            .listener_port(0)
2219            .build(client);
2220        let transactions_manager_config = config.transactions_manager_config.clone();
2221        let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
2222            .await
2223            .unwrap()
2224            .into_builder()
2225            .transactions(pool.clone(), transactions_manager_config)
2226            .split_with_handle();
2227
2228        tokio::task::spawn(network);
2229
2230        // go to syncing (pipeline sync) to idle and then to syncing (live)
2231        network_handle.update_sync_state(SyncState::Syncing);
2232        assert!(NetworkInfo::is_syncing(&network_handle));
2233        network_handle.update_sync_state(SyncState::Idle);
2234        assert!(!NetworkInfo::is_syncing(&network_handle));
2235        network_handle.update_sync_state(SyncState::Syncing);
2236        assert!(NetworkInfo::is_syncing(&network_handle));
2237
2238        // wait for all initiator connections
2239        let mut established = listener0.take(2);
2240        while let Some(ev) = established.next().await {
2241            match ev {
2242                NetworkEvent::ActivePeerSession { .. } |
2243                NetworkEvent::Peer(PeerEvent::SessionEstablished(_)) => {
2244                    // to insert a new peer in transactions peerset
2245                    transactions.on_network_event(ev);
2246                }
2247                NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => {}
2248                _ => {
2249                    error!("unexpected event {ev:?}")
2250                }
2251            }
2252        }
2253        // random tx: <https://etherscan.io/getRawTx?tx=0x9448608d36e721ef403c53b00546068a6474d6cbab6816c3926de449898e7bce>
2254        let input = hex!(
2255            "02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76"
2256        );
2257        let signed_tx = TransactionSigned::decode(&mut &input[..]).unwrap();
2258        transactions.on_network_tx_event(NetworkTransactionEvent::IncomingTransactions {
2259            peer_id: *handle1.peer_id(),
2260            msg: Transactions(vec![signed_tx.clone()]),
2261        });
2262        poll_fn(|cx| {
2263            let _ = transactions.poll_unpin(cx);
2264            Poll::Ready(())
2265        })
2266        .await;
2267        assert!(!NetworkInfo::is_initially_syncing(&network_handle));
2268        assert!(NetworkInfo::is_syncing(&network_handle));
2269        assert!(!pool.is_empty());
2270        handle.terminate().await;
2271    }
2272
2273    // Ensure that the transaction manager correctly handles the `IncomingPooledTransactionHashes`
2274    // event and is able to retrieve the corresponding transactions.
2275    #[tokio::test(flavor = "multi_thread")]
2276    async fn test_handle_incoming_transactions_hashes() {
2277        reth_tracing::init_test_tracing();
2278
2279        let secret_key = SecretKey::new(&mut rand_08::thread_rng());
2280        let client = NoopProvider::default();
2281
2282        let config = NetworkConfigBuilder::new(secret_key)
2283            // let OS choose port
2284            .listener_port(0)
2285            .disable_discovery()
2286            .build(client);
2287
2288        let pool = testing_pool();
2289
2290        let transactions_manager_config = config.transactions_manager_config.clone();
2291        let (_network_handle, _network, mut tx_manager, _) = NetworkManager::new(config)
2292            .await
2293            .unwrap()
2294            .into_builder()
2295            .transactions(pool.clone(), transactions_manager_config)
2296            .split_with_handle();
2297
2298        let peer_id_1 = PeerId::new([1; 64]);
2299        let eth_version = EthVersion::Eth66;
2300
2301        let txs = vec![TransactionSigned::new_unhashed(
2302            Transaction::Legacy(TxLegacy {
2303                chain_id: Some(4),
2304                nonce: 15u64,
2305                gas_price: 2200000000,
2306                gas_limit: 34811,
2307                to: TxKind::Call(hex!("cf7f9e66af820a19257a2108375b180b0ec49167").into()),
2308                value: U256::from(1234u64),
2309                input: Default::default(),
2310            }),
2311            Signature::new(
2312                U256::from_str(
2313                    "0x35b7bfeb9ad9ece2cbafaaf8e202e706b4cfaeb233f46198f00b44d4a566a981",
2314                )
2315                .unwrap(),
2316                U256::from_str(
2317                    "0x612638fb29427ca33b9a3be2a0a561beecfe0269655be160d35e72d366a6a860",
2318                )
2319                .unwrap(),
2320                true,
2321            ),
2322        )];
2323
2324        let txs_hashes: Vec<B256> = txs.iter().map(|tx| *tx.hash()).collect();
2325
2326        let (peer_1, mut to_mock_session_rx) = new_mock_session(peer_id_1, eth_version);
2327        tx_manager.peers.insert(peer_id_1, peer_1);
2328
2329        assert!(pool.is_empty());
2330
2331        tx_manager.on_network_tx_event(NetworkTransactionEvent::IncomingPooledTransactionHashes {
2332            peer_id: peer_id_1,
2333            msg: NewPooledTransactionHashes::from(NewPooledTransactionHashes66::from(
2334                txs_hashes.clone(),
2335            )),
2336        });
2337
2338        // mock session of peer_1 receives request
2339        let req = to_mock_session_rx
2340            .recv()
2341            .await
2342            .expect("peer_1 session should receive request with buffered hashes");
2343        let PeerRequest::GetPooledTransactions { request, response } = req else { unreachable!() };
2344        assert_eq!(request, GetPooledTransactions::from(txs_hashes.clone()));
2345
2346        let message: Vec<PooledTransactionVariant> = txs
2347            .into_iter()
2348            .map(|tx| {
2349                PooledTransactionVariant::try_from(tx)
2350                    .expect("Failed to convert MockTransaction to PooledTransaction")
2351            })
2352            .collect();
2353
2354        // return the transactions corresponding to the transaction hashes.
2355        response
2356            .send(Ok(PooledTransactions(message)))
2357            .expect("should send peer_1 response to tx manager");
2358
2359        // adance the transaction manager future
2360        poll_fn(|cx| {
2361            let _ = tx_manager.poll_unpin(cx);
2362            Poll::Ready(())
2363        })
2364        .await;
2365
2366        // ensure that the transactions corresponding to the transaction hashes have been
2367        // successfully retrieved and stored in the Pool.
2368        assert_eq!(pool.get_all(txs_hashes.clone()).len(), txs_hashes.len());
2369    }
2370
2371    #[tokio::test(flavor = "multi_thread")]
2372    async fn test_handle_incoming_transactions() {
2373        reth_tracing::init_test_tracing();
2374        let net = Testnet::create(3).await;
2375
2376        let mut handles = net.handles();
2377        let handle0 = handles.next().unwrap();
2378        let handle1 = handles.next().unwrap();
2379
2380        drop(handles);
2381        let handle = net.spawn();
2382
2383        let listener0 = handle0.event_listener();
2384
2385        handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
2386        let secret_key = SecretKey::new(&mut rand_08::thread_rng());
2387
2388        let client = NoopProvider::default();
2389        let pool = testing_pool();
2390        let config = NetworkConfigBuilder::new(secret_key)
2391            .disable_discovery()
2392            .listener_port(0)
2393            .build(client);
2394        let transactions_manager_config = config.transactions_manager_config.clone();
2395        let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
2396            .await
2397            .unwrap()
2398            .into_builder()
2399            .transactions(pool.clone(), transactions_manager_config)
2400            .split_with_handle();
2401        tokio::task::spawn(network);
2402
2403        network_handle.update_sync_state(SyncState::Idle);
2404
2405        assert!(!NetworkInfo::is_syncing(&network_handle));
2406
2407        // wait for all initiator connections
2408        let mut established = listener0.take(2);
2409        while let Some(ev) = established.next().await {
2410            match ev {
2411                NetworkEvent::ActivePeerSession { .. } |
2412                NetworkEvent::Peer(PeerEvent::SessionEstablished(_)) => {
2413                    // to insert a new peer in transactions peerset
2414                    transactions.on_network_event(ev);
2415                }
2416                NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => {}
2417                ev => {
2418                    error!("unexpected event {ev:?}")
2419                }
2420            }
2421        }
2422        // random tx: <https://etherscan.io/getRawTx?tx=0x9448608d36e721ef403c53b00546068a6474d6cbab6816c3926de449898e7bce>
2423        let input = hex!(
2424            "02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76"
2425        );
2426        let signed_tx = TransactionSigned::decode(&mut &input[..]).unwrap();
2427        transactions.on_network_tx_event(NetworkTransactionEvent::IncomingTransactions {
2428            peer_id: *handle1.peer_id(),
2429            msg: Transactions(vec![signed_tx.clone()]),
2430        });
2431        assert!(transactions
2432            .transactions_by_peers
2433            .get(signed_tx.tx_hash())
2434            .unwrap()
2435            .contains(handle1.peer_id()));
2436
2437        // advance the transaction manager future
2438        poll_fn(|cx| {
2439            let _ = transactions.poll_unpin(cx);
2440            Poll::Ready(())
2441        })
2442        .await;
2443
2444        assert!(!pool.is_empty());
2445        assert!(pool.get(signed_tx.tx_hash()).is_some());
2446        handle.terminate().await;
2447    }
2448
2449    #[tokio::test(flavor = "multi_thread")]
2450    async fn test_on_get_pooled_transactions_network() {
2451        reth_tracing::init_test_tracing();
2452        let net = Testnet::create(2).await;
2453
2454        let mut handles = net.handles();
2455        let handle0 = handles.next().unwrap();
2456        let handle1 = handles.next().unwrap();
2457
2458        drop(handles);
2459        let handle = net.spawn();
2460
2461        let listener0 = handle0.event_listener();
2462
2463        handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
2464        let secret_key = SecretKey::new(&mut rand_08::thread_rng());
2465
2466        let client = NoopProvider::default();
2467        let pool = testing_pool();
2468        let config = NetworkConfigBuilder::new(secret_key)
2469            .disable_discovery()
2470            .listener_port(0)
2471            .build(client);
2472        let transactions_manager_config = config.transactions_manager_config.clone();
2473        let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
2474            .await
2475            .unwrap()
2476            .into_builder()
2477            .transactions(pool.clone(), transactions_manager_config)
2478            .split_with_handle();
2479        tokio::task::spawn(network);
2480
2481        network_handle.update_sync_state(SyncState::Idle);
2482
2483        assert!(!NetworkInfo::is_syncing(&network_handle));
2484
2485        // wait for all initiator connections
2486        let mut established = listener0.take(2);
2487        while let Some(ev) = established.next().await {
2488            match ev {
2489                NetworkEvent::ActivePeerSession { .. } |
2490                NetworkEvent::Peer(PeerEvent::SessionEstablished(_)) => {
2491                    transactions.on_network_event(ev);
2492                }
2493                NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => {}
2494                ev => {
2495                    error!("unexpected event {ev:?}")
2496                }
2497            }
2498        }
2499        handle.terminate().await;
2500
2501        let tx = MockTransaction::eip1559();
2502        let _ = transactions
2503            .pool
2504            .add_transaction(reth_transaction_pool::TransactionOrigin::External, tx.clone())
2505            .await;
2506
2507        let request = GetPooledTransactions(vec![*tx.get_hash()]);
2508
2509        let (send, receive) =
2510            oneshot::channel::<RequestResult<PooledTransactions<PooledTransactionVariant>>>();
2511
2512        transactions.on_network_tx_event(NetworkTransactionEvent::GetPooledTransactions {
2513            peer_id: *handle1.peer_id(),
2514            request,
2515            response: send,
2516        });
2517
2518        match receive.await.unwrap() {
2519            Ok(PooledTransactions(transactions)) => {
2520                assert_eq!(transactions.len(), 1);
2521            }
2522            Err(e) => {
2523                panic!("error: {e:?}");
2524            }
2525        }
2526    }
2527
2528    // Ensure that when the remote peer only returns part of the requested transactions, the
2529    // replied transactions are removed from the `tx_fetcher`, while the unresponsive ones are
2530    // re-buffered.
2531    #[tokio::test]
2532    async fn test_partially_tx_response() {
2533        reth_tracing::init_test_tracing();
2534
2535        let mut tx_manager = new_tx_manager().await.0;
2536        let tx_fetcher = &mut tx_manager.transaction_fetcher;
2537
2538        let peer_id_1 = PeerId::new([1; 64]);
2539        let eth_version = EthVersion::Eth66;
2540
2541        let txs = vec![
2542            TransactionSigned::new_unhashed(
2543                Transaction::Legacy(TxLegacy {
2544                    chain_id: Some(4),
2545                    nonce: 15u64,
2546                    gas_price: 2200000000,
2547                    gas_limit: 34811,
2548                    to: TxKind::Call(hex!("cf7f9e66af820a19257a2108375b180b0ec49167").into()),
2549                    value: U256::from(1234u64),
2550                    input: Default::default(),
2551                }),
2552                Signature::new(
2553                    U256::from_str(
2554                        "0x35b7bfeb9ad9ece2cbafaaf8e202e706b4cfaeb233f46198f00b44d4a566a981",
2555                    )
2556                    .unwrap(),
2557                    U256::from_str(
2558                        "0x612638fb29427ca33b9a3be2a0a561beecfe0269655be160d35e72d366a6a860",
2559                    )
2560                    .unwrap(),
2561                    true,
2562                ),
2563            ),
2564            TransactionSigned::new_unhashed(
2565                Transaction::Eip1559(TxEip1559 {
2566                    chain_id: 4,
2567                    nonce: 26u64,
2568                    max_priority_fee_per_gas: 1500000000,
2569                    max_fee_per_gas: 1500000013,
2570                    gas_limit: MIN_TRANSACTION_GAS,
2571                    to: TxKind::Call(hex!("61815774383099e24810ab832a5b2a5425c154d5").into()),
2572                    value: U256::from(3000000000000000000u64),
2573                    input: Default::default(),
2574                    access_list: Default::default(),
2575                }),
2576                Signature::new(
2577                    U256::from_str(
2578                        "0x59e6b67f48fb32e7e570dfb11e042b5ad2e55e3ce3ce9cd989c7e06e07feeafd",
2579                    )
2580                    .unwrap(),
2581                    U256::from_str(
2582                        "0x016b83f4f980694ed2eee4d10667242b1f40dc406901b34125b008d334d47469",
2583                    )
2584                    .unwrap(),
2585                    true,
2586                ),
2587            ),
2588        ];
2589
2590        let txs_hashes: Vec<B256> = txs.iter().map(|tx| *tx.hash()).collect();
2591
2592        let (mut peer_1, mut to_mock_session_rx) = new_mock_session(peer_id_1, eth_version);
2593        // mark hashes as seen by peer so it can fish them out from the cache for hashes pending
2594        // fetch
2595        peer_1.seen_transactions.insert(txs_hashes[0]);
2596        peer_1.seen_transactions.insert(txs_hashes[1]);
2597        tx_manager.peers.insert(peer_id_1, peer_1);
2598
2599        buffer_hash_to_tx_fetcher(tx_fetcher, txs_hashes[0], peer_id_1, 0, None);
2600        buffer_hash_to_tx_fetcher(tx_fetcher, txs_hashes[1], peer_id_1, 0, None);
2601
2602        // peer_1 is idle
2603        assert!(tx_fetcher.is_idle(&peer_id_1));
2604        assert_eq!(tx_fetcher.active_peers.len(), 0);
2605
2606        // sends requests for buffered hashes to peer_1
2607        tx_fetcher.on_fetch_pending_hashes(&tx_manager.peers, |_| true);
2608
2609        assert_eq!(tx_fetcher.num_pending_hashes(), 0);
2610        // as long as request is in flight peer_1 is not idle
2611        assert!(!tx_fetcher.is_idle(&peer_id_1));
2612        assert_eq!(tx_fetcher.active_peers.len(), 1);
2613
2614        // mock session of peer_1 receives request
2615        let req = to_mock_session_rx
2616            .recv()
2617            .await
2618            .expect("peer_1 session should receive request with buffered hashes");
2619        let PeerRequest::GetPooledTransactions { response, .. } = req else { unreachable!() };
2620
2621        let message: Vec<PooledTransactionVariant> = txs
2622            .into_iter()
2623            .take(1)
2624            .map(|tx| {
2625                PooledTransactionVariant::try_from(tx)
2626                    .expect("Failed to convert MockTransaction to PooledTransaction")
2627            })
2628            .collect();
2629        // response partial request
2630        response
2631            .send(Ok(PooledTransactions(message)))
2632            .expect("should send peer_1 response to tx manager");
2633        let Some(FetchEvent::TransactionsFetched { peer_id, .. }) = tx_fetcher.next().await else {
2634            unreachable!()
2635        };
2636
2637        // request has resolved, peer_1 is idle again
2638        assert!(tx_fetcher.is_idle(&peer_id));
2639        assert_eq!(tx_fetcher.active_peers.len(), 0);
2640        // failing peer_1's request buffers requested hashes for retry.
2641        assert_eq!(tx_fetcher.num_pending_hashes(), 1);
2642    }
2643
    // Exercises the retry limit of the transaction fetcher: two hashes that are buffered with one
    // retry already recorded are requested from peer_1, that request fails and the hashes are
    // buffered again; peer_2 then announces the same hashes, its request fails as well, and
    // afterwards no hash is pending anymore.
    #[tokio::test]
    async fn test_max_retries_tx_request() {
        reth_tracing::init_test_tracing();

        let mut tx_manager = new_tx_manager().await.0;
        let tx_fetcher = &mut tx_manager.transaction_fetcher;

        let peer_id_1 = PeerId::new([1; 64]);
        let peer_id_2 = PeerId::new([2; 64]);
        let eth_version = EthVersion::Eth66;
        let seen_hashes = [B256::from_slice(&[1; 32]), B256::from_slice(&[2; 32])];

        let (mut peer_1, mut to_mock_session_rx) = new_mock_session(peer_id_1, eth_version);
        // mark hashes as seen by peer so it can fish them out from the cache for hashes pending
        // fetch
        peer_1.seen_transactions.insert(seen_hashes[0]);
        peer_1.seen_transactions.insert(seen_hashes[1]);
        tx_manager.peers.insert(peer_id_1, peer_1);

        // hashes are seen and currently not inflight, with one fallback peer, and are buffered
        // for first retry in reverse order to make index 0 lru
        let retries = 1;
        buffer_hash_to_tx_fetcher(tx_fetcher, seen_hashes[1], peer_id_1, retries, None);
        buffer_hash_to_tx_fetcher(tx_fetcher, seen_hashes[0], peer_id_1, retries, None);

        // peer_1 is idle
        assert!(tx_fetcher.is_idle(&peer_id_1));
        assert_eq!(tx_fetcher.active_peers.len(), 0);

        // sends request for buffered hashes to peer_1
        tx_fetcher.on_fetch_pending_hashes(&tx_manager.peers, |_| true);

        // take a fresh mutable borrow of the fetcher for the assertions below
        let tx_fetcher = &mut tx_manager.transaction_fetcher;

        assert_eq!(tx_fetcher.num_pending_hashes(), 0);
        // as long as request is in inflight peer_1 is not idle
        assert!(!tx_fetcher.is_idle(&peer_id_1));
        assert_eq!(tx_fetcher.active_peers.len(), 1);

        // mock session of peer_1 receives request
        let req = to_mock_session_rx
            .recv()
            .await
            .expect("peer_1 session should receive request with buffered hashes");
        let PeerRequest::GetPooledTransactions { request, response } = req else { unreachable!() };
        let GetPooledTransactions(hashes) = request;

        // compare as sets: the order of the hashes inside the request is not asserted
        let hashes = hashes.into_iter().collect::<HashSet<_>>();

        assert_eq!(hashes, seen_hashes.into_iter().collect::<HashSet<_>>());

        // fail request to peer_1
        response
            .send(Err(RequestError::BadResponse))
            .expect("should send peer_1 response to tx manager");
        let Some(FetchEvent::FetchError { peer_id, .. }) = tx_fetcher.next().await else {
            unreachable!()
        };

        // request has resolved, peer_1 is idle again
        assert!(tx_fetcher.is_idle(&peer_id));
        assert_eq!(tx_fetcher.active_peers.len(), 0);
        // failing peer_1's request buffers requested hashes for retry
        assert_eq!(tx_fetcher.num_pending_hashes(), 2);

        // register a second mock peer; its receiver shadows the one of peer_1
        let (peer_2, mut to_mock_session_rx) = new_mock_session(peer_id_2, eth_version);
        tx_manager.peers.insert(peer_id_2, peer_2);

        // peer_2 announces same hashes as peer_1
        let msg =
            NewPooledTransactionHashes::Eth66(NewPooledTransactionHashes66(seen_hashes.to_vec()));
        tx_manager.on_new_pooled_transaction_hashes(peer_id_2, msg);

        // the call above used the whole manager, so borrow the fetcher again
        let tx_fetcher = &mut tx_manager.transaction_fetcher;

        // peer_2 should be in active_peers.
        assert_eq!(tx_fetcher.active_peers.len(), 1);

        // since hashes are already seen, no changes to length of unknown hashes
        assert_eq!(tx_fetcher.num_all_hashes(), 2);
        // but hashes are taken out of buffer and packed into request to peer_2
        assert_eq!(tx_fetcher.num_pending_hashes(), 0);

        // mock session of peer_2 receives request
        let req = to_mock_session_rx
            .recv()
            .await
            .expect("peer_2 session should receive request with buffered hashes");
        let PeerRequest::GetPooledTransactions { response, .. } = req else { unreachable!() };

        // report failed request to tx manager
        response
            .send(Err(RequestError::BadResponse))
            .expect("should send peer_2 response to tx manager");
        let Some(FetchEvent::FetchError { .. }) = tx_fetcher.next().await else { unreachable!() };

        // `MAX_REQUEST_RETRIES_PER_TX_HASH`, 2, for hashes reached so this time won't be buffered
        // for retry
        assert_eq!(tx_fetcher.num_pending_hashes(), 0);
        assert_eq!(tx_fetcher.active_peers.len(), 0);
    }
2745
2746    #[test]
2747    fn test_transaction_builder_empty() {
2748        let mut builder =
2749            PropagateTransactionsBuilder::<TransactionSigned>::pooled(EthVersion::Eth68);
2750        assert!(builder.is_empty());
2751
2752        let mut factory = MockTransactionFactory::default();
2753        let tx = PropagateTransaction::pool_tx(Arc::new(factory.create_eip1559()));
2754        builder.push(&tx);
2755        assert!(!builder.is_empty());
2756
2757        let txs = builder.build();
2758        assert!(txs.full.is_none());
2759        let txs = txs.pooled.unwrap();
2760        assert_eq!(txs.len(), 1);
2761    }
2762
2763    #[test]
2764    fn test_transaction_builder_large() {
2765        let mut builder =
2766            PropagateTransactionsBuilder::<TransactionSigned>::full(EthVersion::Eth68);
2767        assert!(builder.is_empty());
2768
2769        let mut factory = MockTransactionFactory::default();
2770        let mut tx = factory.create_eip1559();
2771        // create a transaction that still fits
2772        tx.transaction.set_size(DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE + 1);
2773        let tx = Arc::new(tx);
2774        let tx = PropagateTransaction::pool_tx(tx);
2775        builder.push(&tx);
2776        assert!(!builder.is_empty());
2777
2778        let txs = builder.clone().build();
2779        assert!(txs.pooled.is_none());
2780        let txs = txs.full.unwrap();
2781        assert_eq!(txs.len(), 1);
2782
2783        builder.push(&tx);
2784
2785        let txs = builder.clone().build();
2786        let pooled = txs.pooled.unwrap();
2787        assert_eq!(pooled.len(), 1);
2788        let txs = txs.full.unwrap();
2789        assert_eq!(txs.len(), 1);
2790    }
2791
2792    #[test]
2793    fn test_transaction_builder_eip4844() {
2794        let mut builder =
2795            PropagateTransactionsBuilder::<TransactionSigned>::full(EthVersion::Eth68);
2796        assert!(builder.is_empty());
2797
2798        let mut factory = MockTransactionFactory::default();
2799        let tx = PropagateTransaction::pool_tx(Arc::new(factory.create_eip4844()));
2800        builder.push(&tx);
2801        assert!(!builder.is_empty());
2802
2803        let txs = builder.clone().build();
2804        assert!(txs.full.is_none());
2805        let txs = txs.pooled.unwrap();
2806        assert_eq!(txs.len(), 1);
2807
2808        let tx = PropagateTransaction::pool_tx(Arc::new(factory.create_eip1559()));
2809        builder.push(&tx);
2810
2811        let txs = builder.clone().build();
2812        let pooled = txs.pooled.unwrap();
2813        assert_eq!(pooled.len(), 1);
2814        let txs = txs.full.unwrap();
2815        assert_eq!(txs.len(), 1);
2816    }
2817
2818    #[tokio::test]
2819    async fn test_propagate_full() {
2820        reth_tracing::init_test_tracing();
2821
2822        let (mut tx_manager, network) = new_tx_manager().await;
2823        let peer_id = PeerId::random();
2824
2825        // ensure not syncing
2826        network.handle().update_sync_state(SyncState::Idle);
2827
2828        // mock a peer
2829        let (tx, _rx) = mpsc::channel::<PeerRequest>(1);
2830
2831        let session_info = SessionInfo {
2832            peer_id,
2833            remote_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0),
2834            client_version: Arc::from(""),
2835            capabilities: Arc::new(vec![].into()),
2836            status: Arc::new(Default::default()),
2837            version: EthVersion::Eth68,
2838            peer_kind: PeerKind::Basic,
2839        };
2840        let messages: PeerRequestSender<PeerRequest> = PeerRequestSender::new(peer_id, tx);
2841        tx_manager
2842            .on_network_event(NetworkEvent::ActivePeerSession { info: session_info, messages });
2843        let mut propagate = vec![];
2844        let mut factory = MockTransactionFactory::default();
2845        let eip1559_tx = Arc::new(factory.create_eip1559());
2846        propagate.push(PropagateTransaction::pool_tx(eip1559_tx.clone()));
2847        let eip4844_tx = Arc::new(factory.create_eip4844());
2848        propagate.push(PropagateTransaction::pool_tx(eip4844_tx.clone()));
2849
2850        let propagated =
2851            tx_manager.propagate_transactions(propagate.clone(), PropagationMode::Basic);
2852        assert_eq!(propagated.0.len(), 2);
2853        let prop_txs = propagated.0.get(eip1559_tx.transaction.hash()).unwrap();
2854        assert_eq!(prop_txs.len(), 1);
2855        assert!(prop_txs[0].is_full());
2856
2857        let prop_txs = propagated.0.get(eip4844_tx.transaction.hash()).unwrap();
2858        assert_eq!(prop_txs.len(), 1);
2859        assert!(prop_txs[0].is_hash());
2860
2861        let peer = tx_manager.peers.get(&peer_id).unwrap();
2862        assert!(peer.seen_transactions.contains(eip1559_tx.transaction.hash()));
2863        assert!(peer.seen_transactions.contains(eip1559_tx.transaction.hash()));
2864        peer.seen_transactions.contains(eip4844_tx.transaction.hash());
2865
2866        // propagate again
2867        let propagated = tx_manager.propagate_transactions(propagate, PropagationMode::Basic);
2868        assert!(propagated.0.is_empty());
2869    }
2870
2871    #[tokio::test]
2872    async fn test_relaxed_filter_ignores_unknown_tx_types() {
2873        reth_tracing::init_test_tracing();
2874
2875        let transactions_manager_config = TransactionsManagerConfig::default();
2876
2877        let propagation_policy = TransactionPropagationKind::default();
2878        let announcement_policy = RelaxedEthAnnouncementFilter::default();
2879
2880        let policy_bundle = NetworkPolicies::new(propagation_policy, announcement_policy);
2881
2882        let pool = testing_pool();
2883        let secret_key = SecretKey::new(&mut rand_08::thread_rng());
2884        let client = NoopProvider::default();
2885
2886        let network_config = NetworkConfigBuilder::new(secret_key)
2887            .listener_port(0)
2888            .disable_discovery()
2889            .build(client.clone());
2890
2891        let mut network_manager = NetworkManager::new(network_config).await.unwrap();
2892        let (to_tx_manager_tx, from_network_rx) =
2893            mpsc::unbounded_channel::<NetworkTransactionEvent<EthNetworkPrimitives>>();
2894        network_manager.set_transactions(to_tx_manager_tx);
2895        let network_handle = network_manager.handle().clone();
2896        let network_service_handle = tokio::spawn(network_manager);
2897
2898        let mut tx_manager = TransactionsManager::<
2899            TestPool,
2900            EthNetworkPrimitives,
2901            NetworkPolicies<TransactionPropagationKind, RelaxedEthAnnouncementFilter>,
2902        >::with_policy(
2903            network_handle.clone(),
2904            pool.clone(),
2905            from_network_rx,
2906            transactions_manager_config,
2907            policy_bundle,
2908        );
2909
2910        let peer_id = PeerId::random();
2911        let eth_version = EthVersion::Eth68;
2912        let (mock_peer_metadata, mut mock_session_rx) = new_mock_session(peer_id, eth_version);
2913        tx_manager.peers.insert(peer_id, mock_peer_metadata);
2914
2915        let mut tx_factory = MockTransactionFactory::default();
2916
2917        let valid_known_tx = tx_factory.create_eip1559();
2918        let known_tx_signed: Arc<ValidPoolTransaction<MockTransaction>> = Arc::new(valid_known_tx);
2919
2920        let known_tx_hash = *known_tx_signed.hash();
2921        let known_tx_type_byte = known_tx_signed.transaction.tx_type();
2922        let known_tx_size = known_tx_signed.encoded_length();
2923
2924        let unknown_tx_hash = B256::random();
2925        let unknown_tx_type_byte = 0xff_u8;
2926        let unknown_tx_size = 150;
2927
2928        let announcement_msg = NewPooledTransactionHashes::Eth68(NewPooledTransactionHashes68 {
2929            types: vec![known_tx_type_byte, unknown_tx_type_byte],
2930            sizes: vec![known_tx_size, unknown_tx_size],
2931            hashes: vec![known_tx_hash, unknown_tx_hash],
2932        });
2933
2934        tx_manager.on_new_pooled_transaction_hashes(peer_id, announcement_msg);
2935
2936        poll_fn(|cx| {
2937            let _ = tx_manager.poll_unpin(cx);
2938            Poll::Ready(())
2939        })
2940        .await;
2941
2942        let mut requested_hashes_in_getpooled = HashSet::new();
2943        let mut unexpected_request_received = false;
2944
2945        match tokio::time::timeout(std::time::Duration::from_millis(200), mock_session_rx.recv())
2946            .await
2947        {
2948            Ok(Some(PeerRequest::GetPooledTransactions { request, response: tx_response_ch })) => {
2949                let GetPooledTransactions(hashes) = request;
2950                for hash in hashes {
2951                    requested_hashes_in_getpooled.insert(hash);
2952                }
2953                let _ = tx_response_ch.send(Ok(PooledTransactions(vec![])));
2954            }
2955            Ok(Some(other_request)) => {
2956                tracing::error!(?other_request, "Received unexpected PeerRequest type");
2957                unexpected_request_received = true;
2958            }
2959            Ok(None) => tracing::info!("Mock session channel closed or no request received."),
2960            Err(_timeout_err) => {
2961                tracing::info!("Timeout: No GetPooledTransactions request received.")
2962            }
2963        }
2964
2965        assert!(
2966            requested_hashes_in_getpooled.contains(&known_tx_hash),
2967            "Should have requested the known EIP-1559 transaction. Requested: {requested_hashes_in_getpooled:?}"
2968        );
2969        assert!(
2970            !requested_hashes_in_getpooled.contains(&unknown_tx_hash),
2971            "Should NOT have requested the unknown transaction type. Requested: {requested_hashes_in_getpooled:?}"
2972        );
2973        assert!(
2974            !unexpected_request_received,
2975            "An unexpected P2P request was received by the mock peer."
2976        );
2977
2978        network_service_handle.abort();
2979    }
2980}