// reth_network/transactions/fetcher.rs

//! `TransactionFetcher` is responsible for rate limiting and retry logic for fetching
//! transactions. Upon receiving an announcement, functionality of the `TransactionFetcher` is
//! used for filtering out hashes 1) for which the tx is already known and 2) unknown but the hash
//! is already seen in a previous announcement. The hashes that remain from an announcement are
//! then packed into a request with respect to the [`EthVersion`] of the announcement. Any hashes
//! that don't fit into the request are buffered in the `TransactionFetcher`. If, on the other
//! hand, space remains, hashes that the peer has previously announced are taken out of buffered
//! hashes to fill the request up. The [`GetPooledTransactions`] request is then sent to the
//! peer's session; this marks the peer as active with respect to
//! `MAX_CONCURRENT_TX_REQUESTS_PER_PEER`.
//!
//! When a peer buffers hashes in the `TransactionsManager::on_new_pooled_transaction_hashes`
//! pipeline, it is stored as a fallback peer for those hashes. When [`TransactionsManager`] is
//! polled, it checks if any fallback peer is idle. If so, it packs a request for that peer,
//! filling it from the buffered hashes. It does so until there are no more idle peers or until
//! the hashes buffer is empty.
//!
//! If a [`GetPooledTransactions`] request resolves with an error, the hashes in the request are
//! buffered with respect to `MAX_REQUEST_RETRIES_PER_TX_HASH`. So is the case if the request
//! resolves with partial success, that is, some of the requested hashes are not in the response;
//! these are then buffered.
//!
//! Most healthy peers will send the same hashes in their announcements, as RLPx is a gossip
//! protocol. This means it's unlikely that a valid hash will be buffered for very long
//! before it's re-tried. Nonetheless, the capacity of the buffered hashes cache must be large
//! enough to buffer many hashes during network failure, to allow for recovery.

