reth_network/transactions/
mod.rs

1//! Transactions management for the p2p network.
2
3/// Aggregation on configurable parameters for [`TransactionsManager`].
4pub mod config;
5/// Default and spec'd bounds.
6pub mod constants;
7/// Component responsible for fetching transactions from [`NewPooledTransactionHashes`].
8pub mod fetcher;
9/// Defines the [`TransactionPolicies`] trait for aggregating transaction-related policies.
10pub mod policy;
11
12pub use self::constants::{
13    tx_fetcher::DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESP_ON_PACK_GET_POOLED_TRANSACTIONS_REQ,
14    SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE,
15};
16use config::{AnnouncementAcceptance, StrictEthAnnouncementFilter, TransactionPropagationKind};
17pub use config::{
18    AnnouncementFilteringPolicy, TransactionFetcherConfig, TransactionPropagationMode,
19    TransactionPropagationPolicy, TransactionsManagerConfig,
20};
21use policy::{NetworkPolicies, TransactionPolicies};
22
23pub(crate) use fetcher::{FetchEvent, TransactionFetcher};
24
25use self::constants::{tx_manager::*, DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE};
26use crate::{
27    budget::{
28        DEFAULT_BUDGET_TRY_DRAIN_NETWORK_TRANSACTION_EVENTS,
29        DEFAULT_BUDGET_TRY_DRAIN_PENDING_POOL_IMPORTS, DEFAULT_BUDGET_TRY_DRAIN_POOL_IMPORTS,
30        DEFAULT_BUDGET_TRY_DRAIN_STREAM,
31    },
32    cache::LruCache,
33    duration_metered_exec, metered_poll_nested_stream_with_budget,
34    metrics::{
35        AnnouncedTxTypesMetrics, TransactionsManagerMetrics, NETWORK_POOL_TRANSACTIONS_SCOPE,
36    },
37    NetworkHandle, TxTypesCounter,
38};
39use alloy_primitives::{TxHash, B256};
40use constants::SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE;
41use futures::{stream::FuturesUnordered, Future, StreamExt};
42use reth_eth_wire::{
43    DedupPayload, EthNetworkPrimitives, EthVersion, GetPooledTransactions, HandleMempoolData,
44    HandleVersionedMempoolData, NetworkPrimitives, NewPooledTransactionHashes,
45    NewPooledTransactionHashes66, NewPooledTransactionHashes68, PooledTransactions,
46    RequestTxHashes, Transactions, ValidAnnouncementData,
47};
48use reth_ethereum_primitives::{TransactionSigned, TxType};
49use reth_metrics::common::mpsc::UnboundedMeteredReceiver;
50use reth_network_api::{
51    events::{PeerEvent, SessionInfo},
52    NetworkEvent, NetworkEventListenerProvider, PeerKind, PeerRequest, PeerRequestSender, Peers,
53};
54use reth_network_p2p::{
55    error::{RequestError, RequestResult},
56    sync::SyncStateProvider,
57};
58use reth_network_peers::PeerId;
59use reth_network_types::ReputationChangeKind;
60use reth_primitives_traits::SignedTransaction;
61use reth_tokio_util::EventStream;
62use reth_transaction_pool::{
63    error::{PoolError, PoolResult},
64    AddedTransactionOutcome, GetPooledTransactionLimit, PoolTransaction, PropagateKind,
65    PropagatedTransactions, TransactionPool, ValidPoolTransaction,
66};
67use std::{
68    collections::{hash_map::Entry, HashMap, HashSet},
69    pin::Pin,
70    sync::{
71        atomic::{AtomicUsize, Ordering},
72        Arc,
73    },
74    task::{Context, Poll},
75    time::{Duration, Instant},
76};
77use tokio::sync::{mpsc, oneshot, oneshot::error::RecvError};
78use tokio_stream::wrappers::{ReceiverStream, UnboundedReceiverStream};
79use tracing::{debug, trace};
80
81/// The future for importing transactions into the pool.
82///
83/// Resolves with the result of each transaction import.
84pub type PoolImportFuture =
85    Pin<Box<dyn Future<Output = Vec<PoolResult<AddedTransactionOutcome>>> + Send + 'static>>;
86
87/// Api to interact with [`TransactionsManager`] task.
88///
89/// This can be obtained via [`TransactionsManager::handle`] and can be used to manually interact
90/// with the [`TransactionsManager`] task once it is spawned.
91///
92/// For example [`TransactionsHandle::get_peer_transaction_hashes`] returns the transaction hashes
93/// known by a specific peer.
94#[derive(Debug, Clone)]
95pub struct TransactionsHandle<N: NetworkPrimitives = EthNetworkPrimitives> {
96    /// Command channel to the [`TransactionsManager`]
97    manager_tx: mpsc::UnboundedSender<TransactionsCommand<N>>,
98}
99
100/// Implementation of the `TransactionsHandle` API for use in testnet via type
101/// [`PeerHandle`](crate::test_utils::PeerHandle).
102impl<N: NetworkPrimitives> TransactionsHandle<N> {
103    fn send(&self, cmd: TransactionsCommand<N>) {
104        let _ = self.manager_tx.send(cmd);
105    }
106
107    /// Fetch the [`PeerRequestSender`] for the given peer.
108    async fn peer_handle(
109        &self,
110        peer_id: PeerId,
111    ) -> Result<Option<PeerRequestSender<PeerRequest<N>>>, RecvError> {
112        let (tx, rx) = oneshot::channel();
113        self.send(TransactionsCommand::GetPeerSender { peer_id, peer_request_sender: tx });
114        rx.await
115    }
116
117    /// Manually propagate the transaction that belongs to the hash.
118    pub fn propagate(&self, hash: TxHash) {
119        self.send(TransactionsCommand::PropagateHash(hash))
120    }
121
122    /// Manually propagate the transaction hash to a specific peer.
123    ///
124    /// Note: this only propagates if the pool contains the transaction.
125    pub fn propagate_hash_to(&self, hash: TxHash, peer: PeerId) {
126        self.propagate_hashes_to(Some(hash), peer)
127    }
128
129    /// Manually propagate the transaction hashes to a specific peer.
130    ///
131    /// Note: this only propagates the transactions that are known to the pool.
132    pub fn propagate_hashes_to(&self, hash: impl IntoIterator<Item = TxHash>, peer: PeerId) {
133        let hashes = hash.into_iter().collect::<Vec<_>>();
134        if hashes.is_empty() {
135            return
136        }
137        self.send(TransactionsCommand::PropagateHashesTo(hashes, peer))
138    }
139
140    /// Request the active peer IDs from the [`TransactionsManager`].
141    pub async fn get_active_peers(&self) -> Result<HashSet<PeerId>, RecvError> {
142        let (tx, rx) = oneshot::channel();
143        self.send(TransactionsCommand::GetActivePeers(tx));
144        rx.await
145    }
146
147    /// Manually propagate full transaction hashes to a specific peer.
148    ///
149    /// Do nothing if transactions are empty.
150    pub fn propagate_transactions_to(&self, transactions: Vec<TxHash>, peer: PeerId) {
151        if transactions.is_empty() {
152            return
153        }
154        self.send(TransactionsCommand::PropagateTransactionsTo(transactions, peer))
155    }
156
157    /// Manually propagate the given transaction hashes to all peers.
158    ///
159    /// It's up to the [`TransactionsManager`] whether the transactions are sent as hashes or in
160    /// full.
161    pub fn propagate_transactions(&self, transactions: Vec<TxHash>) {
162        if transactions.is_empty() {
163            return
164        }
165        self.send(TransactionsCommand::PropagateTransactions(transactions))
166    }
167
168    /// Manually propagate the given transactions to all peers.
169    ///
170    /// It's up to the [`TransactionsManager`] whether the transactions are sent as hashes or in
171    /// full.
172    pub fn broadcast_transactions(
173        &self,
174        transactions: impl IntoIterator<Item = N::BroadcastedTransaction>,
175    ) {
176        let transactions =
177            transactions.into_iter().map(PropagateTransaction::new).collect::<Vec<_>>();
178        if transactions.is_empty() {
179            return
180        }
181        self.send(TransactionsCommand::BroadcastTransactions(transactions))
182    }
183
184    /// Request the transaction hashes known by specific peers.
185    pub async fn get_transaction_hashes(
186        &self,
187        peers: Vec<PeerId>,
188    ) -> Result<HashMap<PeerId, HashSet<TxHash>>, RecvError> {
189        if peers.is_empty() {
190            return Ok(Default::default())
191        }
192        let (tx, rx) = oneshot::channel();
193        self.send(TransactionsCommand::GetTransactionHashes { peers, tx });
194        rx.await
195    }
196
197    /// Request the transaction hashes known by a specific peer.
198    pub async fn get_peer_transaction_hashes(
199        &self,
200        peer: PeerId,
201    ) -> Result<HashSet<TxHash>, RecvError> {
202        let res = self.get_transaction_hashes(vec![peer]).await?;
203        Ok(res.into_values().next().unwrap_or_default())
204    }
205
206    /// Requests the transactions directly from the given peer.
207    ///
208    /// Returns `None` if the peer is not connected.
209    ///
210    /// **Note**: this returns the response from the peer as received.
211    pub async fn get_pooled_transactions_from(
212        &self,
213        peer_id: PeerId,
214        hashes: Vec<B256>,
215    ) -> Result<Option<Vec<N::PooledTransaction>>, RequestError> {
216        let Some(peer) = self.peer_handle(peer_id).await? else { return Ok(None) };
217
218        let (tx, rx) = oneshot::channel();
219        let request = PeerRequest::GetPooledTransactions { request: hashes.into(), response: tx };
220        peer.try_send(request).ok();
221
222        rx.await?.map(|res| Some(res.0))
223    }
224}
225
226/// Manages transactions on top of the p2p network.
227///
228/// This can be spawned to another task and is supposed to be run as background service.
229/// [`TransactionsHandle`] can be used as frontend to programmatically send commands to it and
230/// interact with it.
231///
232/// The [`TransactionsManager`] is responsible for:
233///    - handling incoming eth messages for transactions.
234///    - serving transaction requests.
235///    - propagate transactions
236///
237/// This type communicates with the [`NetworkManager`](crate::NetworkManager) in both directions.
238///   - receives incoming network messages.
239///   - sends messages to dispatch (responses, propagate tx)
240///
241/// It is directly connected to the [`TransactionPool`] to retrieve requested transactions and
242/// propagate new transactions over the network.
243///
244/// It can be configured with different policies for transaction propagation and announcement
245/// filtering. See [`NetworkPolicies`] and [`TransactionPolicies`] for more details.
246///
247/// ## Network Transaction Processing
248///
249/// ### Message Types
250///
251/// - **`Transactions`**: Full transaction broadcasts (rejects blob transactions)
252/// - **`NewPooledTransactionHashes`**: Hash announcements
253///
254/// ### Peer Tracking
255///
256/// - Maintains per-peer transaction cache (default: 10,240 entries)
257/// - Prevents duplicate imports and enables efficient propagation
258///
259/// ### Bad Transaction Handling
260///
261/// Caches and rejects transactions with consensus violations (gas, signature, chain ID).
262/// Penalizes peers sending invalid transactions.
263///
264/// ### Import Management
265///
266/// Limits concurrent pool imports and backs off when approaching capacity.
267///
268/// ### Transaction Fetching
269///
270/// For announced transactions: filters known → queues unknown → fetches → imports
271///
272/// ### Propagation Rules
273///
274/// Based on: origin (Local/External/Private), peer capabilities, and network state.
275/// Disabled during initial sync.
276///
277/// ### Security
278///
279/// Rate limiting via reputation, bad transaction isolation, peer scoring.
280#[derive(Debug)]
281#[must_use = "Manager does nothing unless polled."]
282pub struct TransactionsManager<
283    Pool,
284    N: NetworkPrimitives = EthNetworkPrimitives,
285    PBundle: TransactionPolicies = NetworkPolicies<
286        TransactionPropagationKind,
287        StrictEthAnnouncementFilter,
288    >,
289> {
290    /// Access to the transaction pool.
291    pool: Pool,
292    /// Network access.
293    network: NetworkHandle<N>,
294    /// Subscriptions to all network related events.
295    ///
296    /// From which we get all new incoming transaction related messages.
297    network_events: EventStream<NetworkEvent<PeerRequest<N>>>,
298    /// Transaction fetcher to handle inflight and missing transaction requests.
299    transaction_fetcher: TransactionFetcher<N>,
300    /// All currently pending transactions grouped by peers.
301    ///
302    /// This way we can track incoming transactions and prevent multiple pool imports for the same
303    /// transaction
304    transactions_by_peers: HashMap<TxHash, HashSet<PeerId>>,
305    /// Transactions that are currently imported into the `Pool`.
306    ///
307    /// The import process includes:
308    ///  - validation of the transactions, e.g. transaction is well formed: valid tx type, fees are
309    ///    valid, or for 4844 transaction the blobs are valid. See also
310    ///    [`EthTransactionValidator`](reth_transaction_pool::validate::EthTransactionValidator)
311    /// - if the transaction is valid, it is added into the pool.
312    ///
313    /// Once the new transaction reaches the __pending__ state it will be emitted by the pool via
314    /// [`TransactionPool::pending_transactions_listener`] and arrive at the `pending_transactions`
315    /// receiver.
316    pool_imports: FuturesUnordered<PoolImportFuture>,
317    /// Stats on pending pool imports that help the node self-monitor.
318    pending_pool_imports_info: PendingPoolImportsInfo,
319    /// Bad imports.
320    bad_imports: LruCache<TxHash>,
321    /// All the connected peers.
322    peers: HashMap<PeerId, PeerMetadata<N>>,
323    /// Send half for the command channel.
324    ///
325    /// This is kept so that a new [`TransactionsHandle`] can be created at any time.
326    command_tx: mpsc::UnboundedSender<TransactionsCommand<N>>,
327    /// Incoming commands from [`TransactionsHandle`].
328    ///
329    /// This will only receive commands if a user manually sends a command to the manager through
330    /// the [`TransactionsHandle`] to interact with this type directly.
331    command_rx: UnboundedReceiverStream<TransactionsCommand<N>>,
332    /// A stream that yields new __pending__ transactions.
333    ///
334    /// A transaction is considered __pending__ if it is executable on the current state of the
335    /// chain. In other words, this only yields transactions that satisfy all consensus
336    /// requirements, these include:
337    ///   - no nonce gaps
338    ///   - all dynamic fee requirements are (currently) met
339    ///   - account has enough balance to cover the transaction's gas
340    pending_transactions: ReceiverStream<TxHash>,
341    /// Incoming events from the [`NetworkManager`](crate::NetworkManager).
342    transaction_events: UnboundedMeteredReceiver<NetworkTransactionEvent<N>>,
343    /// How the `TransactionsManager` is configured.
344    config: TransactionsManagerConfig,
345    /// Network Policies
346    policies: PBundle,
347    /// `TransactionsManager` metrics
348    metrics: TransactionsManagerMetrics,
349    /// `AnnouncedTxTypes` metrics
350    announced_tx_types_metrics: AnnouncedTxTypesMetrics,
351}
352
353impl<Pool: TransactionPool, N: NetworkPrimitives>
354    TransactionsManager<
355        Pool,
356        N,
357        NetworkPolicies<TransactionPropagationKind, StrictEthAnnouncementFilter>,
358    >
359{
360    /// Sets up a new instance.
361    ///
362    /// Note: This expects an existing [`NetworkManager`](crate::NetworkManager) instance.
363    pub fn new(
364        network: NetworkHandle<N>,
365        pool: Pool,
366        from_network: mpsc::UnboundedReceiver<NetworkTransactionEvent<N>>,
367        transactions_manager_config: TransactionsManagerConfig,
368    ) -> Self {
369        Self::with_policy(
370            network,
371            pool,
372            from_network,
373            transactions_manager_config,
374            NetworkPolicies::default(),
375        )
376    }
377}
378
379impl<Pool: TransactionPool, N: NetworkPrimitives, PBundle: TransactionPolicies>
380    TransactionsManager<Pool, N, PBundle>
381{
382    /// Sets up a new instance with given the settings.
383    ///
384    /// Note: This expects an existing [`NetworkManager`](crate::NetworkManager) instance.
385    pub fn with_policy(
386        network: NetworkHandle<N>,
387        pool: Pool,
388        from_network: mpsc::UnboundedReceiver<NetworkTransactionEvent<N>>,
389        transactions_manager_config: TransactionsManagerConfig,
390        policies: PBundle,
391    ) -> Self {
392        let network_events = network.event_listener();
393
394        let (command_tx, command_rx) = mpsc::unbounded_channel();
395
396        let transaction_fetcher = TransactionFetcher::with_transaction_fetcher_config(
397            &transactions_manager_config.transaction_fetcher_config,
398        );
399
400        // install a listener for new __pending__ transactions that are allowed to be propagated
401        // over the network
402        let pending = pool.pending_transactions_listener();
403        let pending_pool_imports_info = PendingPoolImportsInfo::default();
404        let metrics = TransactionsManagerMetrics::default();
405        metrics
406            .capacity_pending_pool_imports
407            .increment(pending_pool_imports_info.max_pending_pool_imports as u64);
408
409        Self {
410            pool,
411            network,
412            network_events,
413            transaction_fetcher,
414            transactions_by_peers: Default::default(),
415            pool_imports: Default::default(),
416            pending_pool_imports_info: PendingPoolImportsInfo::new(
417                DEFAULT_MAX_COUNT_PENDING_POOL_IMPORTS,
418            ),
419            bad_imports: LruCache::new(DEFAULT_MAX_COUNT_BAD_IMPORTS),
420            peers: Default::default(),
421            command_tx,
422            command_rx: UnboundedReceiverStream::new(command_rx),
423            pending_transactions: ReceiverStream::new(pending),
424            transaction_events: UnboundedMeteredReceiver::new(
425                from_network,
426                NETWORK_POOL_TRANSACTIONS_SCOPE,
427            ),
428            config: transactions_manager_config,
429            policies,
430            metrics,
431            announced_tx_types_metrics: AnnouncedTxTypesMetrics::default(),
432        }
433    }
434
435    /// Returns a new handle that can send commands to this type.
436    pub fn handle(&self) -> TransactionsHandle<N> {
437        TransactionsHandle { manager_tx: self.command_tx.clone() }
438    }
439
440    /// Returns `true` if [`TransactionsManager`] has capacity to request pending hashes. Returns
441    /// `false` if [`TransactionsManager`] is operating close to full capacity.
442    fn has_capacity_for_fetching_pending_hashes(&self) -> bool {
443        self.pending_pool_imports_info
444            .has_capacity(self.pending_pool_imports_info.max_pending_pool_imports) &&
445            self.transaction_fetcher.has_capacity_for_fetching_pending_hashes()
446    }
447
448    fn report_peer_bad_transactions(&self, peer_id: PeerId) {
449        self.report_peer(peer_id, ReputationChangeKind::BadTransactions);
450        self.metrics.reported_bad_transactions.increment(1);
451    }
452
453    fn report_peer(&self, peer_id: PeerId, kind: ReputationChangeKind) {
454        trace!(target: "net::tx", ?peer_id, ?kind, "reporting reputation change");
455        self.network.reputation_change(peer_id, kind);
456    }
457
458    fn report_already_seen(&self, peer_id: PeerId) {
459        trace!(target: "net::tx", ?peer_id, "Penalizing peer for already seen transaction");
460        self.network.reputation_change(peer_id, ReputationChangeKind::AlreadySeenTransaction);
461    }
462
463    /// Clear the transaction
464    fn on_good_import(&mut self, hash: TxHash) {
465        self.transactions_by_peers.remove(&hash);
466    }
467
468    /// Penalize the peers that intentionally sent the bad transaction, and cache it to avoid
469    /// fetching or importing it again.
470    ///
471    /// Errors that count as bad transactions are:
472    ///
473    /// - intrinsic gas too low
474    /// - exceeds gas limit
475    /// - gas uint overflow
476    /// - exceeds max init code size
477    /// - oversized data
478    /// - signer account has bytecode
479    /// - chain id mismatch
480    /// - old legacy chain id
481    /// - tx type not supported
482    ///
483    /// (and additionally for blobs txns...)
484    ///
485    /// - no blobs
486    /// - too many blobs
487    /// - invalid kzg proof
488    /// - kzg error
489    /// - not blob transaction (tx type mismatch)
490    /// - wrong versioned kzg commitment hash
491    fn on_bad_import(&mut self, err: PoolError) {
492        let peers = self.transactions_by_peers.remove(&err.hash);
493
494        // if we're _currently_ syncing, we ignore a bad transaction
495        if !err.is_bad_transaction() || self.network.is_syncing() {
496            return
497        }
498        // otherwise we penalize the peer that sent the bad transaction, with the assumption that
499        // the peer should have known that this transaction is bad (e.g. violating consensus rules)
500        if let Some(peers) = peers {
501            for peer_id in peers {
502                self.report_peer_bad_transactions(peer_id);
503            }
504        }
505        self.metrics.bad_imports.increment(1);
506        self.bad_imports.insert(err.hash);
507    }
508
509    /// Runs an operation to fetch hashes that are cached in [`TransactionFetcher`].
510    fn on_fetch_hashes_pending_fetch(&mut self) {
511        // try drain transaction hashes pending fetch
512        let info = &self.pending_pool_imports_info;
513        let max_pending_pool_imports = info.max_pending_pool_imports;
514        let has_capacity_wrt_pending_pool_imports =
515            |divisor| info.has_capacity(max_pending_pool_imports / divisor);
516
517        self.transaction_fetcher
518            .on_fetch_pending_hashes(&self.peers, has_capacity_wrt_pending_pool_imports);
519    }
520
521    fn on_request_error(&self, peer_id: PeerId, req_err: RequestError) {
522        let kind = match req_err {
523            RequestError::UnsupportedCapability => ReputationChangeKind::BadProtocol,
524            RequestError::Timeout => ReputationChangeKind::Timeout,
525            RequestError::ChannelClosed | RequestError::ConnectionDropped => {
526                // peer is already disconnected
527                return
528            }
529            RequestError::BadResponse => return self.report_peer_bad_transactions(peer_id),
530        };
531        self.report_peer(peer_id, kind);
532    }
533
534    #[inline]
535    fn update_poll_metrics(&self, start: Instant, poll_durations: TxManagerPollDurations) {
536        let metrics = &self.metrics;
537
538        let TxManagerPollDurations {
539            acc_network_events,
540            acc_pending_imports,
541            acc_tx_events,
542            acc_imported_txns,
543            acc_fetch_events,
544            acc_pending_fetch,
545            acc_cmds,
546        } = poll_durations;
547
548        // update metrics for whole poll function
549        metrics.duration_poll_tx_manager.set(start.elapsed().as_secs_f64());
550        // update metrics for nested expressions
551        metrics.acc_duration_poll_network_events.set(acc_network_events.as_secs_f64());
552        metrics.acc_duration_poll_pending_pool_imports.set(acc_pending_imports.as_secs_f64());
553        metrics.acc_duration_poll_transaction_events.set(acc_tx_events.as_secs_f64());
554        metrics.acc_duration_poll_imported_transactions.set(acc_imported_txns.as_secs_f64());
555        metrics.acc_duration_poll_fetch_events.set(acc_fetch_events.as_secs_f64());
556        metrics.acc_duration_fetch_pending_hashes.set(acc_pending_fetch.as_secs_f64());
557        metrics.acc_duration_poll_commands.set(acc_cmds.as_secs_f64());
558    }
559}
560
561impl<Pool: TransactionPool, N: NetworkPrimitives, PBundle: TransactionPolicies>
562    TransactionsManager<Pool, N, PBundle>
563{
564    /// Processes a batch import results.
565    fn on_batch_import_result(&mut self, batch_results: Vec<PoolResult<AddedTransactionOutcome>>) {
566        for res in batch_results {
567            match res {
568                Ok(AddedTransactionOutcome { hash, .. }) => {
569                    self.on_good_import(hash);
570                }
571                Err(err) => {
572                    self.on_bad_import(err);
573                }
574            }
575        }
576    }
577
578    /// Request handler for an incoming `NewPooledTransactionHashes`
579    fn on_new_pooled_transaction_hashes(
580        &mut self,
581        peer_id: PeerId,
582        msg: NewPooledTransactionHashes,
583    ) {
584        // If the node is initially syncing, ignore transactions
585        if self.network.is_initially_syncing() {
586            return
587        }
588        if self.network.tx_gossip_disabled() {
589            return
590        }
591
592        // get handle to peer's session, if the session is still active
593        let Some(peer) = self.peers.get_mut(&peer_id) else {
594            trace!(
595                peer_id = format!("{peer_id:#}"),
596                ?msg,
597                "discarding announcement from inactive peer"
598            );
599
600            return
601        };
602        let client = peer.client_version.clone();
603
604        // keep track of the transactions the peer knows
605        let mut count_txns_already_seen_by_peer = 0;
606        for tx in msg.iter_hashes().copied() {
607            if !peer.seen_transactions.insert(tx) {
608                count_txns_already_seen_by_peer += 1;
609            }
610        }
611        if count_txns_already_seen_by_peer > 0 {
612            // this may occur if transactions are sent or announced to a peer, at the same time as
613            // the peer sends/announces those hashes to us. this is because, marking
614            // txns as seen by a peer is done optimistically upon sending them to the
615            // peer.
616            self.metrics.messages_with_hashes_already_seen_by_peer.increment(1);
617            self.metrics
618                .occurrences_hash_already_seen_by_peer
619                .increment(count_txns_already_seen_by_peer);
620
621            trace!(target: "net::tx",
622                %count_txns_already_seen_by_peer,
623                peer_id=format!("{peer_id:#}"),
624                ?client,
625                "Peer sent hashes that have already been marked as seen by peer"
626            );
627
628            self.report_already_seen(peer_id);
629        }
630
631        // 1. filter out spam
632        if msg.is_empty() {
633            self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement);
634            return;
635        }
636
637        let original_len = msg.len();
638        let mut partially_valid_msg = msg.dedup();
639
640        if partially_valid_msg.len() != original_len {
641            self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement);
642        }
643
644        // 2. filter out transactions pending import to pool
645        partially_valid_msg.retain_by_hash(|hash| !self.transactions_by_peers.contains_key(hash));
646
647        // 3. filter out known hashes
648        //
649        // known txns have already been successfully fetched or received over gossip.
650        //
651        // most hashes will be filtered out here since this the mempool protocol is a gossip
652        // protocol, healthy peers will send many of the same hashes.
653        //
654        let hashes_count_pre_pool_filter = partially_valid_msg.len();
655        self.pool.retain_unknown(&mut partially_valid_msg);
656        if hashes_count_pre_pool_filter > partially_valid_msg.len() {
657            let already_known_hashes_count =
658                hashes_count_pre_pool_filter - partially_valid_msg.len();
659            self.metrics
660                .occurrences_hashes_already_in_pool
661                .increment(already_known_hashes_count as u64);
662        }
663
664        if partially_valid_msg.is_empty() {
665            // nothing to request
666            return
667        }
668
669        // 4. filter out invalid entries (spam)
670        //
671        // validates messages with respect to the given network, e.g. allowed tx types
672        //
673        let mut should_report_peer = false;
674        let mut tx_types_counter = TxTypesCounter::default();
675
676        let is_eth68_message = partially_valid_msg
677            .msg_version()
678            .expect("partially valid announcement should have a version")
679            .is_eth68();
680
681        partially_valid_msg.retain(|tx_hash, metadata_ref_mut| {
682            let (ty_byte, size_val) = match *metadata_ref_mut {
683                Some((ty, size)) => {
684                    if !is_eth68_message {
685                        should_report_peer = true;
686                    }
687                    (ty, size)
688                }
689                None => {
690                    if is_eth68_message {
691                        should_report_peer = true;
692                        return false;
693                    }
694                    (0u8, 0)
695                }
696            };
697
698            if is_eth68_message {
699                if let Some((actual_ty_byte, _)) = *metadata_ref_mut {
700                    if let Ok(parsed_tx_type) = TxType::try_from(actual_ty_byte) {
701                        tx_types_counter.increase_by_tx_type(parsed_tx_type);
702                    }
703                }
704            }
705
706            let decision = self
707                .policies
708                .announcement_filter()
709                .decide_on_announcement(ty_byte, tx_hash, size_val);
710
711            match decision {
712                AnnouncementAcceptance::Accept => true,
713                AnnouncementAcceptance::Ignore => false,
714                AnnouncementAcceptance::Reject { penalize_peer } => {
715                    if penalize_peer {
716                        should_report_peer = true;
717                    }
718                    false
719                }
720            }
721        });
722
723        if is_eth68_message {
724            self.announced_tx_types_metrics.update_eth68_announcement_metrics(tx_types_counter);
725        }
726
727        if should_report_peer {
728            self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement);
729        }
730
731        let mut valid_announcement_data =
732            ValidAnnouncementData::from_partially_valid_data(partially_valid_msg);
733
734        if valid_announcement_data.is_empty() {
735            // no valid announcement data
736            return
737        }
738
739        // 5. filter out already seen unknown hashes
740        //
741        // seen hashes are already in the tx fetcher, pending fetch.
742        //
743        // for any seen hashes add the peer as fallback. unseen hashes are loaded into the tx
744        // fetcher, hence they should be valid at this point.
745        let bad_imports = &self.bad_imports;
746        self.transaction_fetcher.filter_unseen_and_pending_hashes(
747            &mut valid_announcement_data,
748            |hash| bad_imports.contains(hash),
749            &peer_id,
750            &client,
751        );
752
753        if valid_announcement_data.is_empty() {
754            // nothing to request
755            return
756        }
757
758        trace!(target: "net::tx::propagation",
759            peer_id=format!("{peer_id:#}"),
760            hashes_len=valid_announcement_data.len(),
761            hashes=?valid_announcement_data.keys().collect::<Vec<_>>(),
762            msg_version=%valid_announcement_data.msg_version(),
763            client_version=%client,
764            "received previously unseen and pending hashes in announcement from peer"
765        );
766
767        // only send request for hashes to idle peer, otherwise buffer hashes storing peer as
768        // fallback
769        if !self.transaction_fetcher.is_idle(&peer_id) {
770            // load message version before announcement data is destructed in packing
771            let msg_version = valid_announcement_data.msg_version();
772            let (hashes, _version) = valid_announcement_data.into_request_hashes();
773
774            trace!(target: "net::tx",
775                peer_id=format!("{peer_id:#}"),
776                hashes=?*hashes,
777                %msg_version,
778                %client,
779                "buffering hashes announced by busy peer"
780            );
781
782            self.transaction_fetcher.buffer_hashes(hashes, Some(peer_id));
783
784            return
785        }
786
787        let mut hashes_to_request =
788            RequestTxHashes::with_capacity(valid_announcement_data.len() / 4);
789        let surplus_hashes =
790            self.transaction_fetcher.pack_request(&mut hashes_to_request, valid_announcement_data);
791
792        if !surplus_hashes.is_empty() {
793            trace!(target: "net::tx",
794                peer_id=format!("{peer_id:#}"),
795                surplus_hashes=?*surplus_hashes,
796                %client,
797                "some hashes in announcement from peer didn't fit in `GetPooledTransactions` request, buffering surplus hashes"
798            );
799
800            self.transaction_fetcher.buffer_hashes(surplus_hashes, Some(peer_id));
801        }
802
803        trace!(target: "net::tx",
804            peer_id=format!("{peer_id:#}"),
805            hashes=?*hashes_to_request,
806            %client,
807            "sending hashes in `GetPooledTransactions` request to peer's session"
808        );
809
810        // request the missing transactions
811        //
812        // get handle to peer's session again, at this point we know it exists
813        let Some(peer) = self.peers.get_mut(&peer_id) else { return };
814        if let Some(failed_to_request_hashes) =
815            self.transaction_fetcher.request_transactions_from_peer(hashes_to_request, peer)
816        {
817            let conn_eth_version = peer.version;
818
819            trace!(target: "net::tx",
820                peer_id=format!("{peer_id:#}"),
821                failed_to_request_hashes=?*failed_to_request_hashes,
822                %conn_eth_version,
823                %client,
824                "sending `GetPooledTransactions` request to peer's session failed, buffering hashes"
825            );
826            self.transaction_fetcher.buffer_hashes(failed_to_request_hashes, Some(peer_id));
827        }
828    }
829}
830
831impl<Pool, N, PBundle> TransactionsManager<Pool, N, PBundle>
832where
833    Pool: TransactionPool + Unpin + 'static,
834
835    N: NetworkPrimitives<
836            BroadcastedTransaction: SignedTransaction,
837            PooledTransaction: SignedTransaction,
838        > + Unpin,
839
840    PBundle: TransactionPolicies,
841    Pool::Transaction:
842        PoolTransaction<Consensus = N::BroadcastedTransaction, Pooled = N::PooledTransaction>,
843{
844    /// Invoked when transactions in the local mempool are considered __pending__.
845    ///
846    /// When a transaction in the local mempool is moved to the pending pool, we propagate them to
847    /// connected peers over network using the `Transactions` and `NewPooledTransactionHashes`
848    /// messages. The Transactions message relays complete transaction objects and is typically
849    /// sent to a small, random fraction of connected peers.
850    ///
851    /// All other peers receive a notification of the transaction hash and can request the
852    /// complete transaction object if it is unknown to them. The dissemination of complete
853    /// transactions to a fraction of peers usually ensures that all nodes receive the transaction
854    /// and won't need to request it.
855    fn on_new_pending_transactions(&mut self, hashes: Vec<TxHash>) {
856        // Nothing to propagate while initially syncing
857        if self.network.is_initially_syncing() {
858            return
859        }
860        if self.network.tx_gossip_disabled() {
861            return
862        }
863
864        trace!(target: "net::tx", num_hashes=?hashes.len(), "Start propagating transactions");
865
866        self.propagate_all(hashes);
867    }
868
869    /// Propagate the full transactions to a specific peer.
870    ///
871    /// Returns the propagated transactions.
872    fn propagate_full_transactions_to_peer(
873        &mut self,
874        txs: Vec<TxHash>,
875        peer_id: PeerId,
876        propagation_mode: PropagationMode,
877    ) -> Option<PropagatedTransactions> {
878        trace!(target: "net::tx", ?peer_id, "Propagating transactions to peer");
879
880        let peer = self.peers.get_mut(&peer_id)?;
881        let mut propagated = PropagatedTransactions::default();
882
883        // filter all transactions unknown to the peer
884        let mut full_transactions = FullTransactionsBuilder::new(peer.version);
885
886        let to_propagate = self.pool.get_all(txs).into_iter().map(PropagateTransaction::pool_tx);
887
888        if propagation_mode.is_forced() {
889            // skip cache check if forced
890            full_transactions.extend(to_propagate);
891        } else {
892            // Iterate through the transactions to propagate and fill the hashes and full
893            // transaction
894            for tx in to_propagate {
895                if !peer.seen_transactions.contains(tx.tx_hash()) {
896                    // Only include if the peer hasn't seen the transaction
897                    full_transactions.push(&tx);
898                }
899            }
900        }
901
902        if full_transactions.is_empty() {
903            // nothing to propagate
904            return None
905        }
906
907        let PropagateTransactions { pooled, full } = full_transactions.build();
908
909        // send hashes if any
910        if let Some(new_pooled_hashes) = pooled {
911            for hash in new_pooled_hashes.iter_hashes().copied() {
912                propagated.0.entry(hash).or_default().push(PropagateKind::Hash(peer_id));
913                // mark transaction as seen by peer
914                peer.seen_transactions.insert(hash);
915            }
916
917            // send hashes of transactions
918            self.network.send_transactions_hashes(peer_id, new_pooled_hashes);
919        }
920
921        // send full transactions, if any
922        if let Some(new_full_transactions) = full {
923            for tx in &new_full_transactions {
924                propagated.0.entry(*tx.tx_hash()).or_default().push(PropagateKind::Full(peer_id));
925                // mark transaction as seen by peer
926                peer.seen_transactions.insert(*tx.tx_hash());
927            }
928
929            // send full transactions
930            self.network.send_transactions(peer_id, new_full_transactions);
931        }
932
933        // Update propagated transactions metrics
934        self.metrics.propagated_transactions.increment(propagated.0.len() as u64);
935
936        Some(propagated)
937    }
938
939    /// Propagate the transaction hashes to the given peer
940    ///
941    /// Note: This will only send the hashes for transactions that exist in the pool.
942    fn propagate_hashes_to(
943        &mut self,
944        hashes: Vec<TxHash>,
945        peer_id: PeerId,
946        propagation_mode: PropagationMode,
947    ) {
948        trace!(target: "net::tx", "Start propagating transactions as hashes");
949
950        // This fetches a transactions from the pool, including the blob transactions, which are
951        // only ever sent as hashes.
952        let propagated = {
953            let Some(peer) = self.peers.get_mut(&peer_id) else {
954                // no such peer
955                return
956            };
957
958            let to_propagate = self
959                .pool
960                .get_all(hashes)
961                .into_iter()
962                .map(PropagateTransaction::pool_tx)
963                .collect::<Vec<_>>();
964
965            let mut propagated = PropagatedTransactions::default();
966
967            // check if transaction is known to peer
968            let mut hashes = PooledTransactionsHashesBuilder::new(peer.version);
969
970            if propagation_mode.is_forced() {
971                hashes.extend(to_propagate)
972            } else {
973                for tx in to_propagate {
974                    if !peer.seen_transactions.contains(tx.tx_hash()) {
975                        // Include if the peer hasn't seen it
976                        hashes.push(&tx);
977                    }
978                }
979            }
980
981            let new_pooled_hashes = hashes.build();
982
983            if new_pooled_hashes.is_empty() {
984                // nothing to propagate
985                return
986            }
987
988            for hash in new_pooled_hashes.iter_hashes().copied() {
989                propagated.0.entry(hash).or_default().push(PropagateKind::Hash(peer_id));
990            }
991
992            trace!(target: "net::tx::propagation", ?peer_id, ?new_pooled_hashes, "Propagating transactions to peer");
993
994            // send hashes of transactions
995            self.network.send_transactions_hashes(peer_id, new_pooled_hashes);
996
997            // Update propagated transactions metrics
998            self.metrics.propagated_transactions.increment(propagated.0.len() as u64);
999
1000            propagated
1001        };
1002
1003        // notify pool so events get fired
1004        self.pool.on_propagated(propagated);
1005    }
1006
1007    /// Propagate the transactions to all connected peers either as full objects or hashes.
1008    ///
1009    /// The message for new pooled hashes depends on the negotiated version of the stream.
1010    /// See [`NewPooledTransactionHashes`]
1011    ///
1012    /// Note: EIP-4844 are disallowed from being broadcast in full and are only ever sent as hashes, see also <https://eips.ethereum.org/EIPS/eip-4844#networking>.
1013    fn propagate_transactions(
1014        &mut self,
1015        to_propagate: Vec<PropagateTransaction<N::BroadcastedTransaction>>,
1016        propagation_mode: PropagationMode,
1017    ) -> PropagatedTransactions {
1018        let mut propagated = PropagatedTransactions::default();
1019        if self.network.tx_gossip_disabled() {
1020            return propagated
1021        }
1022
1023        // send full transactions to a set of the connected peers based on the configured mode
1024        let max_num_full = self.config.propagation_mode.full_peer_count(self.peers.len());
1025
1026        // Note: Assuming ~random~ order due to random state of the peers map hasher
1027        for (peer_idx, (peer_id, peer)) in self.peers.iter_mut().enumerate() {
1028            if !self.policies.propagation_policy().can_propagate(peer) {
1029                // skip peers we should not propagate to
1030                continue
1031            }
1032            // determine whether to send full tx objects or hashes.
1033            let mut builder = if peer_idx > max_num_full {
1034                PropagateTransactionsBuilder::pooled(peer.version)
1035            } else {
1036                PropagateTransactionsBuilder::full(peer.version)
1037            };
1038
1039            if propagation_mode.is_forced() {
1040                builder.extend(to_propagate.iter());
1041            } else {
1042                // Iterate through the transactions to propagate and fill the hashes and full
1043                // transaction lists, before deciding whether or not to send full transactions to
1044                // the peer.
1045                for tx in &to_propagate {
1046                    // Only proceed if the transaction is not in the peer's list of seen
1047                    // transactions
1048                    if !peer.seen_transactions.contains(tx.tx_hash()) {
1049                        builder.push(tx);
1050                    }
1051                }
1052            }
1053
1054            if builder.is_empty() {
1055                trace!(target: "net::tx", ?peer_id, "Nothing to propagate to peer; has seen all transactions");
1056                continue
1057            }
1058
1059            let PropagateTransactions { pooled, full } = builder.build();
1060
1061            // send hashes if any
1062            if let Some(mut new_pooled_hashes) = pooled {
1063                // enforce tx soft limit per message for the (unlikely) event the number of
1064                // hashes exceeds it
1065                new_pooled_hashes
1066                    .truncate(SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE);
1067
1068                for hash in new_pooled_hashes.iter_hashes().copied() {
1069                    propagated.0.entry(hash).or_default().push(PropagateKind::Hash(*peer_id));
1070                    // mark transaction as seen by peer
1071                    peer.seen_transactions.insert(hash);
1072                }
1073
1074                trace!(target: "net::tx", ?peer_id, num_txs=?new_pooled_hashes.len(), "Propagating tx hashes to peer");
1075
1076                // send hashes of transactions
1077                self.network.send_transactions_hashes(*peer_id, new_pooled_hashes);
1078            }
1079
1080            // send full transactions, if any
1081            if let Some(new_full_transactions) = full {
1082                for tx in &new_full_transactions {
1083                    propagated
1084                        .0
1085                        .entry(*tx.tx_hash())
1086                        .or_default()
1087                        .push(PropagateKind::Full(*peer_id));
1088                    // mark transaction as seen by peer
1089                    peer.seen_transactions.insert(*tx.tx_hash());
1090                }
1091
1092                trace!(target: "net::tx", ?peer_id, num_txs=?new_full_transactions.len(), "Propagating full transactions to peer");
1093
1094                // send full transactions
1095                self.network.send_transactions(*peer_id, new_full_transactions);
1096            }
1097        }
1098
1099        // Update propagated transactions metrics
1100        self.metrics.propagated_transactions.increment(propagated.0.len() as u64);
1101
1102        propagated
1103    }
1104
1105    /// Propagates the given transactions to the peers
1106    ///
1107    /// This fetches all transaction from the pool, including the 4844 blob transactions but
1108    /// __without__ their sidecar, because 4844 transactions are only ever announced as hashes.
1109    fn propagate_all(&mut self, hashes: Vec<TxHash>) {
1110        if self.peers.is_empty() {
1111            // nothing to propagate
1112            return
1113        }
1114        let propagated = self.propagate_transactions(
1115            self.pool.get_all(hashes).into_iter().map(PropagateTransaction::pool_tx).collect(),
1116            PropagationMode::Basic,
1117        );
1118
1119        // notify pool so events get fired
1120        self.pool.on_propagated(propagated);
1121    }
1122
1123    /// Request handler for an incoming request for transactions
1124    fn on_get_pooled_transactions(
1125        &mut self,
1126        peer_id: PeerId,
1127        request: GetPooledTransactions,
1128        response: oneshot::Sender<RequestResult<PooledTransactions<N::PooledTransaction>>>,
1129    ) {
1130        if let Some(peer) = self.peers.get_mut(&peer_id) {
1131            if self.network.tx_gossip_disabled() {
1132                let _ = response.send(Ok(PooledTransactions::default()));
1133                return
1134            }
1135            let transactions = self.pool.get_pooled_transaction_elements(
1136                request.0,
1137                GetPooledTransactionLimit::ResponseSizeSoftLimit(
1138                    self.transaction_fetcher.info.soft_limit_byte_size_pooled_transactions_response,
1139                ),
1140            );
1141            trace!(target: "net::tx::propagation", sent_txs=?transactions.iter().map(|tx| tx.tx_hash()), "Sending requested transactions to peer");
1142
1143            // we sent a response at which point we assume that the peer is aware of the
1144            // transactions
1145            peer.seen_transactions.extend(transactions.iter().map(|tx| *tx.tx_hash()));
1146
1147            let resp = PooledTransactions(transactions);
1148            let _ = response.send(Ok(resp));
1149        }
1150    }
1151
1152    /// Handles a command received from a detached [`TransactionsHandle`]
1153    fn on_command(&mut self, cmd: TransactionsCommand<N>) {
1154        match cmd {
1155            TransactionsCommand::PropagateHash(hash) => {
1156                self.on_new_pending_transactions(vec![hash])
1157            }
1158            TransactionsCommand::PropagateHashesTo(hashes, peer) => {
1159                self.propagate_hashes_to(hashes, peer, PropagationMode::Forced)
1160            }
1161            TransactionsCommand::GetActivePeers(tx) => {
1162                let peers = self.peers.keys().copied().collect::<HashSet<_>>();
1163                tx.send(peers).ok();
1164            }
1165            TransactionsCommand::PropagateTransactionsTo(txs, peer) => {
1166                if let Some(propagated) =
1167                    self.propagate_full_transactions_to_peer(txs, peer, PropagationMode::Forced)
1168                {
1169                    self.pool.on_propagated(propagated);
1170                }
1171            }
1172            TransactionsCommand::PropagateTransactions(txs) => self.propagate_all(txs),
1173            TransactionsCommand::BroadcastTransactions(txs) => {
1174                let propagated = self.propagate_transactions(txs, PropagationMode::Forced);
1175                self.pool.on_propagated(propagated);
1176            }
1177            TransactionsCommand::GetTransactionHashes { peers, tx } => {
1178                let mut res = HashMap::with_capacity(peers.len());
1179                for peer_id in peers {
1180                    let hashes = self
1181                        .peers
1182                        .get(&peer_id)
1183                        .map(|peer| peer.seen_transactions.iter().copied().collect::<HashSet<_>>())
1184                        .unwrap_or_default();
1185                    res.insert(peer_id, hashes);
1186                }
1187                tx.send(res).ok();
1188            }
1189            TransactionsCommand::GetPeerSender { peer_id, peer_request_sender } => {
1190                let sender = self.peers.get(&peer_id).map(|peer| peer.request_tx.clone());
1191                peer_request_sender.send(sender).ok();
1192            }
1193        }
1194    }
1195
1196    /// Handles session establishment and peer transactions initialization.
1197    ///
1198    /// This is invoked when a new session is established.
1199    fn handle_peer_session(
1200        &mut self,
1201        info: SessionInfo,
1202        messages: PeerRequestSender<PeerRequest<N>>,
1203    ) {
1204        let SessionInfo { peer_id, client_version, version, .. } = info;
1205
1206        // Insert a new peer into the peerset.
1207        let peer = PeerMetadata::<N>::new(
1208            messages,
1209            version,
1210            client_version,
1211            self.config.max_transactions_seen_by_peer_history,
1212            info.peer_kind,
1213        );
1214        let peer = match self.peers.entry(peer_id) {
1215            Entry::Occupied(mut entry) => {
1216                entry.insert(peer);
1217                entry.into_mut()
1218            }
1219            Entry::Vacant(entry) => entry.insert(peer),
1220        };
1221
1222        self.policies.propagation_policy_mut().on_session_established(peer);
1223
1224        // Send a `NewPooledTransactionHashes` to the peer with up to
1225        // `SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE`
1226        // transactions in the pool.
1227        if self.network.is_initially_syncing() || self.network.tx_gossip_disabled() {
1228            trace!(target: "net::tx", ?peer_id, "Skipping transaction broadcast: node syncing or gossip disabled");
1229            return
1230        }
1231
1232        // Get transactions to broadcast
1233        let pooled_txs = self.pool.pooled_transactions_max(
1234            SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE,
1235        );
1236        if pooled_txs.is_empty() {
1237            trace!(target: "net::tx", ?peer_id, "No transactions in the pool to broadcast");
1238            return;
1239        }
1240
1241        // Build and send transaction hashes message
1242        let mut msg_builder = PooledTransactionsHashesBuilder::new(version);
1243        for pooled_tx in pooled_txs {
1244            peer.seen_transactions.insert(*pooled_tx.hash());
1245            msg_builder.push_pooled(pooled_tx);
1246        }
1247
1248        debug!(target: "net::tx", ?peer_id, tx_count = msg_builder.is_empty(), "Broadcasting transaction hashes");
1249        let msg = msg_builder.build();
1250        self.network.send_transactions_hashes(peer_id, msg);
1251    }
1252
1253    /// Handles a received event related to common network events.
1254    fn on_network_event(&mut self, event_result: NetworkEvent<PeerRequest<N>>) {
1255        match event_result {
1256            NetworkEvent::Peer(PeerEvent::SessionClosed { peer_id, .. }) => {
1257                // remove the peer
1258
1259                let peer = self.peers.remove(&peer_id);
1260                if let Some(mut peer) = peer {
1261                    self.policies.propagation_policy_mut().on_session_closed(&mut peer);
1262                }
1263                self.transaction_fetcher.remove_peer(&peer_id);
1264            }
1265            NetworkEvent::ActivePeerSession { info, messages } => {
1266                // process active peer session and broadcast available transaction from the pool
1267                self.handle_peer_session(info, messages);
1268            }
1269            NetworkEvent::Peer(PeerEvent::SessionEstablished(info)) => {
1270                let peer_id = info.peer_id;
1271                // get messages from existing peer
1272                let messages = match self.peers.get(&peer_id) {
1273                    Some(p) => p.request_tx.clone(),
1274                    None => {
1275                        debug!(target: "net::tx", ?peer_id, "No peer request sender found");
1276                        return;
1277                    }
1278                };
1279                self.handle_peer_session(info, messages);
1280            }
1281            _ => {}
1282        }
1283    }
1284
1285    /// Handles dedicated transaction events related to the `eth` protocol.
1286    fn on_network_tx_event(&mut self, event: NetworkTransactionEvent<N>) {
1287        match event {
1288            NetworkTransactionEvent::IncomingTransactions { peer_id, msg } => {
1289                // ensure we didn't receive any blob transactions as these are disallowed to be
1290                // broadcasted in full
1291
1292                let has_blob_txs = msg.has_eip4844();
1293
1294                let non_blob_txs = msg
1295                    .0
1296                    .into_iter()
1297                    .map(N::PooledTransaction::try_from)
1298                    .filter_map(Result::ok)
1299                    .collect();
1300
1301                self.import_transactions(peer_id, non_blob_txs, TransactionSource::Broadcast);
1302
1303                if has_blob_txs {
1304                    debug!(target: "net::tx", ?peer_id, "received bad full blob transaction broadcast");
1305                    self.report_peer_bad_transactions(peer_id);
1306                }
1307            }
1308            NetworkTransactionEvent::IncomingPooledTransactionHashes { peer_id, msg } => {
1309                self.on_new_pooled_transaction_hashes(peer_id, msg)
1310            }
1311            NetworkTransactionEvent::GetPooledTransactions { peer_id, request, response } => {
1312                self.on_get_pooled_transactions(peer_id, request, response)
1313            }
1314            NetworkTransactionEvent::GetTransactionsHandle(response) => {
1315                let _ = response.send(Some(self.handle()));
1316            }
1317        }
1318    }
1319
1320    /// Starts the import process for the given transactions.
1321    fn import_transactions(
1322        &mut self,
1323        peer_id: PeerId,
1324        transactions: PooledTransactions<N::PooledTransaction>,
1325        source: TransactionSource,
1326    ) {
1327        // If the node is pipeline syncing, ignore transactions
1328        if self.network.is_initially_syncing() {
1329            return
1330        }
1331        if self.network.tx_gossip_disabled() {
1332            return
1333        }
1334
1335        let Some(peer) = self.peers.get_mut(&peer_id) else { return };
1336        let mut transactions = transactions.0;
1337
1338        // mark the transactions as received
1339        self.transaction_fetcher
1340            .remove_hashes_from_transaction_fetcher(transactions.iter().map(|tx| *tx.tx_hash()));
1341
1342        // track that the peer knows these transaction, but only if this is a new broadcast.
1343        // If we received the transactions as the response to our `GetPooledTransactions``
1344        // requests (based on received `NewPooledTransactionHashes`) then we already
1345        // recorded the hashes as seen by this peer in `Self::on_new_pooled_transaction_hashes`.
1346        let mut num_already_seen_by_peer = 0;
1347        for tx in &transactions {
1348            if source.is_broadcast() && !peer.seen_transactions.insert(*tx.tx_hash()) {
1349                num_already_seen_by_peer += 1;
1350            }
1351        }
1352
1353        // 1. filter out txns already inserted into pool
1354        let txns_count_pre_pool_filter = transactions.len();
1355        self.pool.retain_unknown(&mut transactions);
1356        if txns_count_pre_pool_filter > transactions.len() {
1357            let already_known_txns_count = txns_count_pre_pool_filter - transactions.len();
1358            self.metrics
1359                .occurrences_transactions_already_in_pool
1360                .increment(already_known_txns_count as u64);
1361        }
1362
1363        // tracks the quality of the given transactions
1364        let mut has_bad_transactions = false;
1365
1366        // 2. filter out transactions that are invalid or already pending import pre-size to avoid
1367        //    reallocations
1368        let mut new_txs = Vec::with_capacity(transactions.len());
1369        for tx in transactions {
1370            // recover transaction
1371            let tx = match tx.try_into_recovered() {
1372                Ok(tx) => tx,
1373                Err(badtx) => {
1374                    trace!(target: "net::tx",
1375                        peer_id=format!("{peer_id:#}"),
1376                        hash=%badtx.tx_hash(),
1377                        client_version=%peer.client_version,
1378                        "failed ecrecovery for transaction"
1379                    );
1380                    has_bad_transactions = true;
1381                    continue
1382                }
1383            };
1384
1385            match self.transactions_by_peers.entry(*tx.tx_hash()) {
1386                Entry::Occupied(mut entry) => {
1387                    // transaction was already inserted
1388                    entry.get_mut().insert(peer_id);
1389                }
1390                Entry::Vacant(entry) => {
1391                    if self.bad_imports.contains(tx.tx_hash()) {
1392                        trace!(target: "net::tx",
1393                            peer_id=format!("{peer_id:#}"),
1394                            hash=%tx.tx_hash(),
1395                            client_version=%peer.client_version,
1396                            "received a known bad transaction from peer"
1397                        );
1398                        has_bad_transactions = true;
1399                    } else {
1400                        // this is a new transaction that should be imported into the pool
1401
1402                        let pool_transaction = Pool::Transaction::from_pooled(tx);
1403                        new_txs.push(pool_transaction);
1404
1405                        entry.insert(HashSet::from([peer_id]));
1406                    }
1407                }
1408            }
1409        }
1410        new_txs.shrink_to_fit();
1411
1412        // 3. import new transactions as a batch to minimize lock contention on the underlying
1413        // pool
1414        if !new_txs.is_empty() {
1415            let pool = self.pool.clone();
1416            // update metrics
1417            let metric_pending_pool_imports = self.metrics.pending_pool_imports.clone();
1418            metric_pending_pool_imports.increment(new_txs.len() as f64);
1419
1420            // update self-monitoring info
1421            self.pending_pool_imports_info
1422                .pending_pool_imports
1423                .fetch_add(new_txs.len(), Ordering::Relaxed);
1424            let tx_manager_info_pending_pool_imports =
1425                self.pending_pool_imports_info.pending_pool_imports.clone();
1426
1427            trace!(target: "net::tx::propagation", new_txs_len=?new_txs.len(), "Importing new transactions");
1428            let import = Box::pin(async move {
1429                let added = new_txs.len();
1430                let res = pool.add_external_transactions(new_txs).await;
1431
1432                // update metrics
1433                metric_pending_pool_imports.decrement(added as f64);
1434                // update self-monitoring info
1435                tx_manager_info_pending_pool_imports.fetch_sub(added, Ordering::Relaxed);
1436
1437                res
1438            });
1439
1440            self.pool_imports.push(import);
1441        }
1442
1443        if num_already_seen_by_peer > 0 {
1444            self.metrics.messages_with_transactions_already_seen_by_peer.increment(1);
1445            self.metrics
1446                .occurrences_of_transaction_already_seen_by_peer
1447                .increment(num_already_seen_by_peer);
1448            trace!(target: "net::tx", num_txs=%num_already_seen_by_peer, ?peer_id, client=?peer.client_version, "Peer sent already seen transactions");
1449        }
1450
1451        if has_bad_transactions {
1452            // peer sent us invalid transactions
1453            self.report_peer_bad_transactions(peer_id)
1454        }
1455
1456        if num_already_seen_by_peer > 0 {
1457            self.report_already_seen(peer_id);
1458        }
1459    }
1460
1461    /// Processes a [`FetchEvent`].
1462    fn on_fetch_event(&mut self, fetch_event: FetchEvent<N::PooledTransaction>) {
1463        match fetch_event {
1464            FetchEvent::TransactionsFetched { peer_id, transactions, report_peer } => {
1465                self.import_transactions(peer_id, transactions, TransactionSource::Response);
1466                if report_peer {
1467                    self.report_peer(peer_id, ReputationChangeKind::BadTransactions);
1468                }
1469            }
1470            FetchEvent::FetchError { peer_id, error } => {
1471                trace!(target: "net::tx", ?peer_id, %error, "requesting transactions from peer failed");
1472                self.on_request_error(peer_id, error);
1473            }
1474            FetchEvent::EmptyResponse { peer_id } => {
1475                trace!(target: "net::tx", ?peer_id, "peer returned empty response");
1476            }
1477        }
1478    }
1479}
1480
1481/// An endless future. Preemption ensure that future is non-blocking, nonetheless. See
1482/// [`crate::NetworkManager`] for more context on the design pattern.
1483///
1484/// This should be spawned or used as part of `tokio::select!`.
1485//
1486// spawned in `NodeConfig::start_network`(reth_node_core::NodeConfig) and
1487// `NetworkConfig::start_network`(reth_network::NetworkConfig)
1488impl<
1489        Pool: TransactionPool + Unpin + 'static,
1490        N: NetworkPrimitives<
1491                BroadcastedTransaction: SignedTransaction,
1492                PooledTransaction: SignedTransaction,
1493            > + Unpin,
1494        PBundle: TransactionPolicies + Unpin,
1495    > Future for TransactionsManager<Pool, N, PBundle>
1496where
1497    Pool::Transaction:
1498        PoolTransaction<Consensus = N::BroadcastedTransaction, Pooled = N::PooledTransaction>,
1499{
1500    type Output = ();
1501
1502    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1503        let start = Instant::now();
1504        let mut poll_durations = TxManagerPollDurations::default();
1505
1506        let this = self.get_mut();
1507
1508        // All streams are polled until their corresponding budget is exhausted, then we manually
1509        // yield back control to tokio. See `NetworkManager` for more context on the design
1510        // pattern.
1511
1512        // Advance network/peer related events (update peers map).
1513        let maybe_more_network_events = metered_poll_nested_stream_with_budget!(
1514            poll_durations.acc_network_events,
1515            "net::tx",
1516            "Network events stream",
1517            DEFAULT_BUDGET_TRY_DRAIN_STREAM,
1518            this.network_events.poll_next_unpin(cx),
1519            |event| this.on_network_event(event)
1520        );
1521
1522        // Advances new __pending__ transactions, transactions that were successfully inserted into
1523        // pending set in pool (are valid), and propagates them (inform peers which
1524        // transactions we have seen).
1525        //
1526        // We try to drain this to batch the transactions in a single message.
1527        //
1528        // We don't expect this buffer to be large, since only pending transactions are
1529        // emitted here.
1530        let mut new_txs = Vec::new();
1531        let maybe_more_pending_txns = metered_poll_nested_stream_with_budget!(
1532            poll_durations.acc_imported_txns,
1533            "net::tx",
1534            "Pending transactions stream",
1535            DEFAULT_BUDGET_TRY_DRAIN_POOL_IMPORTS,
1536            this.pending_transactions.poll_next_unpin(cx),
1537            |hash| new_txs.push(hash)
1538        );
1539        if !new_txs.is_empty() {
1540            this.on_new_pending_transactions(new_txs);
1541        }
1542
1543        // Advance inflight fetch requests (flush transaction fetcher and queue for
1544        // import to pool).
1545        //
1546        // The smallest decodable transaction is an empty legacy transaction, 10 bytes
1547        // (2 MiB / 10 bytes > 200k transactions).
1548        //
1549        // Since transactions aren't validated until they are inserted into the pool,
1550        // this can potentially queue >200k transactions for insertion to pool. More
1551        // if the message size is bigger than the soft limit on a `PooledTransactions`
1552        // response which is 2 MiB.
1553        let maybe_more_tx_fetch_events = metered_poll_nested_stream_with_budget!(
1554            poll_durations.acc_fetch_events,
1555            "net::tx",
1556            "Transaction fetch events stream",
1557            DEFAULT_BUDGET_TRY_DRAIN_STREAM,
1558            this.transaction_fetcher.poll_next_unpin(cx),
1559            |event| this.on_fetch_event(event),
1560        );
1561
1562        // Advance incoming transaction events (stream new txns/announcements from
1563        // network manager and queue for import to pool/fetch txns).
1564        //
1565        // This will potentially remove hashes from hashes pending fetch, it the event
1566        // is an announcement (if same hashes are announced that didn't fit into a
1567        // previous request).
1568        //
1569        // The smallest decodable transaction is an empty legacy transaction, 10 bytes
1570        // (128 KiB / 10 bytes > 13k transactions).
1571        //
1572        // If this is an event with `Transactions` message, since transactions aren't
1573        // validated until they are inserted into the pool, this can potentially queue
1574        // >13k transactions for insertion to pool. More if the message size is bigger
1575        // than the soft limit on a `Transactions` broadcast message, which is 128 KiB.
1576        let maybe_more_tx_events = metered_poll_nested_stream_with_budget!(
1577            poll_durations.acc_tx_events,
1578            "net::tx",
1579            "Network transaction events stream",
1580            DEFAULT_BUDGET_TRY_DRAIN_NETWORK_TRANSACTION_EVENTS,
1581            this.transaction_events.poll_next_unpin(cx),
1582            |event| this.on_network_tx_event(event),
1583        );
1584
1585        // Advance pool imports (flush txns to pool).
1586        //
1587        // Note, this is done in batches. A batch is filled from one `Transactions`
1588        // broadcast messages or one `PooledTransactions` response at a time. The
1589        // minimum batch size is 1 transaction (and might often be the case with blob
1590        // transactions).
1591        //
1592        // The smallest decodable transaction is an empty legacy transaction, 10 bytes
1593        // (2 MiB / 10 bytes > 200k transactions).
1594        //
1595        // Since transactions aren't validated until they are inserted into the pool,
1596        // this can potentially validate >200k transactions. More if the message size
1597        // is bigger than the soft limit on a `PooledTransactions` response which is
1598        // 2 MiB (`Transactions` broadcast messages is smaller, 128 KiB).
1599        let maybe_more_pool_imports = metered_poll_nested_stream_with_budget!(
1600            poll_durations.acc_pending_imports,
1601            "net::tx",
1602            "Batched pool imports stream",
1603            DEFAULT_BUDGET_TRY_DRAIN_PENDING_POOL_IMPORTS,
1604            this.pool_imports.poll_next_unpin(cx),
1605            |batch_results| this.on_batch_import_result(batch_results)
1606        );
1607
1608        // Tries to drain hashes pending fetch cache if the tx manager currently has
1609        // capacity for this (fetch txns).
1610        //
1611        // Sends at most one request.
1612        duration_metered_exec!(
1613            {
1614                if this.has_capacity_for_fetching_pending_hashes() {
1615                    this.on_fetch_hashes_pending_fetch();
1616                }
1617            },
1618            poll_durations.acc_pending_fetch
1619        );
1620
1621        // Advance commands (propagate/fetch/serve txns).
1622        let maybe_more_commands = metered_poll_nested_stream_with_budget!(
1623            poll_durations.acc_cmds,
1624            "net::tx",
1625            "Commands channel",
1626            DEFAULT_BUDGET_TRY_DRAIN_STREAM,
1627            this.command_rx.poll_next_unpin(cx),
1628            |cmd| this.on_command(cmd)
1629        );
1630
1631        this.transaction_fetcher.update_metrics();
1632
1633        // all channels are fully drained and import futures pending
1634        if maybe_more_network_events ||
1635            maybe_more_commands ||
1636            maybe_more_tx_events ||
1637            maybe_more_tx_fetch_events ||
1638            maybe_more_pool_imports ||
1639            maybe_more_pending_txns
1640        {
1641            // make sure we're woken up again
1642            cx.waker().wake_by_ref();
1643            return Poll::Pending
1644        }
1645
1646        this.update_poll_metrics(start, poll_durations);
1647
1648        Poll::Pending
1649    }
1650}
1651
/// The mode in which transactions are propagated.
///
/// The mode decides which peers in the network a transaction is propagated to.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
enum PropagationMode {
    /// The default mode.
    ///
    /// A transaction is only sent to peers that haven't seen it yet.
    Basic,
    /// The forced mode.
    ///
    /// A transaction is sent to all peers, regardless of whether it has been sent to or received
    /// from them before.
    Forced,
}