28use super::{
29    config::TransactionFetcherConfig,
30    constants::{tx_fetcher::*, SOFT_LIMIT_COUNT_HASHES_IN_GET_POOLED_TRANSACTIONS_REQUEST},
31    MessageFilter, PeerMetadata, PooledTransactions,
32    SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE,
33};
34use crate::{
35    cache::{LruCache, LruMap},
36    duration_metered_exec,
37    metrics::TransactionFetcherMetrics,
38    transactions::{validation, PartiallyFilterMessage},
39};
40use alloy_consensus::transaction::PooledTransaction;
41use alloy_primitives::TxHash;
42use derive_more::{Constructor, Deref};
43use futures::{stream::FuturesUnordered, Future, FutureExt, Stream, StreamExt};
44use pin_project::pin_project;
45use reth_eth_wire::{
46    DedupPayload, EthVersion, GetPooledTransactions, HandleMempoolData, HandleVersionedMempoolData,
47    PartiallyValidData, RequestTxHashes, ValidAnnouncementData,
48};
49use reth_eth_wire_types::{EthNetworkPrimitives, NetworkPrimitives};
50use reth_network_api::PeerRequest;
51use reth_network_p2p::error::{RequestError, RequestResult};
52use reth_network_peers::PeerId;
53use reth_primitives_traits::SignedTransaction;
54use schnellru::ByLength;
55use std::{
56    collections::HashMap,
57    pin::Pin,
58    task::{ready, Context, Poll},
59    time::Duration,
60};
61use tokio::sync::{mpsc::error::TrySendError, oneshot, oneshot::error::RecvError};
62use tracing::trace;
63use validation::FilterOutcome;
64
/// The type responsible for fetching missing transactions from peers.
///
/// This will keep track of unique transaction hashes that are currently being fetched and submits
/// new requests on announced hashes.
#[derive(Debug)]
#[pin_project]
pub struct TransactionFetcher<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// All peers to which a [`GetPooledTransactions`] request is inflight, mapped to the number
    /// of requests currently inflight to that peer.
    pub active_peers: LruMap<PeerId, u8, ByLength>,
    /// All currently active [`GetPooledTransactions`] requests.
    ///
    /// The set of hashes encompassed by these requests are a subset of all hashes in the fetcher.
    /// It's disjoint from the set of hashes which are awaiting an idle fallback peer in order to
    /// be fetched.
    #[pin]
    pub inflight_requests: FuturesUnordered<GetPooledTxRequestFut<N::PooledTransaction>>,
    /// Hashes that are awaiting an idle fallback peer so they can be fetched.
    ///
    /// This is a subset of all hashes in the fetcher, and is disjoint from the set of hashes for
    /// which a [`GetPooledTransactions`] request is inflight.
    pub hashes_pending_fetch: LruCache<TxHash>,
    /// Tracks all hashes in the transaction fetcher: both inflight and pending fetch.
    pub hashes_fetch_inflight_and_pending_fetch: LruMap<TxHash, TxFetchMetadata, ByLength>,
    /// Filter for valid announcement and response data.
    pub(super) filter_valid_message: MessageFilter,
    /// Info on capacity of the transaction fetcher.
    pub info: TransactionFetcherInfo,
    #[doc(hidden)]
    metrics: TransactionFetcherMetrics,
}
95
96impl<N: NetworkPrimitives> TransactionFetcher<N> {
97    /// Removes the peer from the active set.
98    pub(crate) fn remove_peer(&mut self, peer_id: &PeerId) {
99        self.active_peers.remove(peer_id);
100    }
101
102    /// Updates metrics.
103    #[inline]
104    pub fn update_metrics(&self) {
105        let metrics = &self.metrics;
106
107        metrics.inflight_transaction_requests.set(self.inflight_requests.len() as f64);
108
109        let hashes_pending_fetch = self.hashes_pending_fetch.len() as f64;
110        let total_hashes = self.hashes_fetch_inflight_and_pending_fetch.len() as f64;
111
112        metrics.hashes_pending_fetch.set(hashes_pending_fetch);
113        metrics.hashes_inflight_transaction_requests.set(total_hashes - hashes_pending_fetch);
114    }
115
116    #[inline]
117    fn update_pending_fetch_cache_search_metrics(&self, durations: TxFetcherSearchDurations) {
118        let metrics = &self.metrics;
119
120        let TxFetcherSearchDurations { find_idle_peer, fill_request } = durations;
121        metrics
122            .duration_find_idle_fallback_peer_for_any_pending_hash
123            .set(find_idle_peer.as_secs_f64());
124        metrics.duration_fill_request_from_hashes_pending_fetch.set(fill_request.as_secs_f64());
125    }
126
127    /// Sets up transaction fetcher with config
128    pub fn with_transaction_fetcher_config(config: &TransactionFetcherConfig) -> Self {
129        let TransactionFetcherConfig {
130            max_inflight_requests,
131            max_capacity_cache_txns_pending_fetch,
132            ..
133        } = *config;
134
135        let info = config.clone().into();
136
137        let metrics = TransactionFetcherMetrics::default();
138        metrics.capacity_inflight_requests.increment(max_inflight_requests as u64);
139
140        Self {
141            active_peers: LruMap::new(max_inflight_requests),
142            hashes_pending_fetch: LruCache::new(max_capacity_cache_txns_pending_fetch),
143            hashes_fetch_inflight_and_pending_fetch: LruMap::new(
144                max_inflight_requests + max_capacity_cache_txns_pending_fetch,
145            ),
146            info,
147            metrics,
148            ..Default::default()
149        }
150    }
151
152    /// Removes the specified hashes from inflight tracking.
153    #[inline]
154    pub fn remove_hashes_from_transaction_fetcher<I>(&mut self, hashes: I)
155    where
156        I: IntoIterator<Item = TxHash>,
157    {
158        for hash in hashes {
159            self.hashes_fetch_inflight_and_pending_fetch.remove(&hash);
160            self.hashes_pending_fetch.remove(&hash);
161        }
162    }
163
164    /// Updates peer's activity status upon a resolved [`GetPooledTxRequest`].
165    fn decrement_inflight_request_count_for(&mut self, peer_id: &PeerId) {
166        let remove = || -> bool {
167            if let Some(inflight_count) = self.active_peers.get(peer_id) {
168                *inflight_count = inflight_count.saturating_sub(1);
169                if *inflight_count == 0 {
170                    return true
171                }
172            }
173            false
174        }();
175
176        if remove {
177            self.active_peers.remove(peer_id);
178        }
179    }
180
181    /// Returns `true` if peer is idle with respect to `self.inflight_requests`.
182    #[inline]
183    pub fn is_idle(&self, peer_id: &PeerId) -> bool {
184        let Some(inflight_count) = self.active_peers.peek(peer_id) else { return true };
185        if *inflight_count < self.info.max_inflight_requests_per_peer {
186            return true
187        }
188        false
189    }
190
191    /// Returns any idle peer for the given hash.
192    pub fn get_idle_peer_for(&self, hash: TxHash) -> Option<&PeerId> {
193        let TxFetchMetadata { fallback_peers, .. } =
194            self.hashes_fetch_inflight_and_pending_fetch.peek(&hash)?;
195
196        for peer_id in fallback_peers.iter() {
197            if self.is_idle(peer_id) {
198                return Some(peer_id)
199            }
200        }
201
202        None
203    }
204
    /// Returns any idle peer for any hash pending fetch. If one is found, the corresponding
    /// hash is written to the request buffer that is passed as parameter.
    ///
    /// Loops through the hashes pending fetch in lru order until one is found with an idle
    /// fallback peer, or the budget passed as parameter is depleted, whatever happens first.
    ///
    /// NOTE(review): the hash removed from `hashes_pending_fetch` below is the *first* element
    /// of `hashes_to_request`, which is only the hash found here if the buffer was empty on
    /// entry — confirm callers always pass a fresh buffer.
    pub fn find_any_idle_fallback_peer_for_any_pending_hash(
        &mut self,
        hashes_to_request: &mut RequestTxHashes,
        mut budget: Option<usize>, // search fallback peers for max `budget` lru pending hashes
    ) -> Option<PeerId> {
        let mut hashes_pending_fetch_iter = self.hashes_pending_fetch.iter();

        let idle_peer = loop {
            // `?`: all pending hashes exhausted without finding an idle peer
            let &hash = hashes_pending_fetch_iter.next()?;

            let idle_peer = self.get_idle_peer_for(hash);

            if idle_peer.is_some() {
                hashes_to_request.insert(hash);
                break idle_peer.copied()
            }

            // budget is only spent on hashes for which no idle peer was found
            if let Some(ref mut bud) = budget {
                *bud = bud.saturating_sub(1);
                if *bud == 0 {
                    return None
                }
            }
        };
        let hash = hashes_to_request.iter().next()?;

        // pop hash that is loaded in request buffer from cache of hashes pending fetch.
        // the iterator must be dropped first to release its borrow on the cache.
        drop(hashes_pending_fetch_iter);
        _ = self.hashes_pending_fetch.remove(hash);

        idle_peer
    }
242
243    /// Packages hashes for a [`GetPooledTxRequest`] up to limit. Returns left over hashes. Takes
244    /// a [`RequestTxHashes`] buffer as parameter for filling with hashes to request.
245    ///
246    /// Returns left over hashes.
247    pub fn pack_request(
248        &self,
249        hashes_to_request: &mut RequestTxHashes,
250        hashes_from_announcement: ValidAnnouncementData,
251    ) -> RequestTxHashes {
252        if hashes_from_announcement.msg_version().is_eth68() {
253            return self.pack_request_eth68(hashes_to_request, hashes_from_announcement)
254        }
255        self.pack_request_eth66(hashes_to_request, hashes_from_announcement)
256    }
257
    /// Packages hashes for a [`GetPooledTxRequest`] from an
    /// [`Eth68`](reth_eth_wire::EthVersion::Eth68) announcement up to limit as defined by protocol
    /// version 68. Takes a [`RequestTxHashes`] buffer as parameter for filling with hashes to
    /// request.
    ///
    /// Returns left over hashes.
    ///
    /// Loops through hashes passed as parameter and checks if a hash fits in the expected
    /// response. If no, it's added to surplus hashes. If yes, it's added to hashes to the request
    /// and expected response size is accumulated.
    pub fn pack_request_eth68(
        &self,
        hashes_to_request: &mut RequestTxHashes,
        hashes_from_announcement: impl HandleMempoolData
            + IntoIterator<Item = (TxHash, Option<(u8, usize)>)>,
    ) -> RequestTxHashes {
        // running total of the expected byte size of the response
        let mut acc_size_response = 0;

        let mut hashes_from_announcement_iter = hashes_from_announcement.into_iter();

        // the first hash is always included, regardless of its size
        if let Some((hash, Some((_ty, size)))) = hashes_from_announcement_iter.next() {
            hashes_to_request.insert(hash);

            // tx is really big, pack request with single tx
            if size >= self.info.soft_limit_byte_size_pooled_transactions_response_on_pack_request {
                return hashes_from_announcement_iter.collect()
            }
            acc_size_response = size;
        }

        let mut surplus_hashes = RequestTxHashes::default();

        // folds size based on expected response size and adds selected hashes to the request
        // list and the other hashes to the surplus list
        loop {
            let Some((hash, metadata)) = hashes_from_announcement_iter.next() else { break };

            // eth68 announcements always carry (type, size) metadata per hash
            let Some((_ty, size)) = metadata else {
                unreachable!("this method is called upon reception of an eth68 announcement")
            };

            let next_acc_size = acc_size_response + size;

            if next_acc_size <=
                self.info.soft_limit_byte_size_pooled_transactions_response_on_pack_request
            {
                // only update accumulated size of tx response if tx will fit in without exceeding
                // soft limit
                acc_size_response = next_acc_size;
                _ = hashes_to_request.insert(hash)
            } else {
                _ = surplus_hashes.insert(hash)
            }

            // no underflow: `acc_size_response` is only ever updated while it stays <= the limit
            let free_space =
                self.info.soft_limit_byte_size_pooled_transactions_response_on_pack_request -
                    acc_size_response;

            // stop early once not even a small legacy tx would fit into the remaining space
            if free_space < MEDIAN_BYTE_SIZE_SMALL_LEGACY_TX_ENCODED {
                break
            }
        }

        // everything not examined goes to the surplus list
        surplus_hashes.extend(hashes_from_announcement_iter.map(|(hash, _metadata)| hash));

        surplus_hashes
    }
325
326    /// Packages hashes for a [`GetPooledTxRequest`] from an
327    /// [`Eth66`](reth_eth_wire::EthVersion::Eth66) announcement up to limit as defined by
328    /// protocol version 66. Takes a [`RequestTxHashes`] buffer as parameter for filling with
329    /// hashes to request.
330    ///
331    /// Returns left over hashes.
332    pub fn pack_request_eth66(
333        &self,
334        hashes_to_request: &mut RequestTxHashes,
335        hashes_from_announcement: ValidAnnouncementData,
336    ) -> RequestTxHashes {
337        let (mut hashes, _version) = hashes_from_announcement.into_request_hashes();
338        if hashes.len() <= SOFT_LIMIT_COUNT_HASHES_IN_GET_POOLED_TRANSACTIONS_REQUEST {
339            *hashes_to_request = hashes;
340            hashes_to_request.shrink_to_fit();
341
342            RequestTxHashes::default()
343        } else {
344            let surplus_hashes =
345                hashes.retain_count(SOFT_LIMIT_COUNT_HASHES_IN_GET_POOLED_TRANSACTIONS_REQUEST);
346            *hashes_to_request = hashes;
347            hashes_to_request.shrink_to_fit();
348
349            surplus_hashes
350        }
351    }
352
353    /// Tries to buffer hashes for retry.
354    pub fn try_buffer_hashes_for_retry(
355        &mut self,
356        mut hashes: RequestTxHashes,
357        peer_failed_to_serve: &PeerId,
358    ) {
359        // It could be that the txns have been received over broadcast in the time being. Remove
360        // the peer as fallback peer so it isn't request again for these hashes.
361        hashes.retain(|hash| {
362            if let Some(entry) = self.hashes_fetch_inflight_and_pending_fetch.get(hash) {
363                entry.fallback_peers_mut().remove(peer_failed_to_serve);
364                return true
365            }
366            // tx has been seen over broadcast in the time it took for the request to resolve
367            false
368        });
369
370        self.buffer_hashes(hashes, None)
371    }
372
    /// Number of hashes pending fetch, i.e. waiting for an idle fallback peer.
    pub fn num_pending_hashes(&self) -> usize {
        self.hashes_pending_fetch.len()
    }
377
    /// Number of all transaction hashes in the fetcher, both inflight and pending fetch.
    pub fn num_all_hashes(&self) -> usize {
        self.hashes_fetch_inflight_and_pending_fetch.len()
    }
382
383    /// Buffers hashes. Note: Only peers that haven't yet tried to request the hashes should be
384    /// passed as `fallback_peer` parameter! For re-buffering hashes on failed request, use
385    /// [`TransactionFetcher::try_buffer_hashes_for_retry`]. Hashes that have been re-requested
386    /// [`DEFAULT_MAX_RETRIES`], are dropped.
387    pub fn buffer_hashes(&mut self, hashes: RequestTxHashes, fallback_peer: Option<PeerId>) {
388        for hash in hashes {
389            // hash could have been evicted from bounded lru map
390            if self.hashes_fetch_inflight_and_pending_fetch.peek(&hash).is_none() {
391                continue
392            }
393
394            let Some(TxFetchMetadata { retries, fallback_peers, .. }) =
395                self.hashes_fetch_inflight_and_pending_fetch.get(&hash)
396            else {
397                return
398            };
399
400            if let Some(peer_id) = fallback_peer {
401                // peer has not yet requested hash
402                fallback_peers.insert(peer_id);
403            } else {
404                if *retries >= DEFAULT_MAX_RETRIES {
405                    trace!(target: "net::tx",
406                        %hash,
407                        retries,
408                        "retry limit for `GetPooledTransactions` requests reached for hash, dropping hash"
409                    );
410
411                    self.hashes_fetch_inflight_and_pending_fetch.remove(&hash);
412                    self.hashes_pending_fetch.remove(&hash);
413                    continue
414                }
415                *retries += 1;
416            }
417
418            if let (_, Some(evicted_hash)) = self.hashes_pending_fetch.insert_and_get_evicted(hash)
419            {
420                self.hashes_fetch_inflight_and_pending_fetch.remove(&evicted_hash);
421                self.hashes_pending_fetch.remove(&evicted_hash);
422            }
423        }
424    }
425
    /// Tries to request hashes pending fetch.
    ///
    /// Finds the first buffered hash with a fallback peer that is idle, if any. Fills the rest of
    /// the request by checking the transactions seen by the peer against the buffer.
    ///
    /// `has_capacity_wrt_pending_pool_imports` is consulted when computing both search budgets.
    pub fn on_fetch_pending_hashes(
        &mut self,
        peers: &HashMap<PeerId, PeerMetadata<N>>,
        has_capacity_wrt_pending_pool_imports: impl Fn(usize) -> bool,
    ) {
        let mut hashes_to_request = RequestTxHashes::with_capacity(
            DEFAULT_MARGINAL_COUNT_HASHES_GET_POOLED_TRANSACTIONS_REQUEST,
        );
        let mut search_durations = TxFetcherSearchDurations::default();

        // budget to look for an idle peer before giving up
        let budget_find_idle_fallback_peer = self
            .search_breadth_budget_find_idle_fallback_peer(&has_capacity_wrt_pending_pool_imports);

        // NOTE: the `return` below exits this method, not just the macro block
        let peer_id = duration_metered_exec!(
            {
                let Some(peer_id) = self.find_any_idle_fallback_peer_for_any_pending_hash(
                    &mut hashes_to_request,
                    budget_find_idle_fallback_peer,
                ) else {
                    // no peers are idle or budget is depleted
                    return
                };

                peer_id
            },
            search_durations.find_idle_peer
        );

        // peer should always exist since `is_session_active` already checked
        let Some(peer) = peers.get(&peer_id) else { return };
        let conn_eth_version = peer.version;

        // fill the request with more hashes pending fetch that have been announced by the peer.
        // the search for more hashes is done with respect to the given budget, which determines
        // how many hashes to loop through before giving up. if no more hashes are found wrt to
        // the budget, the single hash that was taken out of the cache above is sent in a request.
        let budget_fill_request = self
            .search_breadth_budget_find_intersection_pending_hashes_and_hashes_seen_by_peer(
                &has_capacity_wrt_pending_pool_imports,
            );

        duration_metered_exec!(
            {
                self.fill_request_from_hashes_pending_fetch(
                    &mut hashes_to_request,
                    &peer.seen_transactions,
                    budget_fill_request,
                )
            },
            search_durations.fill_request
        );

        self.update_pending_fetch_cache_search_metrics(search_durations);

        trace!(target: "net::tx",
            peer_id=format!("{peer_id:#}"),
            hashes=?*hashes_to_request,
            %conn_eth_version,
            "requesting hashes that were stored pending fetch from peer"
        );

        // request the buffered missing transactions
        if let Some(failed_to_request_hashes) =
            self.request_transactions_from_peer(hashes_to_request, peer)
        {
            trace!(target: "net::tx",
                peer_id=format!("{peer_id:#}"),
                ?failed_to_request_hashes,
                %conn_eth_version,
                "failed sending request to peer's session, buffering hashes"
            );

            self.buffer_hashes(failed_to_request_hashes, Some(peer_id));
        }
    }
506
    /// Filters out hashes that have been seen before. For hashes that have already been seen, the
    /// peer is added as fallback peer.
    ///
    /// Hashes already tracked and currently pending fetch are *kept* in the announcement (and
    /// removed from the pending-fetch cache); hashes tracked but inflight are dropped. Vacant
    /// hashes failing `is_tx_bad_import` are dropped. `client_version` is used for logging only.
    pub fn filter_unseen_and_pending_hashes(
        &mut self,
        new_announced_hashes: &mut ValidAnnouncementData,
        is_tx_bad_import: impl Fn(&TxHash) -> bool,
        peer_id: &PeerId,
        client_version: &str,
    ) {
        let mut previously_unseen_hashes_count = 0;

        let msg_version = new_announced_hashes.msg_version();

        // filter out inflight hashes, and register the peer as fallback for all inflight hashes
        new_announced_hashes.retain(|hash, metadata| {

            // occupied entry
            if let Some(TxFetchMetadata{ tx_encoded_length: previously_seen_size, ..}) = self.hashes_fetch_inflight_and_pending_fetch.peek_mut(hash) {
                // update size metadata if available
                if let Some((_ty, size)) = metadata {
                    if let Some(prev_size) = previously_seen_size {
                        // check if this peer is announcing a different size than a previous peer
                        if size != prev_size {
                            trace!(target: "net::tx",
                                peer_id=format!("{peer_id:#}"),
                                %hash,
                                size,
                                previously_seen_size,
                                %client_version,
                                "peer announced a different size for tx, this is especially worrying if one size is much bigger..."
                            );
                        }
                    }
                    // believe the most recent peer to announce tx
                    *previously_seen_size = Some(*size);
                }

                // hash has been seen but is not inflight
                if self.hashes_pending_fetch.remove(hash) {
                    return true
                }

                // hash is inflight in a request, drop it from the announcement
                return false
            }

            // vacant entry

            if is_tx_bad_import(hash) {
                return false
            }

            previously_unseen_hashes_count += 1;

            // start tracking the new hash; `get_or_insert` returning `None` means the lru map
            // failed to cache it
            if self.hashes_fetch_inflight_and_pending_fetch.get_or_insert(*hash, ||
                TxFetchMetadata{retries: 0, fallback_peers: LruCache::new(DEFAULT_MAX_COUNT_FALLBACK_PEERS as u32), tx_encoded_length: None}
            ).is_none() {

                trace!(target: "net::tx",
                    peer_id=format!("{peer_id:#}"),
                    %hash,
                    ?msg_version,
                    %client_version,
                    "failed to cache new announced hash from peer in schnellru::LruMap, dropping hash"
                );

                return false
            }
            true
        });

        trace!(target: "net::tx",
            peer_id=format!("{peer_id:#}"),
            previously_unseen_hashes_count=previously_unseen_hashes_count,
            msg_version=?msg_version,
            client_version=%client_version,
            "received previously unseen hashes in announcement from peer"
        );
    }
585
586    /// Requests the missing transactions from the previously unseen announced hashes of the peer.
587    /// Returns the requested hashes if the request concurrency limit is reached or if the request
588    /// fails to send over the channel to the peer's session task.
589    ///
590    /// This filters all announced hashes that are already in flight, and requests the missing,
591    /// while marking the given peer as an alternative peer for the hashes that are already in
592    /// flight.
593    pub fn request_transactions_from_peer(
594        &mut self,
595        new_announced_hashes: RequestTxHashes,
596        peer: &PeerMetadata<N>,
597    ) -> Option<RequestTxHashes> {
598        let peer_id: PeerId = peer.request_tx.peer_id;
599        let conn_eth_version = peer.version;
600
601        if self.active_peers.len() >= self.info.max_inflight_requests {
602            trace!(target: "net::tx",
603                peer_id=format!("{peer_id:#}"),
604                hashes=?*new_announced_hashes,
605                %conn_eth_version,
606                max_inflight_transaction_requests=self.info.max_inflight_requests,
607                "limit for concurrent `GetPooledTransactions` requests reached, dropping request for hashes to peer"
608            );
609            return Some(new_announced_hashes)
610        }
611
612        let Some(inflight_count) = self.active_peers.get_or_insert(peer_id, || 0) else {
613            trace!(target: "net::tx",
614                peer_id=format!("{peer_id:#}"),
615                hashes=?*new_announced_hashes,
616                conn_eth_version=%conn_eth_version,
617                "failed to cache active peer in schnellru::LruMap, dropping request to peer"
618            );
619            return Some(new_announced_hashes)
620        };
621
622        if *inflight_count >= self.info.max_inflight_requests_per_peer {
623            trace!(target: "net::tx",
624                peer_id=format!("{peer_id:#}"),
625                hashes=?*new_announced_hashes,
626                %conn_eth_version,
627                max_concurrent_tx_reqs_per_peer=self.info.max_inflight_requests_per_peer,
628                "limit for concurrent `GetPooledTransactions` requests per peer reached"
629            );
630            return Some(new_announced_hashes)
631        }
632
633        #[cfg(debug_assertions)]
634        {
635            for hash in &new_announced_hashes {
636                if self.hashes_pending_fetch.contains(hash) {
637                    tracing::debug!(target: "net::tx", "`{}` should have been taken out of buffer before packing in a request, breaks invariant `@hashes_pending_fetch` and `@inflight_requests`, `@hashes_fetch_inflight_and_pending_fetch` for `{}`: {:?}",
638                        format!("{:?}", new_announced_hashes), // Assuming new_announced_hashes can be debug-printed directly
639                        format!("{:?}", new_announced_hashes),
640                        new_announced_hashes.iter().map(|hash| {
641                            let metadata = self.hashes_fetch_inflight_and_pending_fetch.get(hash);
642                            // Assuming you only need `retries` and `tx_encoded_length` for debugging
643                            (*hash, metadata.map(|m| (m.retries, m.tx_encoded_length)))
644                        }).collect::<Vec<(TxHash, Option<(u8, Option<usize>)>)>>())
645                }
646            }
647        }
648
649        let (response, rx) = oneshot::channel();
650        let req = PeerRequest::GetPooledTransactions {
651            request: GetPooledTransactions(new_announced_hashes.iter().copied().collect()),
652            response,
653        };
654
655        // try to send the request to the peer
656        if let Err(err) = peer.request_tx.try_send(req) {
657            // peer channel is full
658            return match err {
659                TrySendError::Full(_) | TrySendError::Closed(_) => {
660                    self.metrics.egress_peer_channel_full.increment(1);
661                    Some(new_announced_hashes)
662                }
663            }
664        }
665
666        *inflight_count += 1;
667        // stores a new request future for the request
668        self.inflight_requests.push(GetPooledTxRequestFut::new(peer_id, new_announced_hashes, rx));
669
670        None
671    }
672
673    /// Tries to fill request with hashes pending fetch so that the expected [`PooledTransactions`]
674    /// response is full enough. A mutable reference to a list of hashes to request is passed as
675    /// parameter. A budget is passed as parameter, this ensures that the node stops searching
676    /// for more hashes after the budget is depleted. Under bad network conditions, the cache of
677    /// hashes pending fetch may become very full for a while. As the node recovers, the hashes
678    /// pending fetch cache should get smaller. The budget should aim to be big enough to loop
679    /// through all buffered hashes in good network conditions.
680    ///
681    /// The request hashes buffer is filled as if it's an eth68 request, i.e. smartly assemble
682    /// the request based on expected response size. For any hash missing size metadata, it is
683    /// guessed at [`AVERAGE_BYTE_SIZE_TX_ENCODED`].
684    ///
685    /// Loops through hashes pending fetch and does:
686    ///
687    /// 1. Check if a hash pending fetch is seen by peer.
688    /// 2. Optimistically include the hash in the request.
689    /// 3. Accumulate expected total response size.
690    /// 4. Check if acc size and hashes count is at limit, if so stop looping.
691    /// 5. Remove hashes to request from cache of hashes pending fetch.
692    pub fn fill_request_from_hashes_pending_fetch(
693        &mut self,
694        hashes_to_request: &mut RequestTxHashes,
695        seen_hashes: &LruCache<TxHash>,
696        mut budget_fill_request: Option<usize>, // check max `budget` lru pending hashes
697    ) {
698        let Some(hash) = hashes_to_request.iter().next() else { return };
699
700        let mut acc_size_response = self
701            .hashes_fetch_inflight_and_pending_fetch
702            .get(hash)
703            .and_then(|entry| entry.tx_encoded_len())
704            .unwrap_or(AVERAGE_BYTE_SIZE_TX_ENCODED);
705
706        // if request full enough already, we're satisfied, send request for single tx
707        if acc_size_response >=
708            DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE_ON_FETCH_PENDING_HASHES
709        {
710            return
711        }
712
713        // try to fill request by checking if any other hashes pending fetch (in lru order) are
714        // also seen by peer
715        for hash in self.hashes_pending_fetch.iter() {
716            // 1. Check if a hash pending fetch is seen by peer.
717            if !seen_hashes.contains(hash) {
718                continue
719            };
720
721            // 2. Optimistically include the hash in the request.
722            hashes_to_request.insert(*hash);
723
724            // 3. Accumulate expected total response size.
725            let size = self
726                .hashes_fetch_inflight_and_pending_fetch
727                .get(hash)
728                .and_then(|entry| entry.tx_encoded_len())
729                .unwrap_or(AVERAGE_BYTE_SIZE_TX_ENCODED);
730
731            acc_size_response += size;
732
733            // 4. Check if acc size or hashes count is at limit, if so stop looping.
734            // if expected response is full enough or the number of hashes in the request is
735            // enough, we're satisfied
736            if acc_size_response >=
737                DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE_ON_FETCH_PENDING_HASHES ||
738                hashes_to_request.len() >
739                    DEFAULT_SOFT_LIMIT_COUNT_HASHES_IN_GET_POOLED_TRANSACTIONS_REQUEST_ON_FETCH_PENDING_HASHES
740            {
741                break
742            }
743
744            if let Some(ref mut bud) = budget_fill_request {
745                *bud -= 1;
746                if *bud == 0 {
747                    break
748                }
749            }
750        }
751
752        // 5. Remove hashes to request from cache of hashes pending fetch.
753        for hash in hashes_to_request.iter() {
754            self.hashes_pending_fetch.remove(hash);
755        }
756    }
757
758    /// Returns `true` if [`TransactionFetcher`] has capacity to request pending hashes. Returns
759    /// `false` if [`TransactionFetcher`] is operating close to full capacity.
760    pub fn has_capacity_for_fetching_pending_hashes(&self) -> bool {
761        let info = &self.info;
762
763        self.has_capacity(info.max_inflight_requests)
764    }
765
766    /// Returns `true` if the number of inflight requests are under a given tolerated max.
767    fn has_capacity(&self, max_inflight_requests: usize) -> bool {
768        self.inflight_requests.len() <= max_inflight_requests
769    }
770
771    /// Returns the limit to enforce when looking for any pending hash with an idle fallback peer.
772    ///
773    /// Returns `Some(limit)` if [`TransactionFetcher`] and the
774    /// [`TransactionPool`](reth_transaction_pool::TransactionPool) are operating close to full
775    /// capacity. Returns `None`, unlimited, if they are not that busy.
776    pub fn search_breadth_budget_find_idle_fallback_peer(
777        &self,
778        has_capacity_wrt_pending_pool_imports: impl Fn(usize) -> bool,
779    ) -> Option<usize> {
780        let info = &self.info;
781
782        let tx_fetcher_has_capacity = self.has_capacity(
783            info.max_inflight_requests /
784                DEFAULT_DIVISOR_MAX_COUNT_INFLIGHT_REQUESTS_ON_FIND_IDLE_PEER,
785        );
786        let tx_pool_has_capacity = has_capacity_wrt_pending_pool_imports(
787            DEFAULT_DIVISOR_MAX_COUNT_PENDING_POOL_IMPORTS_ON_FIND_IDLE_PEER,
788        );
789
790        if tx_fetcher_has_capacity && tx_pool_has_capacity {
791            // unlimited search breadth
792            None
793        } else {
794            // limited breadth of search for idle peer
795            let limit = DEFAULT_BUDGET_FIND_IDLE_FALLBACK_PEER;
796
797            trace!(target: "net::tx",
798                inflight_requests=self.inflight_requests.len(),
799                max_inflight_transaction_requests=info.max_inflight_requests,
800                hashes_pending_fetch=self.hashes_pending_fetch.len(),
801                limit,
802                "search breadth limited in search for idle fallback peer for some hash pending fetch"
803            );
804
805            Some(limit)
806        }
807    }
808
809    /// Returns the limit to enforce when looking for the intersection between hashes announced by
810    /// peer and hashes pending fetch.
811    ///
812    /// Returns `Some(limit)` if [`TransactionFetcher`] and the
813    /// [`TransactionPool`](reth_transaction_pool::TransactionPool) are operating close to full
814    /// capacity. Returns `None`, unlimited, if they are not that busy.
815    pub fn search_breadth_budget_find_intersection_pending_hashes_and_hashes_seen_by_peer(
816        &self,
817        has_capacity_wrt_pending_pool_imports: impl Fn(usize) -> bool,
818    ) -> Option<usize> {
819        let info = &self.info;
820
821        let tx_fetcher_has_capacity = self.has_capacity(
822            info.max_inflight_requests /
823                DEFAULT_DIVISOR_MAX_COUNT_INFLIGHT_REQUESTS_ON_FIND_INTERSECTION,
824        );
825        let tx_pool_has_capacity = has_capacity_wrt_pending_pool_imports(
826            DEFAULT_DIVISOR_MAX_COUNT_PENDING_POOL_IMPORTS_ON_FIND_INTERSECTION,
827        );
828
829        if tx_fetcher_has_capacity && tx_pool_has_capacity {
830            // unlimited search breadth
831            None
832        } else {
833            // limited breadth of search for idle peer
834            let limit = DEFAULT_BUDGET_FIND_INTERSECTION_ANNOUNCED_BY_PEER_AND_PENDING_FETCH;
835
836            trace!(target: "net::tx",
837                inflight_requests=self.inflight_requests.len(),
838                max_inflight_transaction_requests=self.info.max_inflight_requests,
839                hashes_pending_fetch=self.hashes_pending_fetch.len(),
840                limit=limit,
841                "search breadth limited in search for intersection of hashes announced by peer and hashes pending fetch"
842            );
843
844            Some(limit)
845        }
846    }
847
848    /// Returns the approx number of transactions that a [`GetPooledTransactions`] request will
849    /// have capacity for w.r.t. the given version of the protocol.
850    pub const fn approx_capacity_get_pooled_transactions_req(
851        &self,
852        announcement_version: EthVersion,
853    ) -> usize {
854        if announcement_version.is_eth68() {
855            approx_capacity_get_pooled_transactions_req_eth68(&self.info)
856        } else {
857            approx_capacity_get_pooled_transactions_req_eth66()
858        }
859    }
860
    /// Processes a resolved [`GetPooledTransactions`] request. Queues the outcome as a
    /// [`FetchEvent`], which will then be streamed by
    /// [`TransactionsManager`](super::TransactionsManager).
    ///
    /// On a successful response this: detects an empty response, filters out entries that weren't
    /// requested, deduplicates the remaining entries, stops tracking hashes that were served, and
    /// re-buffers requested hashes that were not served so they can be retried against fallback
    /// peers.
    pub fn on_resolved_get_pooled_transactions_request_fut(
        &mut self,
        response: GetPooledTxResponse<N::PooledTransaction>,
    ) -> FetchEvent<N::PooledTransaction> {
        // update peer activity, requests for buffered hashes can only be made to idle
        // fallback peers
        let GetPooledTxResponse { peer_id, mut requested_hashes, result } = response;

        self.decrement_inflight_request_count_for(&peer_id);

        match result {
            // response channel delivered a successful `PooledTransactions` response
            Ok(Ok(transactions)) => {
                //
                // 1. peer has failed to serve any of the hashes it has announced to us that we,
                // as a follow, have requested
                //
                if transactions.is_empty() {
                    trace!(target: "net::tx",
                        peer_id=format!("{peer_id:#}"),
                        requested_hashes_len=requested_hashes.len(),
                        "received empty `PooledTransactions` response from peer, peer failed to serve hashes it announced"
                    );

                    return FetchEvent::EmptyResponse { peer_id }
                }

                //
                // 2. filter out hashes that we didn't request
                //
                let payload = UnverifiedPooledTransactions::new(transactions);

                let unverified_len = payload.len();
                let (verification_outcome, verified_payload) =
                    payload.verify(&requested_hashes, &peer_id);

                // entries dropped by verification were unsolicited, count them for metrics
                let unsolicited = unverified_len - verified_payload.len();
                if unsolicited > 0 {
                    self.metrics.unsolicited_transactions.increment(unsolicited as u64);
                }
                if verification_outcome == VerificationOutcome::ReportPeer {
                    // todo: report peer for sending hashes that weren't requested
                    trace!(target: "net::tx",
                        peer_id=format!("{peer_id:#}"),
                        unverified_len,
                        verified_payload_len=verified_payload.len(),
                        "received `PooledTransactions` response from peer with entries that didn't verify against request, filtered out transactions"
                    );
                }
                // peer has only sent hashes that we didn't request
                if verified_payload.is_empty() {
                    return FetchEvent::FetchError { peer_id, error: RequestError::BadResponse }
                }

                //
                // 3. stateless validation of payload, e.g. dedup
                //
                let unvalidated_payload_len = verified_payload.len();

                let (validation_outcome, valid_payload) =
                    self.filter_valid_message.partially_filter_valid_entries(verified_payload);

                // todo: validate based on announced tx size/type and report peer for sending
                // invalid response <https://github.com/paradigmxyz/reth/issues/6529>. requires
                // passing the rlp encoded length down from active session along with the decoded
                // tx.

                if validation_outcome == FilterOutcome::ReportPeer {
                    trace!(target: "net::tx",
                        peer_id=format!("{peer_id:#}"),
                        unvalidated_payload_len,
                        valid_payload_len=valid_payload.len(),
                        "received invalid `PooledTransactions` response from peer, filtered out duplicate entries"
                    );
                }
                // valid payload will have at least one transaction at this point. even if the tx
                // size/type announced by the peer is different to the actual tx size/type, pass on
                // to pending pool imports pipeline for validation.

                //
                // 4. clear received hashes
                //
                // after the retain, `requested_hashes` holds only the hashes that were NOT served;
                // those get re-buffered for retry in step 5
                let requested_hashes_len = requested_hashes.len();
                let mut fetched = Vec::with_capacity(valid_payload.len());
                requested_hashes.retain(|requested_hash| {
                    if valid_payload.contains_key(requested_hash) {
                        // hash is now known, stop tracking
                        fetched.push(*requested_hash);
                        return false
                    }
                    true
                });
                fetched.shrink_to_fit();
                self.metrics.fetched_transactions.increment(fetched.len() as u64);

                if fetched.len() < requested_hashes_len {
                    trace!(target: "net::tx",
                        peer_id=format!("{peer_id:#}"),
                        requested_hashes_len=requested_hashes_len,
                        fetched_len=fetched.len(),
                        "peer failed to serve hashes it announced"
                    );
                }

                //
                // 5. buffer left over hashes
                //
                self.try_buffer_hashes_for_retry(requested_hashes, &peer_id);

                let transactions = valid_payload.into_data().into_values().collect();

                FetchEvent::TransactionsFetched { peer_id, transactions }
            }
            // the peer's session resolved the request with an error; retry the hashes elsewhere
            Ok(Err(req_err)) => {
                self.try_buffer_hashes_for_retry(requested_hashes, &peer_id);
                FetchEvent::FetchError { peer_id, error: req_err }
            }
            // failed to receive on the response channel; retry the hashes elsewhere
            Err(_) => {
                self.try_buffer_hashes_for_retry(requested_hashes, &peer_id);
                // request channel closed/dropped
                FetchEvent::FetchError { peer_id, error: RequestError::ChannelClosed }
            }
        }
    }
987}
988
989impl<N: NetworkPrimitives> Stream for TransactionFetcher<N> {
990    type Item = FetchEvent<N::PooledTransaction>;
991
992    /// Advances all inflight requests and returns the next event.
993    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
994        // `FuturesUnordered` doesn't close when `None` is returned. so just return pending.
995        // <https://play.rust-lang.org/?version=stable&mode=debug&edition=2021&gist=815be2b6c8003303757c3ced135f363e>
996        if self.inflight_requests.is_empty() {
997            return Poll::Pending
998        }
999
1000        if let Some(resp) = ready!(self.inflight_requests.poll_next_unpin(cx)) {
1001            return Poll::Ready(Some(self.on_resolved_get_pooled_transactions_request_fut(resp)))
1002        }
1003
1004        Poll::Pending
1005    }
1006}
1007
1008impl<T: NetworkPrimitives> Default for TransactionFetcher<T> {
1009    fn default() -> Self {
1010        Self {
1011            active_peers: LruMap::new(DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS),
1012            inflight_requests: Default::default(),
1013            hashes_pending_fetch: LruCache::new(DEFAULT_MAX_CAPACITY_CACHE_PENDING_FETCH),
1014            hashes_fetch_inflight_and_pending_fetch: LruMap::new(
1015                DEFAULT_MAX_CAPACITY_CACHE_INFLIGHT_AND_PENDING_FETCH,
1016            ),
1017            filter_valid_message: Default::default(),
1018            info: TransactionFetcherInfo::default(),
1019            metrics: Default::default(),
1020        }
1021    }
1022}
1023
/// Metadata of a transaction hash that is yet to be fetched.
#[derive(Debug, Constructor)]
pub struct TxFetchMetadata {
    /// The number of times a request attempt has been made for the hash.
    retries: u8,
    /// Peers that have announced the hash, but to which a request attempt has not yet been made.
    fallback_peers: LruCache<PeerId>,
    /// Size metadata of the transaction if it has been seen in an eth68 announcement.
    /// `None` if the hash has only been seen in eth66 announcements, which carry no metadata.
    // todo: store all seen sizes as a `(size, peer_id)` tuple to catch peers that respond with
    // another size tx than they announced. alt enter in request (won't catch peers announcing
    // wrong size for requests assembled from hashes pending fetch if stored in request fut)
    tx_encoded_length: Option<usize>,
}
1037
impl TxFetchMetadata {
    /// Returns a mutable reference to the fallback peers cache for this transaction hash.
    ///
    /// The cache holds peers that announced the hash but haven't been asked for it yet.
    pub const fn fallback_peers_mut(&mut self) -> &mut LruCache<PeerId> {
        &mut self.fallback_peers
    }

    /// Returns the size of the transaction, if its hash has been received in any
    /// [`Eth68`](reth_eth_wire::EthVersion::Eth68) announcement. If the transaction hash has only
    /// been seen in [`Eth66`](reth_eth_wire::EthVersion::Eth66) announcements so far, this will
    /// return `None`.
    pub const fn tx_encoded_len(&self) -> Option<usize> {
        self.tx_encoded_length
    }
}
1052
/// Represents possible events from fetching transactions.
///
/// Generic over the pooled transaction type `T`.
#[derive(Debug)]
pub enum FetchEvent<T = PooledTransaction> {
    /// Triggered when transactions are successfully fetched.
    TransactionsFetched {
        /// The ID of the peer from which transactions were fetched.
        peer_id: PeerId,
        /// The transactions that were fetched, if available.
        transactions: PooledTransactions<T>,
    },
    /// Triggered when there is an error in fetching transactions.
    FetchError {
        /// The ID of the peer from which an attempt to fetch transactions resulted in an error.
        peer_id: PeerId,
        /// The specific error that occurred while fetching.
        error: RequestError,
    },
    /// An empty response was received.
    EmptyResponse {
        /// The ID of the sender.
        peer_id: PeerId,
    },
}
1076
/// An inflight request for [`PooledTransactions`] from a peer.
#[derive(Debug)]
pub struct GetPooledTxRequest<T = PooledTransaction> {
    /// The peer the request was sent to.
    peer_id: PeerId,
    /// Transaction hashes that were requested, for cleanup purposes
    requested_hashes: RequestTxHashes,
    /// Receiver for the response delivered by the peer's session.
    response: oneshot::Receiver<RequestResult<PooledTransactions<T>>>,
}
1085
/// Upon reception of a response, a [`GetPooledTxRequest`] is deconstructed to form a
/// [`GetPooledTxResponse`].
#[derive(Debug)]
pub struct GetPooledTxResponse<T = PooledTransaction> {
    /// The peer that resolved the request.
    peer_id: PeerId,
    /// Transaction hashes that were requested, for cleanup purposes, since peer may only return a
    /// subset of requested hashes.
    requested_hashes: RequestTxHashes,
    /// Outcome of the request: the outer `Err` means the response channel closed/dropped, the
    /// inner `Err` is a request failure reported by the peer's session.
    result: Result<RequestResult<PooledTransactions<T>>, RecvError>,
}
1096
/// Stores the response receiver made by sending a [`GetPooledTransactions`] request to a peer's
/// session.
///
/// Resolves to a [`GetPooledTxResponse`] once the response channel yields.
#[must_use = "futures do nothing unless polled"]
#[pin_project::pin_project]
#[derive(Debug)]
pub struct GetPooledTxRequestFut<T = PooledTransaction> {
    /// Inner request state; `None` only transiently while being polled, and permanently once
    /// the future has resolved (see the `Future` impl).
    #[pin]
    inner: Option<GetPooledTxRequest<T>>,
}
1106
impl<T> GetPooledTxRequestFut<T> {
    /// Wraps the given request parts into a future that resolves with the peer's response.
    #[inline]
    const fn new(
        peer_id: PeerId,
        requested_hashes: RequestTxHashes,
        response: oneshot::Receiver<RequestResult<PooledTransactions<T>>>,
    ) -> Self {
        Self { inner: Some(GetPooledTxRequest { peer_id, requested_hashes, response }) }
    }
}
1117
impl<T> Future for GetPooledTxRequestFut<T> {
    type Output = GetPooledTxResponse<T>;