impl PropagationMode {
    /// Whether this mode is [`PropagationMode::Forced`].
    const fn is_forced(self) -> bool {
        match self {
            Self::Forced => true,
            Self::Basic => false,
        }
    }
}
1674
/// A transaction that's about to be propagated to multiple peers.
///
/// The transaction itself is shared behind an [`Arc`], so the message builders can take a
/// reference-counted handle instead of copying the transaction.
#[derive(Debug, Clone)]
struct PropagateTransaction<T = TransactionSigned> {
    /// Size of the transaction as recorded at construction (`length()` for [`Self::new`],
    /// `encoded_length()` for [`Self::pool_tx`]). Used for the broadcast soft limit accounting
    /// and as the announced size in eth68 hash announcements.
    size: usize,
    /// The transaction to propagate.
    transaction: Arc<T>,
}
1681
1682impl<T: SignedTransaction> PropagateTransaction<T> {
1683    /// Create a new instance from a transaction.
1684    pub fn new(transaction: T) -> Self {
1685        let size = transaction.length();
1686        Self { size, transaction: Arc::new(transaction) }
1687    }
1688
1689    /// Create a new instance from a pooled transaction
1690    fn pool_tx<P>(tx: Arc<ValidPoolTransaction<P>>) -> Self
1691    where
1692        P: PoolTransaction<Consensus = T>,
1693    {
1694        let size = tx.encoded_length();
1695        let transaction = tx.transaction.clone_into_consensus();
1696        let transaction = Arc::new(transaction.into_inner());
1697        Self { size, transaction }
1698    }
1699
1700    fn tx_hash(&self) -> &TxHash {
1701        self.transaction.tx_hash()
1702    }
1703}
1704
/// Helper type to construct the appropriate message to send to the peer based on whether the peer
/// should receive them in full or as pooled
#[derive(Debug, Clone)]
enum PropagateTransactionsBuilder<T> {
    /// The peer only receives hash announcements.
    Pooled(PooledTransactionsHashesBuilder),
    /// The peer receives transactions in full; transactions that can't be sent in full are
    /// tracked by the inner builder as hash announcements instead.
    Full(FullTransactionsBuilder<T>),
}
1712
1713impl<T> PropagateTransactionsBuilder<T> {
1714    /// Create a builder for pooled transactions
1715    fn pooled(version: EthVersion) -> Self {
1716        Self::Pooled(PooledTransactionsHashesBuilder::new(version))
1717    }
1718
1719    /// Create a builder that sends transactions in full and records transactions that don't fit.
1720    fn full(version: EthVersion) -> Self {
1721        Self::Full(FullTransactionsBuilder::new(version))
1722    }
1723
1724    /// Returns true if no transactions are recorded.
1725    fn is_empty(&self) -> bool {
1726        match self {
1727            Self::Pooled(builder) => builder.is_empty(),
1728            Self::Full(builder) => builder.is_empty(),
1729        }
1730    }
1731
1732    /// Consumes the type and returns the built messages that should be sent to the peer.
1733    fn build(self) -> PropagateTransactions<T> {
1734        match self {
1735            Self::Pooled(pooled) => {
1736                PropagateTransactions { pooled: Some(pooled.build()), full: None }
1737            }
1738            Self::Full(full) => full.build(),
1739        }
1740    }
1741}
1742
1743impl<T: SignedTransaction> PropagateTransactionsBuilder<T> {
1744    /// Appends all transactions
1745    fn extend<'a>(&mut self, txs: impl IntoIterator<Item = &'a PropagateTransaction<T>>) {
1746        for tx in txs {
1747            self.push(tx);
1748        }
1749    }
1750
1751    /// Appends a transaction to the list.
1752    fn push(&mut self, transaction: &PropagateTransaction<T>) {
1753        match self {
1754            Self::Pooled(builder) => builder.push(transaction),
1755            Self::Full(builder) => builder.push(transaction),
1756        }
1757    }
1758}
1759
/// Represents how the transactions should be sent to a peer if any.
///
/// Produced by `PropagateTransactionsBuilder::build` and `FullTransactionsBuilder::build`.
struct PropagateTransactions<T> {
    /// The pooled transaction hashes to send.
    ///
    /// `FullTransactionsBuilder::build` yields `None` here when there are no hashes to
    /// announce; the `Pooled` builder variant always yields `Some`.
    pooled: Option<NewPooledTransactionHashes>,
    /// The transactions to send in full.
    ///
    /// `None` when there is nothing to send in full.
    full: Option<Vec<Arc<T>>>,
}
1767
/// Helper type for constructing the full transaction message that enforces the
/// [`DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE`] for full transaction broadcast
/// and enforces other propagation rules for EIP-4844 and tracks those transactions that can't be
/// broadcasted in full.
#[derive(Debug, Clone)]
struct FullTransactionsBuilder<T> {
    /// Running sum of the sizes of the transactions accepted into `transactions`. This is the
    /// value compared against the soft limit when pushing; it is not the limit itself.
    total_size: usize,
    /// All transactions to be broadcasted.
    transactions: Vec<Arc<T>>,
    /// Transactions that are not broadcastable in full or didn't fit into the broadcast message;
    /// they are announced by hash instead.
    pooled: PooledTransactionsHashesBuilder,
}
1781
1782impl<T> FullTransactionsBuilder<T> {
1783    /// Create a builder for the negotiated version of the peer's session
1784    fn new(version: EthVersion) -> Self {
1785        Self {
1786            total_size: 0,
1787            pooled: PooledTransactionsHashesBuilder::new(version),
1788            transactions: vec![],
1789        }
1790    }
1791
1792    /// Returns whether or not any transactions are in the [`FullTransactionsBuilder`].
1793    fn is_empty(&self) -> bool {
1794        self.transactions.is_empty() && self.pooled.is_empty()
1795    }
1796
1797    /// Returns the messages that should be propagated to the peer.
1798    fn build(self) -> PropagateTransactions<T> {
1799        let pooled = Some(self.pooled.build()).filter(|pooled| !pooled.is_empty());
1800        let full = Some(self.transactions).filter(|full| !full.is_empty());
1801        PropagateTransactions { pooled, full }
1802    }
1803}
1804
1805impl<T: SignedTransaction> FullTransactionsBuilder<T> {
1806    /// Appends all transactions.
1807    fn extend(&mut self, txs: impl IntoIterator<Item = PropagateTransaction<T>>) {
1808        for tx in txs {
1809            self.push(&tx)
1810        }
1811    }
1812
1813    /// Append a transaction to the list of full transaction if the total message bytes size doesn't
1814    /// exceed the soft maximum target byte size. The limit is soft, meaning if one single
1815    /// transaction goes over the limit, it will be broadcasted in its own [`Transactions`]
1816    /// message. The same pattern is followed in filling a [`GetPooledTransactions`] request in
1817    /// [`TransactionFetcher::fill_request_from_hashes_pending_fetch`].
1818    ///
1819    /// If the transaction is unsuitable for broadcast or would exceed the softlimit, it is appended
1820    /// to list of pooled transactions, (e.g. 4844 transactions).
1821    /// See also [`SignedTransaction::is_broadcastable_in_full`].
1822    fn push(&mut self, transaction: &PropagateTransaction<T>) {
1823        // Do not send full 4844 transaction hashes to peers.
1824        //
1825        //  Nodes MUST NOT automatically broadcast blob transactions to their peers.
1826        //  Instead, those transactions are only announced using
1827        //  `NewPooledTransactionHashes` messages, and can then be manually requested
1828        //  via `GetPooledTransactions`.
1829        //
1830        // From: <https://eips.ethereum.org/EIPS/eip-4844#networking>
1831        if !transaction.transaction.is_broadcastable_in_full() {
1832            self.pooled.push(transaction);
1833            return
1834        }
1835
1836        let new_size = self.total_size + transaction.size;
1837        if new_size > DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE &&
1838            self.total_size > 0
1839        {
1840            // transaction does not fit into the message
1841            self.pooled.push(transaction);
1842            return
1843        }
1844
1845        self.total_size = new_size;
1846        self.transactions.push(Arc::clone(&transaction.transaction));
1847    }
1848}
1849
/// A helper type to create the pooled transactions message based on the negotiated version of the
/// session with the peer
#[derive(Debug, Clone)]
enum PooledTransactionsHashesBuilder {
    /// Announcement carrying hashes only; selected for `EthVersion::Eth66` and
    /// `EthVersion::Eth67`.
    Eth66(NewPooledTransactionHashes66),
    /// Announcement carrying hashes together with sizes and types; selected for
    /// `EthVersion::Eth68` and `EthVersion::Eth69`.
    Eth68(NewPooledTransactionHashes68),
}
1857
1858// === impl PooledTransactionsHashesBuilder ===
1859
1860impl PooledTransactionsHashesBuilder {
1861    /// Push a transaction from the pool to the list.
1862    fn push_pooled<T: PoolTransaction>(&mut self, pooled_tx: Arc<ValidPoolTransaction<T>>) {
1863        match self {
1864            Self::Eth66(msg) => msg.0.push(*pooled_tx.hash()),
1865            Self::Eth68(msg) => {
1866                msg.hashes.push(*pooled_tx.hash());
1867                msg.sizes.push(pooled_tx.encoded_length());
1868                msg.types.push(pooled_tx.transaction.ty());
1869            }
1870        }
1871    }
1872
1873    /// Returns whether or not any transactions are in the [`PooledTransactionsHashesBuilder`].
1874    fn is_empty(&self) -> bool {
1875        match self {
1876            Self::Eth66(hashes) => hashes.is_empty(),
1877            Self::Eth68(hashes) => hashes.is_empty(),
1878        }
1879    }
1880
1881    /// Appends all hashes
1882    fn extend<T: SignedTransaction>(
1883        &mut self,
1884        txs: impl IntoIterator<Item = PropagateTransaction<T>>,
1885    ) {
1886        for tx in txs {
1887            self.push(&tx);
1888        }
1889    }
1890
1891    fn push<T: SignedTransaction>(&mut self, tx: &PropagateTransaction<T>) {
1892        match self {
1893            Self::Eth66(msg) => msg.0.push(*tx.tx_hash()),
1894            Self::Eth68(msg) => {
1895                msg.hashes.push(*tx.tx_hash());
1896                msg.sizes.push(tx.size);
1897                msg.types.push(tx.transaction.ty());
1898            }
1899        }
1900    }
1901
1902    /// Create a builder for the negotiated version of the peer's session
1903    fn new(version: EthVersion) -> Self {
1904        match version {
1905            EthVersion::Eth66 | EthVersion::Eth67 => Self::Eth66(Default::default()),
1906            EthVersion::Eth68 | EthVersion::Eth69 => Self::Eth68(Default::default()),
1907        }
1908    }
1909
1910    fn build(self) -> NewPooledTransactionHashes {
1911        match self {
1912            Self::Eth66(msg) => msg.into(),
1913            Self::Eth68(msg) => msg.into(),
1914        }
1915    }
1916}
1917
/// Describes the way transactions reached us.
enum TransactionSource {
    /// The peer broadcast the transactions to us in a [`Transactions`] message.
    Broadcast,
    /// The transactions are the response to a [`fetcher::GetPooledTxRequest`] issued by us.
    Response,
}

// === impl TransactionSource ===

impl TransactionSource {
    /// Whether the transactions arrived as a broadcast.
    const fn is_broadcast(&self) -> bool {
        match self {
            Self::Broadcast => true,
            Self::Response => false,
        }
    }
}
1934
/// Tracks a single peer in the context of [`TransactionsManager`].
#[derive(Debug)]
pub struct PeerMetadata<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// Optimistically keeps track of transactions that we know the peer has seen. Optimistic, in
    /// the sense that transactions are preemptively marked as seen by peer when they are sent to
    /// the peer.
    ///
    /// The cache is created from the `max_transactions_seen_by_peer` value passed to
    /// [`PeerMetadata::new`].
    seen_transactions: LruCache<TxHash>,
    /// A communication channel directly to the peer's session task.
    request_tx: PeerRequestSender<PeerRequest<N>>,
    /// The negotiated version of the session.
    version: EthVersion,
    /// The peer's client version.
    client_version: Arc<str>,
    /// The kind of peer.
    peer_kind: PeerKind,
}
1951
1952impl<N: NetworkPrimitives> PeerMetadata<N> {
1953    /// Returns a new instance of [`PeerMetadata`].
1954    pub fn new(
1955        request_tx: PeerRequestSender<PeerRequest<N>>,
1956        version: EthVersion,
1957        client_version: Arc<str>,
1958        max_transactions_seen_by_peer: u32,
1959        peer_kind: PeerKind,
1960    ) -> Self {
1961        Self {
1962            seen_transactions: LruCache::new(max_transactions_seen_by_peer),
1963            request_tx,
1964            version,
1965            client_version,
1966            peer_kind,
1967        }
1968    }
1969
1970    /// Returns a reference to the peer's request sender channel.
1971    pub const fn request_tx(&self) -> &PeerRequestSender<PeerRequest<N>> {
1972        &self.request_tx
1973    }
1974
1975    /// Return a
1976    pub const fn seen_transactions_mut(&mut self) -> &mut LruCache<TxHash> {
1977        &mut self.seen_transactions
1978    }
1979
1980    /// Returns the negotiated `EthVersion` of the session.
1981    pub const fn version(&self) -> EthVersion {
1982        self.version
1983    }
1984
1985    /// Returns a reference to the peer's client version string.
1986    pub fn client_version(&self) -> &str {
1987        &self.client_version
1988    }
1989
1990    /// Returns the peer's kind.
1991    pub const fn peer_kind(&self) -> PeerKind {
1992        self.peer_kind
1993    }
1994}
1995
/// Commands to send to the [`TransactionsManager`]
///
/// While the manager is polled, commands are taken from its command channel and passed to
/// `on_command` one at a time.
#[derive(Debug)]
enum TransactionsCommand<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// Propagate a transaction hash to the network.
    PropagateHash(B256),
    /// Propagate transaction hashes to a specific peer.
    PropagateHashesTo(Vec<B256>, PeerId),
    /// Request the list of active peer IDs from the [`TransactionsManager`].
    ///
    /// The result is delivered through the contained oneshot sender.
    GetActivePeers(oneshot::Sender<HashSet<PeerId>>),
    /// Propagate a collection of full transactions to a specific peer.
    ///
    /// The transactions are identified by their hashes.
    PropagateTransactionsTo(Vec<TxHash>, PeerId),
    /// Propagate a collection of hashes to all peers.
    PropagateTransactions(Vec<TxHash>),
    /// Propagate a collection of broadcastable transactions in full to all peers.
    BroadcastTransactions(Vec<PropagateTransaction<N::BroadcastedTransaction>>),
    /// Request transaction hashes known by specific peers from the [`TransactionsManager`].
    GetTransactionHashes {
        /// The peers to report hashes for.
        peers: Vec<PeerId>,
        /// Channel on which the per-peer sets of hashes are returned.
        tx: oneshot::Sender<HashMap<PeerId, HashSet<TxHash>>>,
    },
    /// Requests a clone of the sender channel to the peer.
    GetPeerSender {
        /// The peer whose sender channel is requested.
        peer_id: PeerId,
        /// Channel on which the sender is returned; the payload is an `Option`.
        peer_request_sender: oneshot::Sender<Option<PeerRequestSender<PeerRequest<N>>>>,
    },
}
2022
/// All events related to transactions emitted by the network.
///
/// The [`TransactionsManager`] handles these events in `on_network_tx_event`.
#[derive(Debug)]
pub enum NetworkTransactionEvent<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// Represents the event of receiving a list of transactions from a peer.
    ///
    /// This indicates transactions that were broadcasted to us from the peer.
    IncomingTransactions {
        /// The ID of the peer from which the transactions were received.
        peer_id: PeerId,
        /// The received transactions.
        msg: Transactions<N::BroadcastedTransaction>,
    },
    /// Represents the event of receiving a list of transaction hashes from a peer.
    IncomingPooledTransactionHashes {
        /// The ID of the peer from which the transaction hashes were received.
        peer_id: PeerId,
        /// The received new pooled transaction hashes.
        msg: NewPooledTransactionHashes,
    },
    /// Represents the event of receiving a `GetPooledTransactions` request from a peer.
    GetPooledTransactions {
        /// The ID of the peer from which the request was received.
        peer_id: PeerId,
        /// The received `GetPooledTransactions` request.
        request: GetPooledTransactions,
        /// The sender for responding to the request with a result of `PooledTransactions`.
        response: oneshot::Sender<RequestResult<PooledTransactions<N::PooledTransaction>>>,
    },
    /// Represents the event of receiving a `GetTransactionsHandle` request.
    ///
    /// The handle, if any, is returned through the contained oneshot sender.
    GetTransactionsHandle(oneshot::Sender<Option<TransactionsHandle<N>>>),
}
2054
/// Stats about the pending pool imports of the [`TransactionsManager`].
#[derive(Debug)]
pub struct PendingPoolImportsInfo {
    /// Shared counter of transactions that are about to be inserted into the pool.
    pending_pool_imports: Arc<AtomicUsize>,
    /// Maximum number of transactions that may be imported concurrently.
    max_pending_pool_imports: usize,
}