    /// Polls the inner oneshot receiver for the peer's response.
    ///
    /// # Panics
    ///
    /// Panics if polled again after it has resolved, since the inner request is consumed on
    /// completion and not restored.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // move the request out to poll the receiver; restored below if still pending
        let mut req = self.as_mut().project().inner.take().expect("polled after completion");
        match req.response.poll_unpin(cx) {
            // deconstruct the request parts into the response, see [`GetPooledTxResponse`]
            Poll::Ready(result) => Poll::Ready(GetPooledTxResponse {
                peer_id: req.peer_id,
                requested_hashes: req.requested_hashes,
                result,
            }),
            Poll::Pending => {
                // channel not resolved yet, put the request back for the next poll
                self.project().inner.set(Some(req));
                Poll::Pending
            }
        }
    }
}
1136
/// Wrapper of unverified [`PooledTransactions`].
///
/// Response entries must be verified against the requested hashes before use, producing
/// [`VerifiedPooledTransactions`].
#[derive(Debug, Constructor, Deref)]
pub struct UnverifiedPooledTransactions<T> {
    /// The raw transactions as received from the peer.
    txns: PooledTransactions<T>,
}
1142
/// [`PooledTransactions`] that have been successfully verified.
///
/// All remaining entries match a hash that was actually requested from the peer.
#[derive(Debug, Constructor, Deref)]
pub struct VerifiedPooledTransactions<T> {
    /// The transactions retained by verification.
    txns: PooledTransactions<T>,
}
1148
1149impl<T: SignedTransaction> DedupPayload for VerifiedPooledTransactions<T> {
1150    type Value = T;
1151
1152    fn is_empty(&self) -> bool {
1153        self.txns.is_empty()
1154    }
1155
1156    fn len(&self) -> usize {
1157        self.txns.len()
1158    }
1159
1160    fn dedup(self) -> PartiallyValidData<Self::Value> {
1161        PartiallyValidData::from_raw_data(
1162            self.txns.into_iter().map(|tx| (*tx.tx_hash(), tx)).collect(),
1163            None,
1164        )
1165    }
1166}
1167
/// Verification of a [`PooledTransactions`] response against the hashes that were requested.
trait VerifyPooledTransactionsResponse {
    /// The signed transaction type carried by the response.
    type Transaction: SignedTransaction;