impl PendingPoolImportsInfo {
    /// Creates a [`PendingPoolImportsInfo`] whose counter starts at zero.
    pub fn new(max_pending_pool_imports: usize) -> Self {
        let pending_pool_imports = Arc::new(AtomicUsize::new(0));
        Self { pending_pool_imports, max_pending_pool_imports }
    }

    /// Returns `true` if the number of pool imports is under a given tolerated max.
    ///
    /// The comparison uses the `max_pending_pool_imports` argument, not the value stored in
    /// the struct.
    pub fn has_capacity(&self, max_pending_pool_imports: usize) -> bool {
        let in_flight = self.pending_pool_imports.load(Ordering::Relaxed);
        in_flight < max_pending_pool_imports
    }
}
2075
impl Default for PendingPoolImportsInfo {
    /// Builds the info with `DEFAULT_MAX_COUNT_PENDING_POOL_IMPORTS` as the maximum number of
    /// pending pool imports.
    fn default() -> Self {
        Self::new(DEFAULT_MAX_COUNT_PENDING_POOL_IMPORTS)
    }
}
2081
/// Duration accumulators for one call to `poll` of the [`TransactionsManager`].
///
/// Each field is handed to the metered macro that wraps the corresponding section of `poll`;
/// the whole struct is then passed to `update_poll_metrics`.
#[derive(Debug, Default)]
struct TxManagerPollDurations {
    /// Accumulator for the network events stream.
    acc_network_events: Duration,
    /// Accumulator for the batched pool imports stream.
    acc_pending_imports: Duration,
    /// Accumulator for the network transaction events stream.
    acc_tx_events: Duration,
    /// Accumulator for the pending transactions stream.
    acc_imported_txns: Duration,
    /// Accumulator for the transaction fetch events stream.
    acc_fetch_events: Duration,
    /// Accumulator for fetching hashes that are pending fetch.
    acc_pending_fetch: Duration,
    /// Accumulator for the commands channel.
    acc_cmds: Duration,
}
2092
2093#[cfg(test)]
2094mod tests {
2095    use super::*;
2096    use crate::{
2097        test_utils::{
2098            transactions::{buffer_hash_to_tx_fetcher, new_mock_session, new_tx_manager},
2099            Testnet,
2100        },
2101        transactions::config::RelaxedEthAnnouncementFilter,
2102        NetworkConfigBuilder, NetworkManager,
2103    };
2104    use alloy_consensus::{TxEip1559, TxLegacy};
2105    use alloy_primitives::{hex, Signature, TxKind, U256};
2106    use alloy_rlp::Decodable;
2107    use futures::FutureExt;
2108    use reth_chainspec::MIN_TRANSACTION_GAS;
2109    use reth_ethereum_primitives::{PooledTransactionVariant, Transaction, TransactionSigned};
2110    use reth_network_api::{NetworkInfo, PeerKind};
2111    use reth_network_p2p::{
2112        error::{RequestError, RequestResult},
2113        sync::{NetworkSyncUpdater, SyncState},
2114    };
2115    use reth_storage_api::noop::NoopProvider;
2116    use reth_transaction_pool::test_utils::{
2117        testing_pool, MockTransaction, MockTransactionFactory, TestPool,
2118    };
2119    use secp256k1::SecretKey;
2120    use std::{
2121        future::poll_fn,
2122        net::{IpAddr, Ipv4Addr, SocketAddr},
2123        str::FromStr,
2124    };
2125    use tracing::error;
2126
2127    #[tokio::test(flavor = "multi_thread")]
2128    async fn test_ignored_tx_broadcasts_while_initially_syncing() {
2129        reth_tracing::init_test_tracing();
2130        let net = Testnet::create(3).await;
2131
2132        let mut handles = net.handles();
2133        let handle0 = handles.next().unwrap();
2134        let handle1 = handles.next().unwrap();
2135
2136        drop(handles);
2137        let handle = net.spawn();
2138
2139        let listener0 = handle0.event_listener();
2140        handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
2141        let secret_key = SecretKey::new(&mut rand_08::thread_rng());
2142
2143        let client = NoopProvider::default();
2144        let pool = testing_pool();
2145        let config = NetworkConfigBuilder::eth(secret_key)
2146            .disable_discovery()
2147            .listener_port(0)
2148            .build(client);
2149        let transactions_manager_config = config.transactions_manager_config.clone();
2150        let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
2151            .await
2152            .unwrap()
2153            .into_builder()
2154            .transactions(pool.clone(), transactions_manager_config)
2155            .split_with_handle();
2156
2157        tokio::task::spawn(network);
2158
2159        // go to syncing (pipeline sync)
2160        network_handle.update_sync_state(SyncState::Syncing);
2161        assert!(NetworkInfo::is_syncing(&network_handle));
2162        assert!(NetworkInfo::is_initially_syncing(&network_handle));
2163
2164        // wait for all initiator connections
2165        let mut established = listener0.take(2);
2166        while let Some(ev) = established.next().await {
2167            match ev {
2168                NetworkEvent::Peer(PeerEvent::SessionEstablished(info)) => {
2169                    // to insert a new peer in transactions peerset
2170                    transactions
2171                        .on_network_event(NetworkEvent::Peer(PeerEvent::SessionEstablished(info)))
2172                }
2173                NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => {}
2174                ev => {
2175                    error!("unexpected event {ev:?}")
2176                }
2177            }
2178        }
2179        // random tx: <https://etherscan.io/getRawTx?tx=0x9448608d36e721ef403c53b00546068a6474d6cbab6816c3926de449898e7bce>
2180        let input = hex!(
2181            "02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76"
2182        );
2183        let signed_tx = TransactionSigned::decode(&mut &input[..]).unwrap();
2184        transactions.on_network_tx_event(NetworkTransactionEvent::IncomingTransactions {
2185            peer_id: *handle1.peer_id(),
2186            msg: Transactions(vec![signed_tx.clone()]),
2187        });
2188        poll_fn(|cx| {
2189            let _ = transactions.poll_unpin(cx);
2190            Poll::Ready(())
2191        })
2192        .await;
2193        assert!(pool.is_empty());
2194        handle.terminate().await;
2195    }
2196
2197    #[tokio::test(flavor = "multi_thread")]
2198    async fn test_tx_broadcasts_through_two_syncs() {
2199        reth_tracing::init_test_tracing();
2200        let net = Testnet::create(3).await;
2201
2202        let mut handles = net.handles();
2203        let handle0 = handles.next().unwrap();
2204        let handle1 = handles.next().unwrap();
2205
2206        drop(handles);
2207        let handle = net.spawn();
2208
2209        let listener0 = handle0.event_listener();
2210        handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
2211        let secret_key = SecretKey::new(&mut rand_08::thread_rng());
2212
2213        let client = NoopProvider::default();
2214        let pool = testing_pool();
2215        let config = NetworkConfigBuilder::new(secret_key)
2216            .disable_discovery()
2217            .listener_port(0)
2218            .build(client);
2219        let transactions_manager_config = config.transactions_manager_config.clone();
2220        let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
2221            .await
2222            .unwrap()
2223            .into_builder()
2224            .transactions(pool.clone(), transactions_manager_config)
2225            .split_with_handle();
2226
2227        tokio::task::spawn(network);
2228
2229        // go to syncing (pipeline sync) to idle and then to syncing (live)
2230        network_handle.update_sync_state(SyncState::Syncing);
2231        assert!(NetworkInfo::is_syncing(&network_handle));
2232        network_handle.update_sync_state(SyncState::Idle);
2233        assert!(!NetworkInfo::is_syncing(&network_handle));
2234        network_handle.update_sync_state(SyncState::Syncing);
2235        assert!(NetworkInfo::is_syncing(&network_handle));
2236
2237        // wait for all initiator connections
2238        let mut established = listener0.take(2);
2239        while let Some(ev) = established.next().await {
2240            match ev {
2241                NetworkEvent::ActivePeerSession { .. } |
2242                NetworkEvent::Peer(PeerEvent::SessionEstablished(_)) => {
2243                    // to insert a new peer in transactions peerset
2244                    transactions.on_network_event(ev);
2245                }
2246                NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => {}
2247                _ => {
2248                    error!("unexpected event {ev:?}")
2249                }
2250            }
2251        }
2252        // random tx: <https://etherscan.io/getRawTx?tx=0x9448608d36e721ef403c53b00546068a6474d6cbab6816c3926de449898e7bce>
2253        let input = hex!(
2254            "02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76"
2255        );
2256        let signed_tx = TransactionSigned::decode(&mut &input[..]).unwrap();
2257        transactions.on_network_tx_event(NetworkTransactionEvent::IncomingTransactions {
2258            peer_id: *handle1.peer_id(),
2259            msg: Transactions(vec![signed_tx.clone()]),
2260        });
2261        poll_fn(|cx| {
2262            let _ = transactions.poll_unpin(cx);
2263            Poll::Ready(())
2264        })
2265        .await;
2266        assert!(!NetworkInfo::is_initially_syncing(&network_handle));
2267        assert!(NetworkInfo::is_syncing(&network_handle));
2268        assert!(!pool.is_empty());
2269        handle.terminate().await;
2270    }
2271
2272    // Ensure that the transaction manager correctly handles the `IncomingPooledTransactionHashes`
2273    // event and is able to retrieve the corresponding transactions.
2274    #[tokio::test(flavor = "multi_thread")]
2275    async fn test_handle_incoming_transactions_hashes() {
2276        reth_tracing::init_test_tracing();
2277
2278        let secret_key = SecretKey::new(&mut rand_08::thread_rng());
2279        let client = NoopProvider::default();
2280
2281        let config = NetworkConfigBuilder::new(secret_key)
2282            // let OS choose port
2283            .listener_port(0)
2284            .disable_discovery()
2285            .build(client);
2286
2287        let pool = testing_pool();
2288
2289        let transactions_manager_config = config.transactions_manager_config.clone();
2290        let (_network_handle, _network, mut tx_manager, _) = NetworkManager::new(config)
2291            .await
2292            .unwrap()
2293            .into_builder()
2294            .transactions(pool.clone(), transactions_manager_config)
2295            .split_with_handle();
2296
2297        let peer_id_1 = PeerId::new([1; 64]);
2298        let eth_version = EthVersion::Eth66;
2299
2300        let txs = vec![TransactionSigned::new_unhashed(
2301            Transaction::Legacy(TxLegacy {
2302                chain_id: Some(4),
2303                nonce: 15u64,
2304                gas_price: 2200000000,
2305                gas_limit: 34811,
2306                to: TxKind::Call(hex!("cf7f9e66af820a19257a2108375b180b0ec49167").into()),
2307                value: U256::from(1234u64),
2308                input: Default::default(),
2309            }),
2310            Signature::new(
2311                U256::from_str(
2312                    "0x35b7bfeb9ad9ece2cbafaaf8e202e706b4cfaeb233f46198f00b44d4a566a981",
2313                )
2314                .unwrap(),
2315                U256::from_str(
2316                    "0x612638fb29427ca33b9a3be2a0a561beecfe0269655be160d35e72d366a6a860",
2317                )
2318                .unwrap(),
2319                true,
2320            ),
2321        )];
2322
2323        let txs_hashes: Vec<B256> = txs.iter().map(|tx| *tx.hash()).collect();
2324
2325        let (peer_1, mut to_mock_session_rx) = new_mock_session(peer_id_1, eth_version);
2326        tx_manager.peers.insert(peer_id_1, peer_1);
2327
2328        assert!(pool.is_empty());
2329
2330        tx_manager.on_network_tx_event(NetworkTransactionEvent::IncomingPooledTransactionHashes {
2331            peer_id: peer_id_1,
2332            msg: NewPooledTransactionHashes::from(NewPooledTransactionHashes66::from(
2333                txs_hashes.clone(),
2334            )),
2335        });
2336
2337        // mock session of peer_1 receives request
2338        let req = to_mock_session_rx
2339            .recv()
2340            .await
2341            .expect("peer_1 session should receive request with buffered hashes");
2342        let PeerRequest::GetPooledTransactions { request, response } = req else { unreachable!() };
2343        assert_eq!(request, GetPooledTransactions::from(txs_hashes.clone()));
2344
2345        let message: Vec<PooledTransactionVariant> = txs
2346            .into_iter()
2347            .map(|tx| {
2348                PooledTransactionVariant::try_from(tx)
2349                    .expect("Failed to convert MockTransaction to PooledTransaction")
2350            })
2351            .collect();
2352
2353        // return the transactions corresponding to the transaction hashes.
2354        response
2355            .send(Ok(PooledTransactions(message)))
2356            .expect("should send peer_1 response to tx manager");
2357
2358        // adance the transaction manager future
2359        poll_fn(|cx| {
2360            let _ = tx_manager.poll_unpin(cx);
2361            Poll::Ready(())
2362        })
2363        .await;
2364
2365        // ensure that the transactions corresponding to the transaction hashes have been
2366        // successfully retrieved and stored in the Pool.
2367        assert_eq!(pool.get_all(txs_hashes.clone()).len(), txs_hashes.len());
2368    }
2369
2370    #[tokio::test(flavor = "multi_thread")]
2371    async fn test_handle_incoming_transactions() {
2372        reth_tracing::init_test_tracing();
2373        let net = Testnet::create(3).await;
2374
2375        let mut handles = net.handles();
2376        let handle0 = handles.next().unwrap();
2377        let handle1 = handles.next().unwrap();
2378
2379        drop(handles);
2380        let handle = net.spawn();
2381
2382        let listener0 = handle0.event_listener();
2383
2384        handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
2385        let secret_key = SecretKey::new(&mut rand_08::thread_rng());
2386
2387        let client = NoopProvider::default();
2388        let pool = testing_pool();
2389        let config = NetworkConfigBuilder::new(secret_key)
2390            .disable_discovery()
2391            .listener_port(0)
2392            .build(client);
2393        let transactions_manager_config = config.transactions_manager_config.clone();
2394        let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
2395            .await
2396            .unwrap()
2397            .into_builder()
2398            .transactions(pool.clone(), transactions_manager_config)
2399            .split_with_handle();
2400        tokio::task::spawn(network);
2401
2402        network_handle.update_sync_state(SyncState::Idle);
2403
2404        assert!(!NetworkInfo::is_syncing(&network_handle));
2405
2406        // wait for all initiator connections
2407        let mut established = listener0.take(2);
2408        while let Some(ev) = established.next().await {
2409            match ev {
2410                NetworkEvent::ActivePeerSession { .. } |
2411                NetworkEvent::Peer(PeerEvent::SessionEstablished(_)) => {
2412                    // to insert a new peer in transactions peerset
2413                    transactions.on_network_event(ev);
2414                }
2415                NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => {}
2416                ev => {
2417                    error!("unexpected event {ev:?}")
2418                }
2419            }
2420        }
2421        // random tx: <https://etherscan.io/getRawTx?tx=0x9448608d36e721ef403c53b00546068a6474d6cbab6816c3926de449898e7bce>
2422        let input = hex!(
2423            "02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76"
2424        );
2425        let signed_tx = TransactionSigned::decode(&mut &input[..]).unwrap();
2426        transactions.on_network_tx_event(NetworkTransactionEvent::IncomingTransactions {
2427            peer_id: *handle1.peer_id(),
2428            msg: Transactions(vec![signed_tx.clone()]),
2429        });
2430        assert!(transactions
2431            .transactions_by_peers
2432            .get(signed_tx.tx_hash())
2433            .unwrap()
2434            .contains(handle1.peer_id()));
2435
2436        // advance the transaction manager future
2437        poll_fn(|cx| {
2438            let _ = transactions.poll_unpin(cx);
2439            Poll::Ready(())
2440        })
2441        .await;
2442
2443        assert!(!pool.is_empty());
2444        assert!(pool.get(signed_tx.tx_hash()).is_some());
2445        handle.terminate().await;
2446    }
2447
2448    #[tokio::test(flavor = "multi_thread")]
2449    async fn test_on_get_pooled_transactions_network() {
2450        reth_tracing::init_test_tracing();
2451        let net = Testnet::create(2).await;
2452
2453        let mut handles = net.handles();
2454        let handle0 = handles.next().unwrap();
2455        let handle1 = handles.next().unwrap();
2456
2457        drop(handles);
2458        let handle = net.spawn();
2459
2460        let listener0 = handle0.event_listener();
2461
2462        handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
2463        let secret_key = SecretKey::new(&mut rand_08::thread_rng());
2464
2465        let client = NoopProvider::default();
2466        let pool = testing_pool();
2467        let config = NetworkConfigBuilder::new(secret_key)
2468            .disable_discovery()
2469            .listener_port(0)
2470            .build(client);
2471        let transactions_manager_config = config.transactions_manager_config.clone();
2472        let (network_handle, network, mut transactions, _) = NetworkManager::new(config)
2473            .await
2474            .unwrap()
2475            .into_builder()
2476            .transactions(pool.clone(), transactions_manager_config)
2477            .split_with_handle();
2478        tokio::task::spawn(network);
2479
2480        network_handle.update_sync_state(SyncState::Idle);
2481
2482        assert!(!NetworkInfo::is_syncing(&network_handle));
2483
2484        // wait for all initiator connections
2485        let mut established = listener0.take(2);
2486        while let Some(ev) = established.next().await {
2487            match ev {
2488                NetworkEvent::ActivePeerSession { .. } |
2489                NetworkEvent::Peer(PeerEvent::SessionEstablished(_)) => {
2490                    transactions.on_network_event(ev);
2491                }
2492                NetworkEvent::Peer(PeerEvent::PeerAdded(_peer_id)) => {}
2493                ev => {
2494                    error!("unexpected event {ev:?}")
2495                }
2496            }
2497        }
2498        handle.terminate().await;
2499
2500        let tx = MockTransaction::eip1559();
2501        let _ = transactions
2502            .pool
2503            .add_transaction(reth_transaction_pool::TransactionOrigin::External, tx.clone())
2504            .await;
2505
2506        let request = GetPooledTransactions(vec![*tx.get_hash()]);
2507
2508        let (send, receive) =
2509            oneshot::channel::<RequestResult<PooledTransactions<PooledTransactionVariant>>>();
2510
2511        transactions.on_network_tx_event(NetworkTransactionEvent::GetPooledTransactions {
2512            peer_id: *handle1.peer_id(),
2513            request,
2514            response: send,
2515        });
2516
2517        match receive.await.unwrap() {
2518            Ok(PooledTransactions(transactions)) => {
2519                assert_eq!(transactions.len(), 1);
2520            }
2521            Err(e) => {
2522                panic!("error: {e:?}");
2523            }
2524        }
2525    }
2526
2527    // Ensure that when the remote peer only returns part of the requested transactions, the
2528    // replied transactions are removed from the `tx_fetcher`, while the unresponsive ones are
2529    // re-buffered.
2530    #[tokio::test]
2531    async fn test_partially_tx_response() {
2532        reth_tracing::init_test_tracing();
2533
2534        let mut tx_manager = new_tx_manager().await.0;
2535        let tx_fetcher = &mut tx_manager.transaction_fetcher;
2536
2537        let peer_id_1 = PeerId::new([1; 64]);
2538        let eth_version = EthVersion::Eth66;
2539
2540        let txs = vec![
2541            TransactionSigned::new_unhashed(
2542                Transaction::Legacy(TxLegacy {
2543                    chain_id: Some(4),
2544                    nonce: 15u64,
2545                    gas_price: 2200000000,
2546                    gas_limit: 34811,
2547                    to: TxKind::Call(hex!("cf7f9e66af820a19257a2108375b180b0ec49167").into()),
2548                    value: U256::from(1234u64),
2549                    input: Default::default(),
2550                }),
2551                Signature::new(
2552                    U256::from_str(
2553                        "0x35b7bfeb9ad9ece2cbafaaf8e202e706b4cfaeb233f46198f00b44d4a566a981",
2554                    )
2555                    .unwrap(),
2556                    U256::from_str(
2557                        "0x612638fb29427ca33b9a3be2a0a561beecfe0269655be160d35e72d366a6a860",
2558                    )
2559                    .unwrap(),
2560                    true,
2561                ),
2562            ),
2563            TransactionSigned::new_unhashed(
2564                Transaction::Eip1559(TxEip1559 {
2565                    chain_id: 4,
2566                    nonce: 26u64,
2567                    max_priority_fee_per_gas: 1500000000,
2568                    max_fee_per_gas: 1500000013,
2569                    gas_limit: MIN_TRANSACTION_GAS,
2570                    to: TxKind::Call(hex!("61815774383099e24810ab832a5b2a5425c154d5").into()),
2571                    value: U256::from(3000000000000000000u64),
2572                    input: Default::default(),
2573                    access_list: Default::default(),
2574                }),
2575                Signature::new(
2576                    U256::from_str(
2577                        "0x59e6b67f48fb32e7e570dfb11e042b5ad2e55e3ce3ce9cd989c7e06e07feeafd",
2578                    )
2579                    .unwrap(),
2580                    U256::from_str(
2581                        "0x016b83f4f980694ed2eee4d10667242b1f40dc406901b34125b008d334d47469",
2582                    )
2583                    .unwrap(),
2584                    true,
2585                ),
2586            ),
2587        ];
2588
2589        let txs_hashes: Vec<B256> = txs.iter().map(|tx| *tx.hash()).collect();
2590
2591        let (mut peer_1, mut to_mock_session_rx) = new_mock_session(peer_id_1, eth_version);
2592        // mark hashes as seen by peer so it can fish them out from the cache for hashes pending
2593        // fetch
2594        peer_1.seen_transactions.insert(txs_hashes[0]);
2595        peer_1.seen_transactions.insert(txs_hashes[1]);
2596        tx_manager.peers.insert(peer_id_1, peer_1);
2597
2598        buffer_hash_to_tx_fetcher(tx_fetcher, txs_hashes[0], peer_id_1, 0, None);
2599        buffer_hash_to_tx_fetcher(tx_fetcher, txs_hashes[1], peer_id_1, 0, None);
2600
2601        // peer_1 is idle
2602        assert!(tx_fetcher.is_idle(&peer_id_1));
2603        assert_eq!(tx_fetcher.active_peers.len(), 0);
2604
2605        // sends requests for buffered hashes to peer_1
2606        tx_fetcher.on_fetch_pending_hashes(&tx_manager.peers, |_| true);
2607
2608        assert_eq!(tx_fetcher.num_pending_hashes(), 0);
2609        // as long as request is in flight peer_1 is not idle
2610        assert!(!tx_fetcher.is_idle(&peer_id_1));
2611        assert_eq!(tx_fetcher.active_peers.len(), 1);
2612
2613        // mock session of peer_1 receives request
2614        let req = to_mock_session_rx
2615            .recv()
2616            .await
2617            .expect("peer_1 session should receive request with buffered hashes");
2618        let PeerRequest::GetPooledTransactions { response, .. } = req else { unreachable!() };
2619
2620        let message: Vec<PooledTransactionVariant> = txs
2621            .into_iter()
2622            .take(1)
2623            .map(|tx| {
2624                PooledTransactionVariant::try_from(tx)
2625                    .expect("Failed to convert MockTransaction to PooledTransaction")
2626            })
2627            .collect();
2628        // response partial request
2629        response
2630            .send(Ok(PooledTransactions(message)))
2631            .expect("should send peer_1 response to tx manager");
2632        let Some(FetchEvent::TransactionsFetched { peer_id, .. }) = tx_fetcher.next().await else {
2633            unreachable!()
2634        };
2635
2636        // request has resolved, peer_1 is idle again
2637        assert!(tx_fetcher.is_idle(&peer_id));
2638        assert_eq!(tx_fetcher.active_peers.len(), 0);
2639        // failing peer_1's request buffers requested hashes for retry.
2640        assert_eq!(tx_fetcher.num_pending_hashes(), 1);
2641    }
2642
2643    #[tokio::test]
2644    async fn test_max_retries_tx_request() {
2645        reth_tracing::init_test_tracing();
2646
2647        let mut tx_manager = new_tx_manager().await.0;
2648        let tx_fetcher = &mut tx_manager.transaction_fetcher;
2649
2650        let peer_id_1 = PeerId::new([1; 64]);
2651        let peer_id_2 = PeerId::new([2; 64]);
2652        let eth_version = EthVersion::Eth66;
2653        let seen_hashes = [B256::from_slice(&[1; 32]), B256::from_slice(&[2; 32])];
2654
2655        let (mut peer_1, mut to_mock_session_rx) = new_mock_session(peer_id_1, eth_version);
2656        // mark hashes as seen by peer so it can fish them out from the cache for hashes pending
2657        // fetch
2658        peer_1.seen_transactions.insert(seen_hashes[0]);
2659        peer_1.seen_transactions.insert(seen_hashes[1]);
2660        tx_manager.peers.insert(peer_id_1, peer_1);
2661
2662        // hashes are seen and currently not inflight, with one fallback peer, and are buffered
2663        // for first retry in reverse order to make index 0 lru
2664        let retries = 1;
2665        buffer_hash_to_tx_fetcher(tx_fetcher, seen_hashes[1], peer_id_1, retries, None);
2666        buffer_hash_to_tx_fetcher(tx_fetcher, seen_hashes[0], peer_id_1, retries, None);
2667
2668        // peer_1 is idle
2669        assert!(tx_fetcher.is_idle(&peer_id_1));
2670        assert_eq!(tx_fetcher.active_peers.len(), 0);
2671
2672        // sends request for buffered hashes to peer_1
2673        tx_fetcher.on_fetch_pending_hashes(&tx_manager.peers, |_| true);
2674
2675        let tx_fetcher = &mut tx_manager.transaction_fetcher;
2676
2677        assert_eq!(tx_fetcher.num_pending_hashes(), 0);
2678        // as long as request is in inflight peer_1 is not idle
2679        assert!(!tx_fetcher.is_idle(&peer_id_1));
2680        assert_eq!(tx_fetcher.active_peers.len(), 1);
2681
2682        // mock session of peer_1 receives request
2683        let req = to_mock_session_rx
2684            .recv()
2685            .await
2686            .expect("peer_1 session should receive request with buffered hashes");
2687        let PeerRequest::GetPooledTransactions { request, response } = req else { unreachable!() };
2688        let GetPooledTransactions(hashes) = request;
2689
2690        let hashes = hashes.into_iter().collect::<HashSet<_>>();
2691
2692        assert_eq!(hashes, seen_hashes.into_iter().collect::<HashSet<_>>());
2693
2694        // fail request to peer_1
2695        response
2696            .send(Err(RequestError::BadResponse))
2697            .expect("should send peer_1 response to tx manager");
2698        let Some(FetchEvent::FetchError { peer_id, .. }) = tx_fetcher.next().await else {
2699            unreachable!()
2700        };
2701
2702        // request has resolved, peer_1 is idle again
2703        assert!(tx_fetcher.is_idle(&peer_id));
2704        assert_eq!(tx_fetcher.active_peers.len(), 0);
2705        // failing peer_1's request buffers requested hashes for retry
2706        assert_eq!(tx_fetcher.num_pending_hashes(), 2);
2707
2708        let (peer_2, mut to_mock_session_rx) = new_mock_session(peer_id_2, eth_version);
2709        tx_manager.peers.insert(peer_id_2, peer_2);
2710
2711        // peer_2 announces same hashes as peer_1
2712        let msg =
2713            NewPooledTransactionHashes::Eth66(NewPooledTransactionHashes66(seen_hashes.to_vec()));
2714        tx_manager.on_new_pooled_transaction_hashes(peer_id_2, msg);
2715
2716        let tx_fetcher = &mut tx_manager.transaction_fetcher;
2717
2718        // peer_2 should be in active_peers.
2719        assert_eq!(tx_fetcher.active_peers.len(), 1);
2720
2721        // since hashes are already seen, no changes to length of unknown hashes
2722        assert_eq!(tx_fetcher.num_all_hashes(), 2);
2723        // but hashes are taken out of buffer and packed into request to peer_2
2724        assert_eq!(tx_fetcher.num_pending_hashes(), 0);
2725
2726        // mock session of peer_2 receives request
2727        let req = to_mock_session_rx
2728            .recv()
2729            .await
2730            .expect("peer_2 session should receive request with buffered hashes");
2731        let PeerRequest::GetPooledTransactions { response, .. } = req else { unreachable!() };
2732
2733        // report failed request to tx manager
2734        response
2735            .send(Err(RequestError::BadResponse))
2736            .expect("should send peer_2 response to tx manager");
2737        let Some(FetchEvent::FetchError { .. }) = tx_fetcher.next().await else { unreachable!() };
2738
2739        // `MAX_REQUEST_RETRIES_PER_TX_HASH`, 2, for hashes reached so this time won't be buffered
2740        // for retry
2741        assert_eq!(tx_fetcher.num_pending_hashes(), 0);
2742        assert_eq!(tx_fetcher.active_peers.len(), 0);
2743    }
2744
2745    #[test]
2746    fn test_transaction_builder_empty() {
2747        let mut builder =
2748            PropagateTransactionsBuilder::<TransactionSigned>::pooled(EthVersion::Eth68);
2749        assert!(builder.is_empty());
2750
2751        let mut factory = MockTransactionFactory::default();
2752        let tx = PropagateTransaction::pool_tx(Arc::new(factory.create_eip1559()));
2753        builder.push(&tx);
2754        assert!(!builder.is_empty());
2755
2756        let txs = builder.build();
2757        assert!(txs.full.is_none());
2758        let txs = txs.pooled.unwrap();
2759        assert_eq!(txs.len(), 1);
2760    }
2761
2762    #[test]
2763    fn test_transaction_builder_large() {
2764        let mut builder =
2765            PropagateTransactionsBuilder::<TransactionSigned>::full(EthVersion::Eth68);
2766        assert!(builder.is_empty());
2767
2768        let mut factory = MockTransactionFactory::default();
2769        let mut tx = factory.create_eip1559();
2770        // create a transaction that still fits
2771        tx.transaction.set_size(DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE + 1);
2772        let tx = Arc::new(tx);
2773        let tx = PropagateTransaction::pool_tx(tx);
2774        builder.push(&tx);
2775        assert!(!builder.is_empty());
2776
2777        let txs = builder.clone().build();
2778        assert!(txs.pooled.is_none());
2779        let txs = txs.full.unwrap();
2780        assert_eq!(txs.len(), 1);
2781
2782        builder.push(&tx);
2783
2784        let txs = builder.clone().build();
2785        let pooled = txs.pooled.unwrap();
2786        assert_eq!(pooled.len(), 1);
2787        let txs = txs.full.unwrap();
2788        assert_eq!(txs.len(), 1);
2789    }
2790
2791    #[test]
2792    fn test_transaction_builder_eip4844() {
2793        let mut builder =
2794            PropagateTransactionsBuilder::<TransactionSigned>::full(EthVersion::Eth68);
2795        assert!(builder.is_empty());
2796
2797        let mut factory = MockTransactionFactory::default();
2798        let tx = PropagateTransaction::pool_tx(Arc::new(factory.create_eip4844()));
2799        builder.push(&tx);
2800        assert!(!builder.is_empty());
2801
2802        let txs = builder.clone().build();
2803        assert!(txs.full.is_none());
2804        let txs = txs.pooled.unwrap();
2805        assert_eq!(txs.len(), 1);
2806
2807        let tx = PropagateTransaction::pool_tx(Arc::new(factory.create_eip1559()));
2808        builder.push(&tx);
2809
2810        let txs = builder.clone().build();
2811        let pooled = txs.pooled.unwrap();
2812        assert_eq!(pooled.len(), 1);
2813        let txs = txs.full.unwrap();
2814        assert_eq!(txs.len(), 1);
2815    }
2816
2817    #[tokio::test]
2818    async fn test_propagate_full() {
2819        reth_tracing::init_test_tracing();
2820
2821        let (mut tx_manager, network) = new_tx_manager().await;
2822        let peer_id = PeerId::random();
2823
2824        // ensure not syncing
2825        network.handle().update_sync_state(SyncState::Idle);
2826
2827        // mock a peer
2828        let (tx, _rx) = mpsc::channel::<PeerRequest>(1);
2829
2830        let session_info = SessionInfo {
2831            peer_id,
2832            remote_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0),
2833            client_version: Arc::from(""),
2834            capabilities: Arc::new(vec![].into()),
2835            status: Arc::new(Default::default()),
2836            version: EthVersion::Eth68,
2837            peer_kind: PeerKind::Basic,
2838        };
2839        let messages: PeerRequestSender<PeerRequest> = PeerRequestSender::new(peer_id, tx);
2840        tx_manager
2841            .on_network_event(NetworkEvent::ActivePeerSession { info: session_info, messages });
2842        let mut propagate = vec![];
2843        let mut factory = MockTransactionFactory::default();
2844        let eip1559_tx = Arc::new(factory.create_eip1559());
2845        propagate.push(PropagateTransaction::pool_tx(eip1559_tx.clone()));
2846        let eip4844_tx = Arc::new(factory.create_eip4844());
2847        propagate.push(PropagateTransaction::pool_tx(eip4844_tx.clone()));
2848
2849        let propagated =
2850            tx_manager.propagate_transactions(propagate.clone(), PropagationMode::Basic);
2851        assert_eq!(propagated.0.len(), 2);
2852        let prop_txs = propagated.0.get(eip1559_tx.transaction.hash()).unwrap();
2853        assert_eq!(prop_txs.len(), 1);
2854        assert!(prop_txs[0].is_full());
2855
2856        let prop_txs = propagated.0.get(eip4844_tx.transaction.hash()).unwrap();
2857        assert_eq!(prop_txs.len(), 1);
2858        assert!(prop_txs[0].is_hash());
2859
2860        let peer = tx_manager.peers.get(&peer_id).unwrap();
2861        assert!(peer.seen_transactions.contains(eip1559_tx.transaction.hash()));
2862        assert!(peer.seen_transactions.contains(eip1559_tx.transaction.hash()));
2863        peer.seen_transactions.contains(eip4844_tx.transaction.hash());
2864
2865        // propagate again
2866        let propagated = tx_manager.propagate_transactions(propagate, PropagationMode::Basic);
2867        assert!(propagated.0.is_empty());
2868    }
2869
2870    #[tokio::test]
2871    async fn test_relaxed_filter_ignores_unknown_tx_types() {
2872        reth_tracing::init_test_tracing();
2873
2874        let transactions_manager_config = TransactionsManagerConfig::default();
2875
2876        let propagation_policy = TransactionPropagationKind::default();
2877        let announcement_policy = RelaxedEthAnnouncementFilter::default();
2878
2879        let policy_bundle = NetworkPolicies::new(propagation_policy, announcement_policy);
2880
2881        let pool = testing_pool();
2882        let secret_key = SecretKey::new(&mut rand_08::thread_rng());
2883        let client = NoopProvider::default();
2884
2885        let network_config = NetworkConfigBuilder::new(secret_key)
2886            .listener_port(0)
2887            .disable_discovery()
2888            .build(client.clone());
2889
2890        let mut network_manager = NetworkManager::new(network_config).await.unwrap();
2891        let (to_tx_manager_tx, from_network_rx) =
2892            mpsc::unbounded_channel::<NetworkTransactionEvent<EthNetworkPrimitives>>();
2893        network_manager.set_transactions(to_tx_manager_tx);
2894        let network_handle = network_manager.handle().clone();
2895        let network_service_handle = tokio::spawn(network_manager);
2896
2897        let mut tx_manager = TransactionsManager::<
2898            TestPool,
2899            EthNetworkPrimitives,
2900            NetworkPolicies<TransactionPropagationKind, RelaxedEthAnnouncementFilter>,
2901        >::with_policy(
2902            network_handle.clone(),
2903            pool.clone(),
2904            from_network_rx,
2905            transactions_manager_config,
2906            policy_bundle,
2907        );
2908
2909        let peer_id = PeerId::random();
2910        let eth_version = EthVersion::Eth68;
2911        let (mock_peer_metadata, mut mock_session_rx) = new_mock_session(peer_id, eth_version);
2912        tx_manager.peers.insert(peer_id, mock_peer_metadata);
2913
2914        let mut tx_factory = MockTransactionFactory::default();
2915
2916        let valid_known_tx = tx_factory.create_eip1559();
2917        let known_tx_signed: Arc<ValidPoolTransaction<MockTransaction>> = Arc::new(valid_known_tx);
2918
2919        let known_tx_hash = *known_tx_signed.hash();
2920        let known_tx_type_byte = known_tx_signed.transaction.tx_type();
2921        let known_tx_size = known_tx_signed.encoded_length();
2922
2923        let unknown_tx_hash = B256::random();
2924        let unknown_tx_type_byte = 0xff_u8;
2925        let unknown_tx_size = 150;
2926
2927        let announcement_msg = NewPooledTransactionHashes::Eth68(NewPooledTransactionHashes68 {
2928            types: vec![known_tx_type_byte, unknown_tx_type_byte],
2929            sizes: vec![known_tx_size, unknown_tx_size],
2930            hashes: vec![known_tx_hash, unknown_tx_hash],
2931        });
2932
2933        tx_manager.on_new_pooled_transaction_hashes(peer_id, announcement_msg);
2934
2935        poll_fn(|cx| {
2936            let _ = tx_manager.poll_unpin(cx);
2937            Poll::Ready(())
2938        })
2939        .await;
2940
2941        let mut requested_hashes_in_getpooled = HashSet::new();
2942        let mut unexpected_request_received = false;
2943
2944        match tokio::time::timeout(std::time::Duration::from_millis(200), mock_session_rx.recv())
2945            .await
2946        {
2947            Ok(Some(PeerRequest::GetPooledTransactions { request, response: tx_response_ch })) => {
2948                let GetPooledTransactions(hashes) = request;
2949                for hash in hashes {
2950                    requested_hashes_in_getpooled.insert(hash);
2951                }
2952                let _ = tx_response_ch.send(Ok(PooledTransactions(vec![])));
2953            }
2954            Ok(Some(other_request)) => {
2955                tracing::error!(?other_request, "Received unexpected PeerRequest type");
2956                unexpected_request_received = true;
2957            }
2958            Ok(None) => tracing::info!("Mock session channel closed or no request received."),
2959            Err(_timeout_err) => {
2960                tracing::info!("Timeout: No GetPooledTransactions request received.")
2961            }
2962        }
2963
2964        assert!(
2965            requested_hashes_in_getpooled.contains(&known_tx_hash),
2966            "Should have requested the known EIP-1559 transaction. Requested: {requested_hashes_in_getpooled:?}"
2967        );
2968        assert!(
2969            !requested_hashes_in_getpooled.contains(&unknown_tx_hash),
2970            "Should NOT have requested the unknown transaction type. Requested: {requested_hashes_in_getpooled:?}"
2971        );
2972        assert!(
2973            !unexpected_request_received,
2974            "An unexpected P2P request was received by the mock peer."
2975        );
2976
2977        network_service_handle.abort();
2978    }
2979}