    /// Filters out transactions whose hash was not requested, returning the retained
    /// transactions together with a [`VerificationOutcome`] signalling whether the peer
    /// should be penalized.
    fn verify(
        self,
        requested_hashes: &RequestTxHashes,
        peer_id: &PeerId,
    ) -> (VerificationOutcome, VerifiedPooledTransactions<Self::Transaction>);
}
1177
impl<T: SignedTransaction> VerifyPooledTransactionsResponse for UnverifiedPooledTransactions<T> {
    type Transaction = T;

    fn verify(
        self,
        requested_hashes: &RequestTxHashes,
        _peer_id: &PeerId,
    ) -> (VerificationOutcome, VerifiedPooledTransactions<T>) {
        // flipped to `ReportPeer` as soon as a single unsolicited entry is found
        let mut verification_outcome = VerificationOutcome::Ok;

        let Self { mut txns } = self;

        // debug builds collect the offending hashes for the trace log; release builds only
        // track their count
        #[cfg(debug_assertions)]
        let mut tx_hashes_not_requested: smallvec::SmallVec<[TxHash; 16]> = smallvec::smallvec!();
        #[cfg(not(debug_assertions))]
        let mut tx_hashes_not_requested_count = 0;

        // drop every transaction whose hash wasn't requested, flagging the peer if any is found
        txns.0.retain(|tx| {
            if !requested_hashes.contains(tx.tx_hash()) {
                verification_outcome = VerificationOutcome::ReportPeer;

                #[cfg(debug_assertions)]
                tx_hashes_not_requested.push(*tx.tx_hash());
                #[cfg(not(debug_assertions))]
                {
                    tx_hashes_not_requested_count += 1;
                }

                return false
            }
            true
        });

        // log unsolicited entries, mirroring the debug/release split above
        #[cfg(debug_assertions)]
        if !tx_hashes_not_requested.is_empty() {
            trace!(target: "net::tx",
                peer_id=format!("{_peer_id:#}"),
                ?tx_hashes_not_requested,
                "transactions in `PooledTransactions` response from peer were not requested"
            );
        }
        #[cfg(not(debug_assertions))]
        if tx_hashes_not_requested_count != 0 {
            trace!(target: "net::tx",
                peer_id=format!("{_peer_id:#}"),
                tx_hashes_not_requested_count,
                "transactions in `PooledTransactions` response from peer were not requested"
            );
        }

        (verification_outcome, VerifiedPooledTransactions::new(txns))
    }
}
1231
/// Outcome from verifying a [`PooledTransactions`] response. Signals to caller whether to penalize
/// the sender of the response or not.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum VerificationOutcome {
    /// Peer behaves appropriately. No penalty necessary.
    Ok,
    /// A penalty should be flagged for the peer. Peer sent a response with unacceptably
    /// invalid entries, e.g. transactions whose hashes weren't requested.
    ReportPeer,
}
1242
/// Tracks stats about the [`TransactionFetcher`].
///
/// Built from a `TransactionFetcherConfig` via `From`, or with crate defaults via `Default`.
#[derive(Debug, Constructor)]
pub struct TransactionFetcherInfo {
    /// Max inflight [`GetPooledTransactions`] requests.
    pub max_inflight_requests: usize,
    /// Max inflight [`GetPooledTransactions`] requests per peer.
    pub max_inflight_requests_per_peer: u8,
    /// Soft limit for the byte size of the expected [`PooledTransactions`] response, upon packing
    /// a [`GetPooledTransactions`] request with hashes (by default less than 2 MiB worth of
    /// transactions is requested).
    pub soft_limit_byte_size_pooled_transactions_response_on_pack_request: usize,
    /// Soft limit for the byte size of a [`PooledTransactions`] response, upon assembling the
    /// response. Spec'd at 2 MiB, but can be adjusted for research purpose.
    pub soft_limit_byte_size_pooled_transactions_response: usize,
    /// Max capacity of the cache of transaction hashes, for transactions that weren't yet fetched.
    /// A transaction is pending fetch if its hash didn't fit into a [`GetPooledTransactions`] yet,
    /// or it wasn't returned upon request to peers.
    pub max_capacity_cache_txns_pending_fetch: u32,
}
1262
1263impl Default for TransactionFetcherInfo {
1264    fn default() -> Self {
1265        Self::new(
1266            DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS as usize,
1267            DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS_PER_PEER,
1268            DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESP_ON_PACK_GET_POOLED_TRANSACTIONS_REQ,
1269            SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE,
1270            DEFAULT_MAX_CAPACITY_CACHE_PENDING_FETCH,
1271        )
1272    }
1273}
1274
1275impl From<TransactionFetcherConfig> for TransactionFetcherInfo {
1276    fn from(config: TransactionFetcherConfig) -> Self {
1277        let TransactionFetcherConfig {
1278            max_inflight_requests,
1279            max_inflight_requests_per_peer,
1280            soft_limit_byte_size_pooled_transactions_response,
1281            soft_limit_byte_size_pooled_transactions_response_on_pack_request,
1282            max_capacity_cache_txns_pending_fetch,
1283        } = config;
1284
1285        Self::new(
1286            max_inflight_requests as usize,
1287            max_inflight_requests_per_peer,
1288            soft_limit_byte_size_pooled_transactions_response_on_pack_request,
1289            soft_limit_byte_size_pooled_transactions_response,
1290            max_capacity_cache_txns_pending_fetch,
1291        )
1292    }
1293}
1294
/// Wall-clock durations of the two search phases when fetching pending hashes.
// NOTE(review): field names mirror `search_breadth_budget_find_idle_fallback_peer` and
// `fill_request_from_hashes_pending_fetch`; usage site not visible here — confirm.
#[derive(Debug, Default)]
struct TxFetcherSearchDurations {
    // time spent searching for an idle fallback peer
    find_idle_peer: Duration,
    // time spent filling the request from hashes pending fetch
    fill_request: Duration,
}
1300
1301#[cfg(test)]
1302mod test {
1303    use super::*;
1304    use crate::test_utils::transactions::{buffer_hash_to_tx_fetcher, new_mock_session};
1305    use alloy_primitives::{hex, B256};
1306    use alloy_rlp::Decodable;
1307    use derive_more::IntoIterator;
1308    use reth_ethereum_primitives::TransactionSigned;
1309    use std::{collections::HashSet, str::FromStr};
1310
    /// Test stand-in for valid announcement data: `(hash, Some((tx_type, size)))` entries as
    /// carried by an eth68 announcement.
    #[derive(IntoIterator)]
    struct TestValidAnnouncementData(Vec<(TxHash, Option<(u8, usize)>)>);
1313
1314    impl HandleMempoolData for TestValidAnnouncementData {
1315        fn is_empty(&self) -> bool {
1316            self.0.is_empty()
1317        }
1318
1319        fn len(&self) -> usize {
1320            self.0.len()
1321        }
1322
1323        fn retain_by_hash(&mut self, mut f: impl FnMut(&TxHash) -> bool) {
1324            self.0.retain(|(hash, _)| f(hash))
1325        }
1326    }
1327
    impl HandleVersionedMempoolData for TestValidAnnouncementData {
        /// The test data always mimics an eth68 announcement (entries carry type/size metadata).
        fn msg_version(&self) -> EthVersion {
            EthVersion::Eth68
        }
    }
1333
    /// Packs an eth68 request from announced hashes with size metadata, asserting that hashes
    /// which don't fit within the request's byte-size soft limit are returned as surplus.
    #[test]
    fn pack_eth68_request() {
        reth_tracing::init_test_tracing();

        // RIG TEST

        let tx_fetcher = &mut TransactionFetcher::<EthNetworkPrimitives>::default();

        let eth68_hashes = [
            B256::from_slice(&[1; 32]),
            B256::from_slice(&[2; 32]),
            B256::from_slice(&[3; 32]),
            B256::from_slice(&[4; 32]),
            B256::from_slice(&[5; 32]),
        ];
        // sizes chosen relative to the packing soft limit, see per-entry comments
        let eth68_sizes = [
            DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESP_ON_PACK_GET_POOLED_TRANSACTIONS_REQ - MEDIAN_BYTE_SIZE_SMALL_LEGACY_TX_ENCODED - 1, // first will fit
            DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESP_ON_PACK_GET_POOLED_TRANSACTIONS_REQ, // second won't
            2, // free space > `MEDIAN_BYTE_SIZE_SMALL_LEGACY_TX_ENCODED`, third will fit, no more after this
            9,
            0,
        ];

        let expected_request_hashes =
            [eth68_hashes[0], eth68_hashes[2]].into_iter().collect::<HashSet<_>>();

        let expected_surplus_hashes =
            [eth68_hashes[1], eth68_hashes[3], eth68_hashes[4]].into_iter().collect::<HashSet<_>>();

        let mut eth68_hashes_to_request = RequestTxHashes::with_capacity(3);

        // announcement entries carry `(type, size)` metadata like a real eth68 announcement
        let valid_announcement_data = TestValidAnnouncementData(
            eth68_hashes
                .into_iter()
                .zip(eth68_sizes)
                .map(|(hash, size)| (hash, Some((0u8, size))))
                .collect::<Vec<_>>(),
        );

        // TEST

        let surplus_eth68_hashes =
            tx_fetcher.pack_request_eth68(&mut eth68_hashes_to_request, valid_announcement_data);

        // compare as sets, packing order is not asserted
        let eth68_hashes_to_request = eth68_hashes_to_request.into_iter().collect::<HashSet<_>>();
        let surplus_eth68_hashes = surplus_eth68_hashes.into_iter().collect::<HashSet<_>>();

        assert_eq!(expected_request_hashes, eth68_hashes_to_request);
        assert_eq!(expected_surplus_hashes, surplus_eth68_hashes);
    }
1384
    /// Buffers hashes as pending fetch with two fallback peers and asserts that fetching pending
    /// hashes sends peer_1's session a request containing exactly the hashes peer_1 has seen.
    #[tokio::test]
    async fn test_on_fetch_pending_hashes() {
        reth_tracing::init_test_tracing();

        let tx_fetcher = &mut TransactionFetcher::default();

        // RIG TEST

        // hashes that will be fetched because they are stored as pending fetch
        let seen_hashes = [
            B256::from_slice(&[1; 32]),
            B256::from_slice(&[2; 32]),
            B256::from_slice(&[3; 32]),
            B256::from_slice(&[4; 32]),
        ];
        //
        // txns 1-3 are small, all will fit in request. no metadata has been made available for
        // hash 4, it has only been seen over eth66 conn, so average tx size will be assumed in
        // filling request.
        let seen_eth68_hashes_sizes = [120, 158, 116];

        // peer that will fetch seen hashes because they are pending fetch
        let peer_1 = PeerId::new([1; 64]);
        // second peer, won't do anything in this test
        let peer_2 = PeerId::new([2; 64]);

        // add seen hashes to peers seen transactions
        //
        // get handle for peer_1's session to receive request for pending hashes
        let (mut peer_1_data, mut peer_1_mock_session_rx) =
            new_mock_session(peer_1, EthVersion::Eth66);
        for hash in &seen_hashes {
            peer_1_data.seen_transactions.insert(*hash);
        }
        let (mut peer_2_data, _) = new_mock_session(peer_2, EthVersion::Eth66);
        for hash in &seen_hashes {
            peer_2_data.seen_transactions.insert(*hash);
        }
        let mut peers = HashMap::default();
        peers.insert(peer_1, peer_1_data);
        peers.insert(peer_2, peer_2_data);

        // insert seen_hashes into tx fetcher
        for i in 0..3 {
            // insert peer_2 as fallback peer for seen_hashes
            buffer_hash_to_tx_fetcher(
                tx_fetcher,
                seen_hashes[i],
                peer_2,
                0,
                Some(seen_eth68_hashes_sizes[i]),
            );
        }
        // hash 4 has no size metadata (eth66-only), buffered without a size
        buffer_hash_to_tx_fetcher(tx_fetcher, seen_hashes[3], peer_2, 0, None);

        // insert pending hash without peer_1 as fallback peer, only with peer_2 as fallback peer
        let hash_other = B256::from_slice(&[5; 32]);
        buffer_hash_to_tx_fetcher(tx_fetcher, hash_other, peer_2, 0, None);

        // add peer_1 as lru fallback peer for seen hashes
        for hash in &seen_hashes {
            buffer_hash_to_tx_fetcher(tx_fetcher, *hash, peer_1, 0, None);
        }

        // seen hashes and the random hash from peer_2 are pending fetch
        assert_eq!(tx_fetcher.num_pending_hashes(), 5);

        // TEST

        tx_fetcher.on_fetch_pending_hashes(&peers, |_| true);

        // mock session of peer_1 receives request
        let req = peer_1_mock_session_rx
            .recv()
            .await
            .expect("peer session should receive request with buffered hashes");
        let PeerRequest::GetPooledTransactions { request, .. } = req else { unreachable!() };
        let GetPooledTransactions(requested_hashes) = request;

        // exactly the hashes seen by peer_1 are requested; `hash_other` is excluded
        assert_eq!(
            requested_hashes.into_iter().collect::<HashSet<_>>(),
            seen_hashes.into_iter().collect::<HashSet<_>>()
        )
    }
1469
1470    #[test]
1471    fn verify_response_hashes() {
1472        let input = hex!(
1473            "02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598daa"
1474        );
1475        let signed_tx_1: PooledTransaction =
1476            TransactionSigned::decode(&mut &input[..]).unwrap().try_into().unwrap();
1477        let input = hex!(
1478            "02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76"
1479        );
1480        let signed_tx_2: PooledTransaction =
1481            TransactionSigned::decode(&mut &input[..]).unwrap().try_into().unwrap();
1482
1483        // only tx 1 is requested
1484        let request_hashes = [
1485            B256::from_str("0x3b9aca00f0671c9a2a1b817a0a78d3fe0c0f776cccb2a8c3c1b412a4f4e67890")
1486                .unwrap(),
1487            *signed_tx_1.hash(),
1488            B256::from_str("0x3b9aca00f0671c9a2a1b817a0a78d3fe0c0f776cccb2a8c3c1b412a4f4e12345")
1489                .unwrap(),
1490            B256::from_str("0x3b9aca00f0671c9a2a1b817a0a78d3fe0c0f776cccb2a8c3c1b412a4f4edabe3")
1491                .unwrap(),
1492        ];
1493
1494        for hash in &request_hashes {
1495            assert_ne!(hash, signed_tx_2.hash())
1496        }
1497
1498        let request_hashes = RequestTxHashes::new(request_hashes.into_iter().collect());
1499
1500        // but response contains tx 1 + another tx
1501        let response_txns = PooledTransactions(vec![signed_tx_1.clone(), signed_tx_2]);
1502        let payload = UnverifiedPooledTransactions::new(response_txns);
1503
1504        let (outcome, verified_payload) = payload.verify(&request_hashes, &PeerId::ZERO);
1505
1506        assert_eq!(VerificationOutcome::ReportPeer, outcome);
1507        assert_eq!(1, verified_payload.len());
1508        assert!(verified_payload.contains(&signed_tx_1));
1509    }
1510}