Skip to main content

reth_network/transactions/
fetcher.rs

1//! `TransactionFetcher` is responsible for rate limiting and retry logic for fetching
2//! transactions. Upon receiving an announcement, functionality of the `TransactionFetcher` is
3//! used for filtering out hashes 1) for which the tx is already known and 2) unknown but the hash
4//! is already seen in a previous announcement. The hashes that remain from an announcement are
5//! then packed into a request with respect to the [`EthVersion`] of the announcement. Any hashes
6//! that don't fit into the request, are buffered in the `TransactionFetcher`. If on the other
7//! hand, space remains, hashes that the peer has previously announced are taken out of buffered
8//! hashes to fill the request up. The [`GetPooledTransactions`] request is then sent to the
9//! peer's session, this marks the peer as active with respect to
10//! `MAX_CONCURRENT_TX_REQUESTS_PER_PEER`.
11//!
//! When hashes announced by a peer are buffered in the
//! `TransactionsManager::on_new_pooled_transaction_hashes` pipeline, the peer is stored as
//! fallback peer for those hashes. When [`TransactionsManager`] is polled, it checks if any
//! fallback peer is idle. If so, it packs a request for that peer, filling it from the buffered
//! hashes. It does so until there are no more idle peers or until the hashes buffer is empty.
17//!
18//! If a [`GetPooledTransactions`] request resolves with an error, the hashes in the request are
19//! buffered with respect to `MAX_REQUEST_RETRIES_PER_TX_HASH`. So is the case if the request
20//! resolves with partial success, that is some of the requested hashes are not in the response,
21//! these are then buffered.
22//!
//! Most healthy peers will send the same hashes in their announcements, as RLPx is a gossip
//! protocol. This means it's unlikely that a valid hash will be buffered for very long
//! before it's retried. Nonetheless, the capacity of the buffered hashes cache must be large
//! enough to buffer many hashes during network failure, to allow for recovery.
27
28use super::{
29    config::TransactionFetcherConfig,
30    constants::{tx_fetcher::*, SOFT_LIMIT_COUNT_HASHES_IN_GET_POOLED_TRANSACTIONS_REQUEST},
31    PeerMetadata, PooledTransactions, SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE,
32};
33use crate::{
34    cache::{LruCache, LruMap},
35    duration_metered_exec,
36    metrics::TransactionFetcherMetrics,
37};
38use alloy_consensus::transaction::PooledTransaction;
39use alloy_primitives::{map::FbBuildHasher, TxHash};
40use derive_more::{Constructor, Deref};
41use futures::{stream::FuturesUnordered, Future, FutureExt, Stream, StreamExt};
42use pin_project::pin_project;
43use reth_eth_wire::{
44    DedupPayload, GetPooledTransactions, HandleMempoolData, HandleVersionedMempoolData,
45    PartiallyValidData, RequestTxHashes, ValidAnnouncementData,
46};
47use reth_eth_wire_types::{EthNetworkPrimitives, NetworkPrimitives};
48use reth_network_api::PeerRequest;
49use reth_network_p2p::error::{RequestError, RequestResult};
50use reth_network_peers::PeerId;
51use reth_primitives_traits::SignedTransaction;
52use schnellru::ByLength;
53use std::{
54    collections::HashMap,
55    pin::Pin,
56    task::{ready, Context, Poll},
57    time::Duration,
58};
59use tokio::sync::{mpsc::error::TrySendError, oneshot, oneshot::error::RecvError};
60use tracing::trace;
61
62/// The type responsible for fetching missing transactions from peers.
63///
64/// This will keep track of unique transaction hashes that are currently being fetched and submits
65/// new requests on announced hashes.
66#[derive(Debug)]
67#[pin_project]
68pub struct TransactionFetcher<N: NetworkPrimitives = EthNetworkPrimitives> {
69    /// All peers with to which a [`GetPooledTransactions`] request is inflight.
70    pub active_peers: LruMap<PeerId, u8, ByLength, FbBuildHasher<64>>,
71    /// All currently active [`GetPooledTransactions`] requests.
72    ///
73    /// The set of hashes encompassed by these requests are a subset of all hashes in the fetcher.
74    /// It's disjoint from the set of hashes which are awaiting an idle fallback peer in order to
75    /// be fetched.
76    #[pin]
77    pub inflight_requests: FuturesUnordered<GetPooledTxRequestFut<N::PooledTransaction>>,
78    /// Hashes that are awaiting an idle fallback peer so they can be fetched.
79    ///
80    /// This is a subset of all hashes in the fetcher, and is disjoint from the set of hashes for
81    /// which a [`GetPooledTransactions`] request is inflight.
82    pub hashes_pending_fetch: LruCache<TxHash, FbBuildHasher<32>>,
83    /// Tracks all hashes in the transaction fetcher.
84    pub hashes_fetch_inflight_and_pending_fetch:
85        LruMap<TxHash, TxFetchMetadata, ByLength, FbBuildHasher<32>>,
86    /// Info on capacity of the transaction fetcher.
87    pub info: TransactionFetcherInfo,
88    #[doc(hidden)]
89    metrics: TransactionFetcherMetrics,
90}
91
92impl<N: NetworkPrimitives> TransactionFetcher<N> {
93    /// Removes the peer from the active set.
94    pub(crate) fn remove_peer(&mut self, peer_id: &PeerId) {
95        self.active_peers.remove(peer_id);
96    }
97
98    /// Updates metrics.
99    #[inline]
100    pub fn update_metrics(&self) {
101        let metrics = &self.metrics;
102
103        metrics.inflight_transaction_requests.set(self.inflight_requests.len() as f64);
104
105        let hashes_pending_fetch = self.hashes_pending_fetch.len() as f64;
106        let total_hashes = self.hashes_fetch_inflight_and_pending_fetch.len() as f64;
107
108        metrics.hashes_pending_fetch.set(hashes_pending_fetch);
109        metrics.hashes_inflight_transaction_requests.set(total_hashes - hashes_pending_fetch);
110    }
111
112    #[inline]
113    fn update_pending_fetch_cache_search_metrics(&self, durations: TxFetcherSearchDurations) {
114        let metrics = &self.metrics;
115
116        let TxFetcherSearchDurations { find_idle_peer, fill_request } = durations;
117        metrics
118            .duration_find_idle_fallback_peer_for_any_pending_hash
119            .set(find_idle_peer.as_secs_f64());
120        metrics.duration_fill_request_from_hashes_pending_fetch.set(fill_request.as_secs_f64());
121    }
122
123    /// Sets up transaction fetcher with config
124    pub fn with_transaction_fetcher_config(config: &TransactionFetcherConfig) -> Self {
125        let TransactionFetcherConfig {
126            max_inflight_requests,
127            max_capacity_cache_txns_pending_fetch,
128            ..
129        } = *config;
130
131        let info = config.clone().into();
132
133        let metrics = TransactionFetcherMetrics::default();
134        metrics.capacity_inflight_requests.increment(max_inflight_requests as u64);
135
136        Self {
137            active_peers: LruMap::with_hasher(max_inflight_requests, Default::default()),
138            hashes_pending_fetch: LruCache::with_hasher(
139                max_capacity_cache_txns_pending_fetch,
140                Default::default(),
141            ),
142            hashes_fetch_inflight_and_pending_fetch: LruMap::with_hasher(
143                max_inflight_requests + max_capacity_cache_txns_pending_fetch,
144                Default::default(),
145            ),
146            info,
147            metrics,
148            ..Default::default()
149        }
150    }
151
152    /// Removes the specified hashes from inflight tracking.
153    #[inline]
154    pub fn remove_hashes_from_transaction_fetcher<'a, I>(&mut self, hashes: I)
155    where
156        I: IntoIterator<Item = &'a TxHash>,
157    {
158        for hash in hashes {
159            self.hashes_fetch_inflight_and_pending_fetch.remove(hash);
160            self.hashes_pending_fetch.remove(hash);
161        }
162    }
163
164    /// Updates peer's activity status upon a resolved [`GetPooledTxRequest`].
165    fn decrement_inflight_request_count_for(&mut self, peer_id: &PeerId) {
166        let remove = || -> bool {
167            if let Some(inflight_count) = self.active_peers.get(peer_id) {
168                *inflight_count = inflight_count.saturating_sub(1);
169                if *inflight_count == 0 {
170                    return true
171                }
172            }
173            false
174        }();
175
176        if remove {
177            self.active_peers.remove(peer_id);
178        }
179    }
180
181    /// Returns `true` if peer is idle with respect to `self.inflight_requests`.
182    #[inline]
183    pub fn is_idle(&self, peer_id: &PeerId) -> bool {
184        let Some(inflight_count) = self.active_peers.peek(peer_id) else { return true };
185        if *inflight_count < self.info.max_inflight_requests_per_peer {
186            return true
187        }
188        false
189    }
190
191    /// Returns any idle peer for the given hash.
192    pub fn get_idle_peer_for(&self, hash: TxHash) -> Option<&PeerId> {
193        let TxFetchMetadata { fallback_peers, .. } =
194            self.hashes_fetch_inflight_and_pending_fetch.peek(&hash)?;
195
196        fallback_peers.iter().find(|peer_id| self.is_idle(peer_id))
197    }
198
199    /// Returns any idle peer for any hash pending fetch. If one is found, the corresponding
200    /// hash is written to the request buffer that is passed as parameter.
201    ///
202    /// Loops through the hashes pending fetch in lru order until one is found with an idle
203    /// fallback peer, or the budget passed as parameter is depleted, whatever happens first.
204    pub fn find_any_idle_fallback_peer_for_any_pending_hash(
205        &mut self,
206        hashes_to_request: &mut RequestTxHashes,
207        mut budget: Option<usize>, // search fallback peers for max `budget` lru pending hashes
208    ) -> Option<PeerId> {
209        let mut hashes_pending_fetch_iter = self.hashes_pending_fetch.iter();
210
211        let idle_peer = loop {
212            let &hash = hashes_pending_fetch_iter.next()?;
213
214            let idle_peer = self.get_idle_peer_for(hash);
215
216            if idle_peer.is_some() {
217                hashes_to_request.insert(hash);
218                break idle_peer.copied()
219            }
220
221            if let Some(ref mut bud) = budget {
222                *bud = bud.saturating_sub(1);
223                if *bud == 0 {
224                    return None
225                }
226            }
227        };
228        let hash = hashes_to_request.iter().next()?;
229
230        // pop hash that is loaded in request buffer from cache of hashes pending fetch
231        drop(hashes_pending_fetch_iter);
232        _ = self.hashes_pending_fetch.remove(hash);
233
234        idle_peer
235    }
236
237    /// Packages hashes for a [`GetPooledTxRequest`] up to limit. Returns left over hashes. Takes
238    /// a [`RequestTxHashes`] buffer as parameter for filling with hashes to request.
239    ///
240    /// Returns left over hashes.
241    pub fn pack_request(
242        &self,
243        hashes_to_request: &mut RequestTxHashes,
244        hashes_from_announcement: ValidAnnouncementData,
245    ) -> RequestTxHashes {
246        if hashes_from_announcement.msg_version().has_eth68_metadata() {
247            return self.pack_request_eth68(hashes_to_request, hashes_from_announcement)
248        }
249        self.pack_request_eth66(hashes_to_request, hashes_from_announcement)
250    }
251
252    /// Packages hashes for a [`GetPooledTxRequest`] from an
253    /// [`Eth68`](reth_eth_wire::EthVersion::Eth68) announcement up to limit as defined by protocol
254    /// version 68. Takes a [`RequestTxHashes`] buffer as parameter for filling with hashes to
255    /// request.
256    ///
257    /// Returns left over hashes.
258    ///
259    /// Loops through hashes passed as parameter and checks if a hash fits in the expected
260    /// response. If no, it's added to surplus hashes. If yes, it's added to hashes to the request
261    /// and expected response size is accumulated.
262    pub fn pack_request_eth68(
263        &self,
264        hashes_to_request: &mut RequestTxHashes,
265        hashes_from_announcement: impl HandleMempoolData
266            + IntoIterator<Item = (TxHash, Option<(u8, usize)>)>,
267    ) -> RequestTxHashes {
268        let mut acc_size_response = 0;
269
270        let mut hashes_from_announcement_iter = hashes_from_announcement.into_iter();
271
272        if let Some((hash, Some((_ty, size)))) = hashes_from_announcement_iter.next() {
273            hashes_to_request.insert(hash);
274
275            // tx is really big, pack request with single tx
276            if size >= self.info.soft_limit_byte_size_pooled_transactions_response_on_pack_request {
277                return hashes_from_announcement_iter.collect()
278            }
279            acc_size_response = size;
280        }
281
282        let mut surplus_hashes = RequestTxHashes::default();
283
284        // folds size based on expected response size  and adds selected hashes to the request
285        // list and the other hashes to the surplus list
286        for (hash, metadata) in hashes_from_announcement_iter.by_ref() {
287            let Some((_ty, size)) = metadata else {
288                unreachable!("this method is called upon reception of an eth68 announcement")
289            };
290
291            let next_acc_size = acc_size_response.checked_add(size).filter(|next_acc_size| {
292                *next_acc_size <=
293                    self.info.soft_limit_byte_size_pooled_transactions_response_on_pack_request
294            });
295
296            if let Some(next_acc_size) = next_acc_size {
297                // only update accumulated size of tx response if tx will fit in without exceeding
298                // soft limit
299                acc_size_response = next_acc_size;
300                _ = hashes_to_request.insert(hash)
301            } else {
302                _ = surplus_hashes.insert(hash)
303            }
304
305            let free_space =
306                self.info.soft_limit_byte_size_pooled_transactions_response_on_pack_request -
307                    acc_size_response;
308
309            if free_space < MEDIAN_BYTE_SIZE_SMALL_LEGACY_TX_ENCODED {
310                break
311            }
312        }
313
314        surplus_hashes.extend(hashes_from_announcement_iter.map(|(hash, _metadata)| hash));
315
316        surplus_hashes
317    }
318
319    /// Packages hashes for a [`GetPooledTxRequest`] from an
320    /// [`Eth66`](reth_eth_wire::EthVersion::Eth66) announcement up to limit as defined by
321    /// protocol version 66. Takes a [`RequestTxHashes`] buffer as parameter for filling with
322    /// hashes to request.
323    ///
324    /// Returns left over hashes.
325    pub fn pack_request_eth66(
326        &self,
327        hashes_to_request: &mut RequestTxHashes,
328        hashes_from_announcement: ValidAnnouncementData,
329    ) -> RequestTxHashes {
330        let (mut hashes, _version) = hashes_from_announcement.into_request_hashes();
331        if hashes.len() <= SOFT_LIMIT_COUNT_HASHES_IN_GET_POOLED_TRANSACTIONS_REQUEST {
332            *hashes_to_request = hashes;
333            hashes_to_request.shrink_to_fit();
334
335            RequestTxHashes::default()
336        } else {
337            let surplus_hashes =
338                hashes.retain_count(SOFT_LIMIT_COUNT_HASHES_IN_GET_POOLED_TRANSACTIONS_REQUEST);
339            *hashes_to_request = hashes;
340            hashes_to_request.shrink_to_fit();
341
342            surplus_hashes
343        }
344    }
345
346    /// Tries to buffer hashes for retry.
347    pub fn try_buffer_hashes_for_retry(
348        &mut self,
349        mut hashes: RequestTxHashes,
350        peer_failed_to_serve: &PeerId,
351    ) {
352        // It could be that the txns have been received over broadcast in the time being. Remove
353        // the peer as fallback peer so it isn't request again for these hashes.
354        hashes.retain(|hash| {
355            if let Some(entry) = self.hashes_fetch_inflight_and_pending_fetch.get(hash) {
356                entry.fallback_peers_mut().remove(peer_failed_to_serve);
357                return true
358            }
359            // tx has been seen over broadcast in the time it took for the request to resolve
360            false
361        });
362
363        self.buffer_hashes(hashes, None)
364    }
365
366    /// Number of hashes pending fetch.
367    pub fn num_pending_hashes(&self) -> usize {
368        self.hashes_pending_fetch.len()
369    }
370
371    /// Number of all transaction hashes in the fetcher.
372    pub fn num_all_hashes(&self) -> usize {
373        self.hashes_fetch_inflight_and_pending_fetch.len()
374    }
375
376    /// Buffers hashes. Note: Only peers that haven't yet tried to request the hashes should be
377    /// passed as `fallback_peer` parameter! For re-buffering hashes on failed request, use
378    /// [`TransactionFetcher::try_buffer_hashes_for_retry`]. Hashes that have been re-requested
379    /// [`DEFAULT_MAX_RETRIES`], are dropped.
380    pub fn buffer_hashes(&mut self, hashes: RequestTxHashes, fallback_peer: Option<PeerId>) {
381        for hash in hashes {
382            // hash could have been evicted from bounded lru map
383            if self.hashes_fetch_inflight_and_pending_fetch.peek(&hash).is_none() {
384                continue
385            }
386
387            let Some(TxFetchMetadata { retries, fallback_peers, .. }) =
388                self.hashes_fetch_inflight_and_pending_fetch.get(&hash)
389            else {
390                continue
391            };
392
393            if let Some(peer_id) = fallback_peer {
394                // peer has not yet requested hash
395                fallback_peers.insert(peer_id);
396            } else {
397                if *retries >= DEFAULT_MAX_RETRIES {
398                    trace!(target: "net::tx",
399                        %hash,
400                        retries,
401                        "retry limit for `GetPooledTransactions` requests reached for hash, dropping hash"
402                    );
403
404                    self.hashes_fetch_inflight_and_pending_fetch.remove(&hash);
405                    self.hashes_pending_fetch.remove(&hash);
406                    continue
407                }
408                *retries += 1;
409            }
410
411            if let (_, Some(evicted_hash)) = self.hashes_pending_fetch.insert_and_get_evicted(hash)
412            {
413                self.hashes_fetch_inflight_and_pending_fetch.remove(&evicted_hash);
414            }
415        }
416    }
417
418    /// Tries to request hashes pending fetch.
419    ///
420    /// Finds the first buffered hash with a fallback peer that is idle, if any. Fills the rest of
421    /// the request by checking the transactions seen by the peer against the buffer.
422    pub fn on_fetch_pending_hashes(
423        &mut self,
424        peers: &HashMap<PeerId, PeerMetadata<N>>,
425        has_capacity_wrt_pending_pool_imports: impl Fn(usize) -> bool,
426    ) -> bool {
427        let mut hashes_to_request = RequestTxHashes::with_capacity(
428            DEFAULT_MARGINAL_COUNT_HASHES_GET_POOLED_TRANSACTIONS_REQUEST,
429        );
430        let mut search_durations = TxFetcherSearchDurations::default();
431
432        // budget to look for an idle peer before giving up
433        let budget_find_idle_fallback_peer = self
434            .search_breadth_budget_find_idle_fallback_peer(&has_capacity_wrt_pending_pool_imports);
435
436        let peer_id = duration_metered_exec!(
437            {
438                let Some(peer_id) = self.find_any_idle_fallback_peer_for_any_pending_hash(
439                    &mut hashes_to_request,
440                    budget_find_idle_fallback_peer,
441                ) else {
442                    // no peers are idle or budget is depleted
443                    return false
444                };
445
446                peer_id
447            },
448            search_durations.find_idle_peer
449        );
450
451        // peer may have disconnected between idle check and here, re-buffer hashes so they
452        // aren't lost from the pending fetch cache
453        let Some(peer) = peers.get(&peer_id) else {
454            self.buffer_hashes(hashes_to_request, None);
455            return false
456        };
457        let conn_eth_version = peer.version;
458
459        // fill the request with more hashes pending fetch that have been announced by the peer.
460        // the search for more hashes is done with respect to the given budget, which determines
461        // how many hashes to loop through before giving up. if no more hashes are found wrt to
462        // the budget, the single hash that was taken out of the cache above is sent in a request.
463        let budget_fill_request = self
464            .search_breadth_budget_find_intersection_pending_hashes_and_hashes_seen_by_peer(
465                &has_capacity_wrt_pending_pool_imports,
466            );
467
468        duration_metered_exec!(
469            {
470                self.fill_request_from_hashes_pending_fetch(
471                    &mut hashes_to_request,
472                    &peer.seen_transactions,
473                    budget_fill_request,
474                )
475            },
476            search_durations.fill_request
477        );
478
479        self.update_pending_fetch_cache_search_metrics(search_durations);
480
481        trace!(target: "net::tx",
482            peer_id=format!("{peer_id:#}"),
483            hashes=?*hashes_to_request,
484            %conn_eth_version,
485            "requesting hashes that were stored pending fetch from peer"
486        );
487
488        // request the buffered missing transactions
489        if let Some(failed_to_request_hashes) =
490            self.request_transactions_from_peer(hashes_to_request, peer)
491        {
492            trace!(target: "net::tx",
493                peer_id=format!("{peer_id:#}"),
494                ?failed_to_request_hashes,
495                %conn_eth_version,
496                "failed sending request to peer's session, buffering hashes"
497            );
498
499            self.buffer_hashes(failed_to_request_hashes, Some(peer_id));
500            return false
501        }
502
503        true
504    }
505
506    /// Filters out hashes that have been seen before. For hashes that have already been seen, the
507    /// peer is added as fallback peer.
508    pub fn filter_unseen_and_pending_hashes(
509        &mut self,
510        new_announced_hashes: &mut ValidAnnouncementData,
511        is_tx_bad_import: impl Fn(&TxHash) -> bool,
512        peer_id: &PeerId,
513        client_version: &str,
514    ) {
515        let mut previously_unseen_hashes_count = 0;
516
517        let msg_version = new_announced_hashes.msg_version();
518
519        // filter out inflight hashes, and register the peer as fallback for all inflight hashes
520        new_announced_hashes.retain(|hash, metadata| {
521
522            // occupied entry
523            if let Some(TxFetchMetadata{ tx_encoded_length: previously_seen_size, ..}) = self.hashes_fetch_inflight_and_pending_fetch.peek_mut(hash) {
524                // update size metadata if available
525                if let Some((_ty, size)) = metadata {
526                    if let Some(prev_size) = previously_seen_size {
527                        // check if this peer is announcing a different size than a previous peer
528                        if size != prev_size {
529                            trace!(target: "net::tx",
530                                peer_id=format!("{peer_id:#}"),
531                                %hash,
532                                size,
533                                previously_seen_size,
534                                %client_version,
535                                "peer announced a different size for tx, this is especially worrying if one size is much bigger..."
536                            );
537                        }
538                    }
539                    // believe the most recent peer to announce tx
540                    *previously_seen_size = Some(*size);
541                }
542
543                // hash has been seen but is not inflight
544                if self.hashes_pending_fetch.remove(hash) {
545                    return true
546                }
547
548                return false
549            }
550
551            // vacant entry
552
553            if is_tx_bad_import(hash) {
554                return false
555            }
556
557            previously_unseen_hashes_count += 1;
558
559            if self
560                .hashes_fetch_inflight_and_pending_fetch
561                .get_or_insert(*hash, || TxFetchMetadata {
562                    retries: 0,
563                    fallback_peers: LruCache::with_hasher(
564                        DEFAULT_MAX_COUNT_FALLBACK_PEERS as u32,
565                        Default::default(),
566                    ),
567                    tx_encoded_length: None,
568                })
569                .is_none()
570            {
571
572                trace!(target: "net::tx",
573                    peer_id=format!("{peer_id:#}"),
574                    %hash,
575                    ?msg_version,
576                    %client_version,
577                    "failed to cache new announced hash from peer in schnellru::LruMap, dropping hash"
578                );
579
580                return false
581            }
582            true
583        });
584
585        trace!(target: "net::tx",
586            peer_id=format!("{peer_id:#}"),
587            previously_unseen_hashes_count=previously_unseen_hashes_count,
588            msg_version=?msg_version,
589            client_version=%client_version,
590            "received previously unseen hashes in announcement from peer"
591        );
592    }
593
594    /// Requests the missing transactions from the previously unseen announced hashes of the peer.
595    /// Returns the requested hashes if the request concurrency limit is reached or if the request
596    /// fails to send over the channel to the peer's session task.
597    ///
598    /// This filters all announced hashes that are already in flight, and requests the missing,
599    /// while marking the given peer as an alternative peer for the hashes that are already in
600    /// flight.
601    pub fn request_transactions_from_peer(
602        &mut self,
603        new_announced_hashes: RequestTxHashes,
604        peer: &PeerMetadata<N>,
605    ) -> Option<RequestTxHashes> {
606        let peer_id: PeerId = peer.request_tx.peer_id;
607        let conn_eth_version = peer.version;
608
609        if self.active_peers.len() >= self.info.max_inflight_requests {
610            trace!(target: "net::tx",
611                peer_id=format!("{peer_id:#}"),
612                hashes=?*new_announced_hashes,
613                %conn_eth_version,
614                max_inflight_transaction_requests=self.info.max_inflight_requests,
615                "limit for concurrent `GetPooledTransactions` requests reached, dropping request for hashes to peer"
616            );
617            return Some(new_announced_hashes)
618        }
619
620        let Some(inflight_count) = self.active_peers.get_or_insert(peer_id, || 0) else {
621            trace!(target: "net::tx",
622                peer_id=format!("{peer_id:#}"),
623                hashes=?*new_announced_hashes,
624                conn_eth_version=%conn_eth_version,
625                "failed to cache active peer in schnellru::LruMap, dropping request to peer"
626            );
627            return Some(new_announced_hashes)
628        };
629
630        if *inflight_count >= self.info.max_inflight_requests_per_peer {
631            trace!(target: "net::tx",
632                peer_id=format!("{peer_id:#}"),
633                hashes=?*new_announced_hashes,
634                %conn_eth_version,
635                max_concurrent_tx_reqs_per_peer=self.info.max_inflight_requests_per_peer,
636                "limit for concurrent `GetPooledTransactions` requests per peer reached"
637            );
638            return Some(new_announced_hashes)
639        }
640
641        #[cfg(debug_assertions)]
642        {
643            for hash in &new_announced_hashes {
644                if self.hashes_pending_fetch.contains(hash) {
645                    tracing::debug!(target: "net::tx", "`{}` should have been taken out of buffer before packing in a request, breaks invariant `@hashes_pending_fetch` and `@inflight_requests`, `@hashes_fetch_inflight_and_pending_fetch` for `{}`: {:?}",
646                        format!("{:?}", new_announced_hashes), // Assuming new_announced_hashes can be debug-printed directly
647                        format!("{:?}", new_announced_hashes),
648                        new_announced_hashes.iter().map(|hash| {
649                            let metadata = self.hashes_fetch_inflight_and_pending_fetch.get(hash);
650                            // Assuming you only need `retries` and `tx_encoded_length` for debugging
651                            (*hash, metadata.map(|m| (m.retries, m.tx_encoded_length)))
652                        }).collect::<Vec<(TxHash, Option<(u8, Option<usize>)>)>>())
653                }
654            }
655        }
656
657        let (response, rx) = oneshot::channel();
658        let req = PeerRequest::GetPooledTransactions {
659            request: GetPooledTransactions(new_announced_hashes.iter().copied().collect()),
660            response,
661        };
662
663        // try to send the request to the peer
664        if let Err(err) = peer.request_tx.try_send(req) {
665            // peer channel is full
666            return match err {
667                TrySendError::Full(_) | TrySendError::Closed(_) => {
668                    self.metrics.egress_peer_channel_full.increment(1);
669                    Some(new_announced_hashes)
670                }
671            }
672        }
673
674        *inflight_count += 1;
675        // stores a new request future for the request
676        self.inflight_requests.push(GetPooledTxRequestFut::new(peer_id, new_announced_hashes, rx));
677
678        None
679    }
680
681    /// Tries to fill request with hashes pending fetch so that the expected [`PooledTransactions`]
682    /// response is full enough. A mutable reference to a list of hashes to request is passed as
683    /// parameter. A budget is passed as parameter, this ensures that the node stops searching
684    /// for more hashes after the budget is depleted. Under bad network conditions, the cache of
685    /// hashes pending fetch may become very full for a while. As the node recovers, the hashes
686    /// pending fetch cache should get smaller. The budget should aim to be big enough to loop
687    /// through all buffered hashes in good network conditions.
688    ///
689    /// The request hashes buffer is filled as if it's an eth68 request, i.e. smartly assemble
690    /// the request based on expected response size. For any hash missing size metadata, it is
691    /// guessed at [`AVERAGE_BYTE_SIZE_TX_ENCODED`].
692    ///
693    /// Loops through hashes pending fetch and does:
694    ///
695    /// 1. Check if a hash pending fetch is seen by peer.
696    /// 2. Optimistically include the hash in the request.
697    /// 3. Accumulate expected total response size.
698    /// 4. Check if acc size and hashes count is at limit, if so stop looping.
699    /// 5. Remove hashes to request from cache of hashes pending fetch.
700    pub fn fill_request_from_hashes_pending_fetch(
701        &mut self,
702        hashes_to_request: &mut RequestTxHashes,
703        seen_hashes: &LruCache<TxHash, FbBuildHasher<32>>,
704        mut budget_fill_request: Option<usize>, // check max `budget` lru pending hashes
705    ) {
706        let Some(hash) = hashes_to_request.iter().next() else { return };
707
708        let mut acc_size_response = self
709            .hashes_fetch_inflight_and_pending_fetch
710            .get(hash)
711            .and_then(|entry| entry.tx_encoded_len())
712            .unwrap_or(AVERAGE_BYTE_SIZE_TX_ENCODED);
713
714        // if request full enough already, we're satisfied, send request for single tx
715        if acc_size_response >=
716            DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE_ON_FETCH_PENDING_HASHES
717        {
718            return
719        }
720
721        // try to fill request by checking if any other hashes pending fetch (in lru order) are
722        // also seen by peer
723        for hash in self.hashes_pending_fetch.iter() {
724            // 1. Check if a hash pending fetch is seen by peer.
725            if !seen_hashes.contains(hash) {
726                continue
727            };
728
729            // 2. Optimistically include the hash in the request.
730            hashes_to_request.insert(*hash);
731
732            // 3. Accumulate expected total response size.
733            let size = self
734                .hashes_fetch_inflight_and_pending_fetch
735                .get(hash)
736                .and_then(|entry| entry.tx_encoded_len())
737                .unwrap_or(AVERAGE_BYTE_SIZE_TX_ENCODED);
738
739            acc_size_response = acc_size_response.saturating_add(size);
740
741            // 4. Check if acc size or hashes count is at limit, if so stop looping.
742            // if expected response is full enough or the number of hashes in the request is
743            // enough, we're satisfied
744            if acc_size_response >=
745                DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE_ON_FETCH_PENDING_HASHES ||
746                hashes_to_request.len() >
747                    DEFAULT_SOFT_LIMIT_COUNT_HASHES_IN_GET_POOLED_TRANSACTIONS_REQUEST_ON_FETCH_PENDING_HASHES
748            {
749                break
750            }
751
752            if let Some(ref mut bud) = budget_fill_request {
753                *bud -= 1;
754                if *bud == 0 {
755                    break
756                }
757            }
758        }
759
760        // 5. Remove hashes to request from cache of hashes pending fetch.
761        for hash in hashes_to_request.iter() {
762            self.hashes_pending_fetch.remove(hash);
763        }
764    }
765
766    /// Returns `true` if [`TransactionFetcher`] has capacity to request pending hashes. Returns
767    /// `false` if [`TransactionFetcher`] is operating close to full capacity.
768    pub fn has_capacity_for_fetching_pending_hashes(&self) -> bool {
769        let info = &self.info;
770
771        self.has_capacity(info.max_inflight_requests)
772    }
773
774    /// Returns `true` if the number of inflight requests are under a given tolerated max.
775    fn has_capacity(&self, max_inflight_requests: usize) -> bool {
776        self.inflight_requests.len() <= max_inflight_requests
777    }
778
779    /// Returns the limit to enforce when looking for any pending hash with an idle fallback peer.
780    ///
781    /// Returns `Some(limit)` if [`TransactionFetcher`] and the
782    /// [`TransactionPool`](reth_transaction_pool::TransactionPool) are operating close to full
783    /// capacity. Returns `None`, unlimited, if they are not that busy.
784    pub fn search_breadth_budget_find_idle_fallback_peer(
785        &self,
786        has_capacity_wrt_pending_pool_imports: impl Fn(usize) -> bool,
787    ) -> Option<usize> {
788        let info = &self.info;
789
790        let tx_fetcher_has_capacity = self.has_capacity(
791            info.max_inflight_requests /
792                DEFAULT_DIVISOR_MAX_COUNT_INFLIGHT_REQUESTS_ON_FIND_IDLE_PEER,
793        );
794        let tx_pool_has_capacity = has_capacity_wrt_pending_pool_imports(
795            DEFAULT_DIVISOR_MAX_COUNT_PENDING_POOL_IMPORTS_ON_FIND_IDLE_PEER,
796        );
797
798        if tx_fetcher_has_capacity && tx_pool_has_capacity {
799            // unlimited search breadth
800            None
801        } else {
802            // limited breadth of search for idle peer
803            let limit = DEFAULT_BUDGET_FIND_IDLE_FALLBACK_PEER;
804
805            trace!(target: "net::tx",
806                inflight_requests=self.inflight_requests.len(),
807                max_inflight_transaction_requests=info.max_inflight_requests,
808                hashes_pending_fetch=self.hashes_pending_fetch.len(),
809                limit,
810                "search breadth limited in search for idle fallback peer for some hash pending fetch"
811            );
812
813            Some(limit)
814        }
815    }
816
817    /// Returns the limit to enforce when looking for the intersection between hashes announced by
818    /// peer and hashes pending fetch.
819    ///
820    /// Returns `Some(limit)` if [`TransactionFetcher`] and the
821    /// [`TransactionPool`](reth_transaction_pool::TransactionPool) are operating close to full
822    /// capacity. Returns `None`, unlimited, if they are not that busy.
823    pub fn search_breadth_budget_find_intersection_pending_hashes_and_hashes_seen_by_peer(
824        &self,
825        has_capacity_wrt_pending_pool_imports: impl Fn(usize) -> bool,
826    ) -> Option<usize> {
827        let info = &self.info;
828
829        let tx_fetcher_has_capacity = self.has_capacity(
830            info.max_inflight_requests /
831                DEFAULT_DIVISOR_MAX_COUNT_INFLIGHT_REQUESTS_ON_FIND_INTERSECTION,
832        );
833        let tx_pool_has_capacity = has_capacity_wrt_pending_pool_imports(
834            DEFAULT_DIVISOR_MAX_COUNT_PENDING_POOL_IMPORTS_ON_FIND_INTERSECTION,
835        );
836
837        if tx_fetcher_has_capacity && tx_pool_has_capacity {
838            // unlimited search breadth
839            None
840        } else {
841            // limited breadth of search for idle peer
842            let limit = DEFAULT_BUDGET_FIND_INTERSECTION_ANNOUNCED_BY_PEER_AND_PENDING_FETCH;
843
844            trace!(target: "net::tx",
845                inflight_requests=self.inflight_requests.len(),
846                max_inflight_transaction_requests=self.info.max_inflight_requests,
847                hashes_pending_fetch=self.hashes_pending_fetch.len(),
848                limit=limit,
849                "search breadth limited in search for intersection of hashes announced by peer and hashes pending fetch"
850            );
851
852            Some(limit)
853        }
854    }
855
856    /// Processes a resolved [`GetPooledTransactions`] request. Queues the outcome as a
857    /// [`FetchEvent`], which will then be streamed by
858    /// [`TransactionsManager`](super::TransactionsManager).
859    pub fn on_resolved_get_pooled_transactions_request_fut(
860        &mut self,
861        response: GetPooledTxResponse<N::PooledTransaction>,
862    ) -> FetchEvent<N::PooledTransaction> {
863        // update peer activity, requests for buffered hashes can only be made to idle
864        // fallback peers
865        let GetPooledTxResponse { peer_id, mut requested_hashes, result } = response;
866
867        self.decrement_inflight_request_count_for(&peer_id);
868
869        match result {
870            Ok(Ok(transactions)) => {
871                //
872                // 1. peer has failed to serve any of the hashes it has announced to us that we,
873                // as a follow, have requested
874                //
875                if transactions.is_empty() {
876                    trace!(target: "net::tx",
877                        peer_id=format!("{peer_id:#}"),
878                        requested_hashes_len=requested_hashes.len(),
879                        "received empty `PooledTransactions` response from peer, peer failed to serve hashes it announced"
880                    );
881
882                    return FetchEvent::EmptyResponse { peer_id }
883                }
884
885                //
886                // 2. filter out hashes that we didn't request
887                //
888                let payload = UnverifiedPooledTransactions::new(transactions);
889
890                let unverified_len = payload.len();
891                let (verification_outcome, verified_payload) =
892                    payload.verify(&requested_hashes, &peer_id);
893
894                let unsolicited = unverified_len - verified_payload.len();
895                if unsolicited > 0 {
896                    self.metrics.unsolicited_transactions.increment(unsolicited as u64);
897                }
898
899                let report_peer = if verification_outcome == VerificationOutcome::ReportPeer {
900                    trace!(target: "net::tx",
901                        peer_id=format!("{peer_id:#}"),
902                        unverified_len,
903                        verified_payload_len=verified_payload.len(),
904                        "received `PooledTransactions` response from peer with entries that didn't verify against request, filtered out transactions"
905                    );
906                    true
907                } else {
908                    false
909                };
910
911                // peer has only sent hashes that we didn't request
912                if verified_payload.is_empty() {
913                    return FetchEvent::FetchError { peer_id, error: RequestError::BadResponse }
914                }
915
916                //
917                // 3. stateless validation of payload, e.g. dedup
918                //
919                let unvalidated_payload_len = verified_payload.len();
920
921                let valid_payload = verified_payload.dedup();
922
923                // todo: validate based on announced tx size/type and report peer for sending
924                // invalid response <https://github.com/paradigmxyz/reth/issues/6529>. requires
925                // passing the rlp encoded length down from active session along with the decoded
926                // tx.
927
928                if valid_payload.len() != unvalidated_payload_len {
929                    trace!(target: "net::tx",
930                    peer_id=format!("{peer_id:#}"),
931                    unvalidated_payload_len,
932                    valid_payload_len=valid_payload.len(),
933                    "received `PooledTransactions` response from peer with duplicate entries, filtered them out"
934                    );
935                }
936                // valid payload will have at least one transaction at this point. even if the tx
937                // size/type announced by the peer is different to the actual tx size/type, pass on
938                // to pending pool imports pipeline for validation.
939
940                //
941                // 4. clear received hashes
942                //
943                let requested_hashes_len = requested_hashes.len();
944                let mut fetched = Vec::with_capacity(valid_payload.len());
945                requested_hashes.retain(|requested_hash| {
946                    if valid_payload.contains_key(requested_hash) {
947                        // hash is now known, stop tracking
948                        fetched.push(*requested_hash);
949                        return false
950                    }
951                    true
952                });
953                fetched.shrink_to_fit();
954                self.metrics.fetched_transactions.increment(fetched.len() as u64);
955
956                if fetched.len() < requested_hashes_len {
957                    trace!(target: "net::tx",
958                        peer_id=format!("{peer_id:#}"),
959                        requested_hashes_len=requested_hashes_len,
960                        fetched_len=fetched.len(),
961                        "peer failed to serve hashes it announced"
962                    );
963                }
964
965                //
966                // 5. buffer left over hashes
967                //
968                self.try_buffer_hashes_for_retry(requested_hashes, &peer_id);
969
970                let transactions = valid_payload.into_data().into_values().collect();
971
972                FetchEvent::TransactionsFetched { peer_id, transactions, report_peer }
973            }
974            Ok(Err(req_err)) => {
975                self.try_buffer_hashes_for_retry(requested_hashes, &peer_id);
976                FetchEvent::FetchError { peer_id, error: req_err }
977            }
978            Err(_) => {
979                self.try_buffer_hashes_for_retry(requested_hashes, &peer_id);
980                // request channel closed/dropped
981                FetchEvent::FetchError { peer_id, error: RequestError::ChannelClosed }
982            }
983        }
984    }
985}
986
987impl<N: NetworkPrimitives> Stream for TransactionFetcher<N> {
988    type Item = FetchEvent<N::PooledTransaction>;
989
990    /// Advances all inflight requests and returns the next event.
991    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
992        // `FuturesUnordered` doesn't close when `None` is returned. so just return pending.
993        // <https://play.rust-lang.org/?version=stable&mode=debug&edition=2021&gist=815be2b6c8003303757c3ced135f363e>
994        if self.inflight_requests.is_empty() {
995            return Poll::Pending
996        }
997
998        if let Some(resp) = ready!(self.inflight_requests.poll_next_unpin(cx)) {
999            return Poll::Ready(Some(self.on_resolved_get_pooled_transactions_request_fut(resp)))
1000        }
1001
1002        Poll::Pending
1003    }
1004}
1005
1006impl<T: NetworkPrimitives> Default for TransactionFetcher<T> {
1007    fn default() -> Self {
1008        Self {
1009            active_peers: LruMap::with_hasher(
1010                DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS,
1011                Default::default(),
1012            ),
1013            inflight_requests: Default::default(),
1014            hashes_pending_fetch: LruCache::with_hasher(
1015                DEFAULT_MAX_CAPACITY_CACHE_PENDING_FETCH,
1016                Default::default(),
1017            ),
1018            hashes_fetch_inflight_and_pending_fetch: LruMap::with_hasher(
1019                DEFAULT_MAX_CAPACITY_CACHE_INFLIGHT_AND_PENDING_FETCH,
1020                Default::default(),
1021            ),
1022            info: TransactionFetcherInfo::default(),
1023            metrics: Default::default(),
1024        }
1025    }
1026}
1027
1028/// Metadata of a transaction hash that is yet to be fetched.
1029#[derive(Debug, Constructor)]
1030pub struct TxFetchMetadata {
1031    /// The number of times a request attempt has been made for the hash.
1032    retries: u8,
1033    /// Peers that have announced the hash, but to which a request attempt has not yet been made.
1034    fallback_peers: LruCache<PeerId, FbBuildHasher<64>>,
1035    /// Size metadata of the transaction if it has been seen in an eth68 announcement.
1036    // todo: store all seen sizes as a `(size, peer_id)` tuple to catch peers that respond with
1037    // another size tx than they announced. alt enter in request (won't catch peers announcing
1038    // wrong size for requests assembled from hashes pending fetch if stored in request fut)
1039    tx_encoded_length: Option<usize>,
1040}
1041
1042impl TxFetchMetadata {
1043    /// Returns a mutable reference to the fallback peers cache for this transaction hash.
1044    pub const fn fallback_peers_mut(&mut self) -> &mut LruCache<PeerId, FbBuildHasher<64>> {
1045        &mut self.fallback_peers
1046    }
1047
1048    /// Returns the size of the transaction, if its hash has been received in any
1049    /// [`Eth68`](reth_eth_wire::EthVersion::Eth68) announcement. If the transaction hash has only
1050    /// been seen in [`Eth66`](reth_eth_wire::EthVersion::Eth66) announcements so far, this will
1051    /// return `None`.
1052    pub const fn tx_encoded_len(&self) -> Option<usize> {
1053        self.tx_encoded_length
1054    }
1055}
1056
1057/// Represents possible events from fetching transactions.
1058#[derive(Debug)]
1059pub enum FetchEvent<T = PooledTransaction> {
1060    /// Triggered when transactions are successfully fetched.
1061    TransactionsFetched {
1062        /// The ID of the peer from which transactions were fetched.
1063        peer_id: PeerId,
1064        /// The transactions that were fetched, if available.
1065        transactions: PooledTransactions<T>,
1066        /// Whether the peer should be penalized for sending unsolicited transactions or for
1067        /// misbehavior.
1068        report_peer: bool,
1069    },
1070    /// Triggered when there is an error in fetching transactions.
1071    FetchError {
1072        /// The ID of the peer from which an attempt to fetch transactions resulted in an error.
1073        peer_id: PeerId,
1074        /// The specific error that occurred while fetching.
1075        error: RequestError,
1076    },
1077    /// An empty response was received.
1078    EmptyResponse {
1079        /// The ID of the sender.
1080        peer_id: PeerId,
1081    },
1082}
1083
1084/// An inflight request for [`PooledTransactions`] from a peer.
1085#[derive(Debug)]
1086pub struct GetPooledTxRequest<T = PooledTransaction> {
1087    peer_id: PeerId,
1088    /// Transaction hashes that were requested, for cleanup purposes
1089    requested_hashes: RequestTxHashes,
1090    response: oneshot::Receiver<RequestResult<PooledTransactions<T>>>,
1091}
1092
1093/// Upon reception of a response, a [`GetPooledTxRequest`] is deconstructed to form a
1094/// [`GetPooledTxResponse`].
1095#[derive(Debug)]
1096pub struct GetPooledTxResponse<T = PooledTransaction> {
1097    peer_id: PeerId,
1098    /// Transaction hashes that were requested, for cleanup purposes, since peer may only return a
1099    /// subset of requested hashes.
1100    requested_hashes: RequestTxHashes,
1101    result: Result<RequestResult<PooledTransactions<T>>, RecvError>,
1102}
1103
1104/// Stores the response receiver made by sending a [`GetPooledTransactions`] request to a peer's
1105/// session.
1106#[must_use = "futures do nothing unless polled"]
1107#[pin_project::pin_project]
1108#[derive(Debug)]
1109pub struct GetPooledTxRequestFut<T = PooledTransaction> {
1110    #[pin]
1111    inner: Option<GetPooledTxRequest<T>>,
1112}
1113
1114impl<T> GetPooledTxRequestFut<T> {
1115    #[inline]
1116    const fn new(
1117        peer_id: PeerId,
1118        requested_hashes: RequestTxHashes,
1119        response: oneshot::Receiver<RequestResult<PooledTransactions<T>>>,
1120    ) -> Self {
1121        Self { inner: Some(GetPooledTxRequest { peer_id, requested_hashes, response }) }
1122    }
1123}
1124
1125impl<T> Future for GetPooledTxRequestFut<T> {
1126    type Output = GetPooledTxResponse<T>;
1127
1128    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1129        let mut req = self.as_mut().project().inner.take().expect("polled after completion");
1130        match req.response.poll_unpin(cx) {
1131            Poll::Ready(result) => Poll::Ready(GetPooledTxResponse {
1132                peer_id: req.peer_id,
1133                requested_hashes: req.requested_hashes,
1134                result,
1135            }),
1136            Poll::Pending => {
1137                self.project().inner.set(Some(req));
1138                Poll::Pending
1139            }
1140        }
1141    }
1142}
1143
1144/// Wrapper of unverified [`PooledTransactions`].
1145#[derive(Debug, Constructor, Deref)]
1146pub struct UnverifiedPooledTransactions<T> {
1147    txns: PooledTransactions<T>,
1148}
1149
1150/// [`PooledTransactions`] that have been successfully verified.
1151#[derive(Debug, Constructor, Deref)]
1152pub struct VerifiedPooledTransactions<T> {
1153    txns: PooledTransactions<T>,
1154}
1155
1156impl<T: SignedTransaction> DedupPayload for VerifiedPooledTransactions<T> {
1157    type Value = T;
1158
1159    fn is_empty(&self) -> bool {
1160        self.txns.is_empty()
1161    }
1162
1163    fn len(&self) -> usize {
1164        self.txns.len()
1165    }
1166
1167    fn dedup(self) -> PartiallyValidData<Self::Value> {
1168        PartiallyValidData::from_raw_data(
1169            self.txns.into_iter().map(|tx| (*tx.tx_hash(), tx)).collect(),
1170            None,
1171        )
1172    }
1173}
1174
1175trait VerifyPooledTransactionsResponse {
1176    type Transaction: SignedTransaction;
1177
1178    fn verify(
1179        self,
1180        requested_hashes: &RequestTxHashes,
1181        peer_id: &PeerId,
1182    ) -> (VerificationOutcome, VerifiedPooledTransactions<Self::Transaction>);
1183}
1184
1185impl<T: SignedTransaction> VerifyPooledTransactionsResponse for UnverifiedPooledTransactions<T> {
1186    type Transaction = T;
1187
1188    fn verify(
1189        self,
1190        requested_hashes: &RequestTxHashes,
1191        _peer_id: &PeerId,
1192    ) -> (VerificationOutcome, VerifiedPooledTransactions<T>) {
1193        let mut verification_outcome = VerificationOutcome::Ok;
1194
1195        let Self { mut txns } = self;
1196
1197        #[cfg(debug_assertions)]
1198        let mut tx_hashes_not_requested: smallvec::SmallVec<[TxHash; 16]> = smallvec::smallvec!();
1199        #[cfg(not(debug_assertions))]
1200        let mut tx_hashes_not_requested_count = 0;
1201
1202        txns.0.retain(|tx| {
1203            if !requested_hashes.contains(tx.tx_hash()) {
1204                verification_outcome = VerificationOutcome::ReportPeer;
1205
1206                #[cfg(debug_assertions)]
1207                tx_hashes_not_requested.push(*tx.tx_hash());
1208                #[cfg(not(debug_assertions))]
1209                {
1210                    tx_hashes_not_requested_count += 1;
1211                }
1212
1213                return false
1214            }
1215            true
1216        });
1217
1218        #[cfg(debug_assertions)]
1219        if !tx_hashes_not_requested.is_empty() {
1220            trace!(target: "net::tx",
1221                peer_id=format!("{_peer_id:#}"),
1222                ?tx_hashes_not_requested,
1223                "transactions in `PooledTransactions` response from peer were not requested"
1224            );
1225        }
1226        #[cfg(not(debug_assertions))]
1227        if tx_hashes_not_requested_count != 0 {
1228            trace!(target: "net::tx",
1229                peer_id=format!("{_peer_id:#}"),
1230                tx_hashes_not_requested_count,
1231                "transactions in `PooledTransactions` response from peer were not requested"
1232            );
1233        }
1234
1235        (verification_outcome, VerifiedPooledTransactions::new(txns))
1236    }
1237}
1238
/// Result of verifying a [`PooledTransactions`] response. Tells the caller whether the sender of
/// the response should be penalized.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum VerificationOutcome {
    /// The peer behaved appropriately.
    Ok,
    /// The peer should be flagged for a penalty, its response had unacceptably invalid
    /// entries.
    ReportPeer,
}
1249
1250/// Tracks stats about the [`TransactionFetcher`].
1251#[derive(Debug, Constructor)]
1252pub struct TransactionFetcherInfo {
1253    /// Max inflight [`GetPooledTransactions`] requests.
1254    pub max_inflight_requests: usize,
1255    /// Max inflight [`GetPooledTransactions`] requests per peer.
1256    pub max_inflight_requests_per_peer: u8,
1257    /// Soft limit for the byte size of the expected [`PooledTransactions`] response, upon packing
1258    /// a [`GetPooledTransactions`] request with hashes (by default less than 2 MiB worth of
1259    /// transactions is requested).
1260    pub soft_limit_byte_size_pooled_transactions_response_on_pack_request: usize,
1261    /// Soft limit for the byte size of a [`PooledTransactions`] response, upon assembling the
1262    /// response. Spec'd at 2 MiB, but can be adjusted for research purpose.
1263    pub soft_limit_byte_size_pooled_transactions_response: usize,
1264    /// Max capacity of the cache of transaction hashes, for transactions that weren't yet fetched.
1265    /// A transaction is pending fetch if its hash didn't fit into a [`GetPooledTransactions`] yet,
1266    /// or it wasn't returned upon request to peers.
1267    pub max_capacity_cache_txns_pending_fetch: u32,
1268}
1269
1270impl Default for TransactionFetcherInfo {
1271    fn default() -> Self {
1272        Self::new(
1273            DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS as usize,
1274            DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS_PER_PEER,
1275            DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESP_ON_PACK_GET_POOLED_TRANSACTIONS_REQ,
1276            SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE,
1277            DEFAULT_MAX_CAPACITY_CACHE_PENDING_FETCH,
1278        )
1279    }
1280}
1281
1282impl From<TransactionFetcherConfig> for TransactionFetcherInfo {
1283    fn from(config: TransactionFetcherConfig) -> Self {
1284        let TransactionFetcherConfig {
1285            max_inflight_requests,
1286            max_inflight_requests_per_peer,
1287            soft_limit_byte_size_pooled_transactions_response,
1288            soft_limit_byte_size_pooled_transactions_response_on_pack_request,
1289            max_capacity_cache_txns_pending_fetch,
1290        } = config;
1291
1292        Self::new(
1293            max_inflight_requests as usize,
1294            max_inflight_requests_per_peer,
1295            soft_limit_byte_size_pooled_transactions_response_on_pack_request,
1296            soft_limit_byte_size_pooled_transactions_response,
1297            max_capacity_cache_txns_pending_fetch,
1298        )
1299    }
1300}
1301
/// Pair of durations, both zero by default.
// NOTE(review): presumably the time spent searching for an idle fallback peer and filling a
// request respectively, confirm at the use site.
#[derive(Debug, Default)]
struct TxFetcherSearchDurations {
    find_idle_peer: Duration,
    fill_request: Duration,
}
1307
1308#[cfg(test)]
1309mod test {
1310    use super::*;
1311    use crate::test_utils::transactions::{buffer_hash_to_tx_fetcher, new_mock_session};
1312    use alloy_primitives::{
1313        hex,
1314        map::{B256Map, B256Set, HashMap},
1315        B256,
1316    };
1317    use alloy_rlp::Decodable;
1318    use derive_more::IntoIterator;
1319    use reth_eth_wire_types::EthVersion;
1320    use reth_ethereum_primitives::TransactionSigned;
1321    use std::str::FromStr;
1322
1323    #[derive(IntoIterator)]
1324    struct TestValidAnnouncementData(Vec<(TxHash, Option<(u8, usize)>)>);
1325
1326    impl HandleMempoolData for TestValidAnnouncementData {
1327        fn is_empty(&self) -> bool {
1328            self.0.is_empty()
1329        }
1330
1331        fn len(&self) -> usize {
1332            self.0.len()
1333        }
1334
1335        fn retain_by_hash(&mut self, mut f: impl FnMut(&TxHash) -> bool) {
1336            self.0.retain(|(hash, _)| f(hash))
1337        }
1338    }
1339
1340    impl HandleVersionedMempoolData for TestValidAnnouncementData {
1341        fn msg_version(&self) -> EthVersion {
1342            EthVersion::Eth68
1343        }
1344    }
1345
1346    #[test]
1347    fn pack_eth68_request() {
1348        reth_tracing::init_test_tracing();
1349
1350        // RIG TEST
1351
1352        let tx_fetcher = &mut TransactionFetcher::<EthNetworkPrimitives>::default();
1353
1354        let eth68_hashes = [
1355            B256::from_slice(&[1; 32]),
1356            B256::from_slice(&[2; 32]),
1357            B256::from_slice(&[3; 32]),
1358            B256::from_slice(&[4; 32]),
1359            B256::from_slice(&[5; 32]),
1360        ];
1361        let eth68_sizes = [
1362            DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESP_ON_PACK_GET_POOLED_TRANSACTIONS_REQ - MEDIAN_BYTE_SIZE_SMALL_LEGACY_TX_ENCODED - 1, // first will fit
1363            DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESP_ON_PACK_GET_POOLED_TRANSACTIONS_REQ, // second won't
1364            2, // free space > `MEDIAN_BYTE_SIZE_SMALL_LEGACY_TX_ENCODED`, third will fit, no more after this
1365            9,
1366            0,
1367        ];
1368
1369        let expected_request_hashes =
1370            [eth68_hashes[0], eth68_hashes[2]].into_iter().collect::<B256Set>();
1371
1372        let expected_surplus_hashes =
1373            [eth68_hashes[1], eth68_hashes[3], eth68_hashes[4]].into_iter().collect::<B256Set>();
1374
1375        let mut eth68_hashes_to_request = RequestTxHashes::with_capacity(3);
1376
1377        let valid_announcement_data = TestValidAnnouncementData(
1378            eth68_hashes
1379                .into_iter()
1380                .zip(eth68_sizes)
1381                .map(|(hash, size)| (hash, Some((0u8, size))))
1382                .collect::<Vec<_>>(),
1383        );
1384
1385        // TEST
1386
1387        let surplus_eth68_hashes =
1388            tx_fetcher.pack_request_eth68(&mut eth68_hashes_to_request, valid_announcement_data);
1389
1390        let eth68_hashes_to_request = eth68_hashes_to_request.into_iter().collect::<B256Set>();
1391        let surplus_eth68_hashes = surplus_eth68_hashes.into_iter().collect::<B256Set>();
1392
1393        assert_eq!(expected_request_hashes, eth68_hashes_to_request);
1394        assert_eq!(expected_surplus_hashes, surplus_eth68_hashes);
1395    }
1396
1397    #[test]
1398    fn pack_eth68_request_does_not_overflow_announced_size() {
1399        reth_tracing::init_test_tracing();
1400
1401        let tx_fetcher = &mut TransactionFetcher::<EthNetworkPrimitives>::default();
1402
1403        let eth68_hashes =
1404            [B256::from_slice(&[1; 32]), B256::from_slice(&[2; 32]), B256::from_slice(&[3; 32])];
1405        let eth68_sizes = [
1406            DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESP_ON_PACK_GET_POOLED_TRANSACTIONS_REQ - MEDIAN_BYTE_SIZE_SMALL_LEGACY_TX_ENCODED - 1,
1407            usize::MAX,
1408            2,
1409        ];
1410
1411        let expected_request_hashes =
1412            [eth68_hashes[0], eth68_hashes[2]].into_iter().collect::<B256Set>();
1413        let expected_surplus_hashes = std::iter::once(eth68_hashes[1]).collect::<B256Set>();
1414
1415        let mut eth68_hashes_to_request = RequestTxHashes::with_capacity(3);
1416        let valid_announcement_data = TestValidAnnouncementData(
1417            eth68_hashes
1418                .into_iter()
1419                .zip(eth68_sizes)
1420                .map(|(hash, size)| (hash, Some((0u8, size))))
1421                .collect::<Vec<_>>(),
1422        );
1423
1424        let surplus_eth68_hashes =
1425            tx_fetcher.pack_request_eth68(&mut eth68_hashes_to_request, valid_announcement_data);
1426
1427        let eth68_hashes_to_request = eth68_hashes_to_request.into_iter().collect::<B256Set>();
1428        let surplus_eth68_hashes = surplus_eth68_hashes.into_iter().collect::<B256Set>();
1429
1430        assert_eq!(expected_request_hashes, eth68_hashes_to_request);
1431        assert_eq!(expected_surplus_hashes, surplus_eth68_hashes);
1432    }
1433
/// Announces three hashes whose announced size each equals
/// `DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESP_ON_PACK_GET_POOLED_TRANSACTIONS_REQ`
/// and expects `pack_request` to place one hash in the request and return the other two as
/// surplus.
#[test]
fn pack_eth72_request_uses_metadata_size_limit() {
    reth_tracing::init_test_tracing();

    let fetcher = &mut TransactionFetcher::<EthNetworkPrimitives>::default();

    // every announced tx claims the full soft-limit size in its metadata
    let size_per_tx =
        DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESP_ON_PACK_GET_POOLED_TRANSACTIONS_REQ;
    let raw_announcement = (1u8..=3)
        .map(|byte| (B256::from_slice(&[byte; 32]), Some((0u8, size_per_tx))))
        .collect::<B256Map<_>>();

    // run the raw map through the eth72 constructor before handing it to the fetcher
    let partially_valid = PartiallyValidData::from_raw_data_eth72(raw_announcement);
    let announcement = ValidAnnouncementData::from_partially_valid_data(partially_valid);

    let mut packed = RequestTxHashes::with_capacity(3);
    let surplus = fetcher.pack_request(&mut packed, announcement);

    assert_eq!(1, packed.len());
    assert_eq!(2, surplus.len());
}
1459
1460    #[tokio::test]
1461    async fn test_on_fetch_pending_hashes() {
1462        reth_tracing::init_test_tracing();
1463
1464        let tx_fetcher = &mut TransactionFetcher::default();
1465
1466        // RIG TEST
1467
1468        // hashes that will be fetched because they are stored as pending fetch
1469        let seen_hashes = [
1470            B256::from_slice(&[1; 32]),
1471            B256::from_slice(&[2; 32]),
1472            B256::from_slice(&[3; 32]),
1473            B256::from_slice(&[4; 32]),
1474        ];
1475        //
1476        // txns 1-3 are small, all will fit in request. no metadata has been made available for
1477        // hash 4, it has only been seen over eth66 conn, so average tx size will be assumed in
1478        // filling request.
1479        let seen_eth68_hashes_sizes = [120, 158, 116];
1480
1481        // peer that will fetch seen hashes because they are pending fetch
1482        let peer_1 = PeerId::new([1; 64]);
1483        // second peer, won't do anything in this test
1484        let peer_2 = PeerId::new([2; 64]);
1485
1486        // add seen hashes to peers seen transactions
1487        //
1488        // get handle for peer_1's session to receive request for pending hashes
1489        let (mut peer_1_data, mut peer_1_mock_session_rx) =
1490            new_mock_session(peer_1, EthVersion::Eth66);
1491        for hash in &seen_hashes {
1492            peer_1_data.seen_transactions.insert(*hash);
1493        }
1494        let (mut peer_2_data, _) = new_mock_session(peer_2, EthVersion::Eth66);
1495        for hash in &seen_hashes {
1496            peer_2_data.seen_transactions.insert(*hash);
1497        }
1498        let mut peers = HashMap::default();
1499        peers.insert(peer_1, peer_1_data);
1500        peers.insert(peer_2, peer_2_data);
1501
1502        // insert seen_hashes into tx fetcher
1503        for i in 0..3 {
1504            // insert peer_2 as fallback peer for seen_hashes
1505            buffer_hash_to_tx_fetcher(
1506                tx_fetcher,
1507                seen_hashes[i],
1508                peer_2,
1509                0,
1510                Some(seen_eth68_hashes_sizes[i]),
1511            );
1512        }
1513        buffer_hash_to_tx_fetcher(tx_fetcher, seen_hashes[3], peer_2, 0, None);
1514
1515        // insert pending hash without peer_1 as fallback peer, only with peer_2 as fallback peer
1516        let hash_other = B256::from_slice(&[5; 32]);
1517        buffer_hash_to_tx_fetcher(tx_fetcher, hash_other, peer_2, 0, None);
1518
1519        // add peer_1 as lru fallback peer for seen hashes
1520        for hash in &seen_hashes {
1521            buffer_hash_to_tx_fetcher(tx_fetcher, *hash, peer_1, 0, None);
1522        }
1523
1524        // seen hashes and the random hash from peer_2 are pending fetch
1525        assert_eq!(tx_fetcher.num_pending_hashes(), 5);
1526
1527        // TEST
1528
1529        tx_fetcher.on_fetch_pending_hashes(&peers, |_| true);
1530
1531        // mock session of peer_1 receives request
1532        let req = peer_1_mock_session_rx
1533            .recv()
1534            .await
1535            .expect("peer session should receive request with buffered hashes");
1536        let PeerRequest::GetPooledTransactions { request, .. } = req else { unreachable!() };
1537        let GetPooledTransactions(requested_hashes) = request;
1538
1539        assert_eq!(
1540            requested_hashes.into_iter().collect::<B256Set>(),
1541            seen_hashes.into_iter().collect::<B256Set>()
1542        )
1543    }
1544
/// Buffers a single hash with two fallback peers, then calls `on_fetch_pending_hashes` with an
/// empty peers map and checks that the pending-hash count is still one afterwards.
#[test]
fn on_fetch_pending_hashes_rebuffers_on_disconnected_peer() {
    let fetcher = &mut TransactionFetcher::default();
    let first_peer = PeerId::new([1; 64]);
    let second_peer = PeerId::new([2; 64]);
    let pending_hash = B256::from_slice(&[1; 32]);

    // same hash, same size, buffered once per fallback peer
    for peer in [first_peer, second_peer] {
        buffer_hash_to_tx_fetcher(fetcher, pending_hash, peer, 0, Some(128));
    }

    assert_eq!(fetcher.num_pending_hashes(), 1);

    // an empty peers map means neither fallback peer can be looked up, i.e. both count as
    // "disconnected"
    let no_peers = HashMap::new();
    fetcher.on_fetch_pending_hashes(&no_peers, |_| true);

    // the hash must still be pending rather than dropped
    assert_eq!(fetcher.num_pending_hashes(), 1);
}
1564
/// Builds a response holding two transactions of which only the first was requested, and checks
/// that `verify` yields `VerificationOutcome::ReportPeer` together with a payload that contains
/// just the requested transaction.
#[test]
fn verify_response_hashes() {
    // decodes raw bytes into a signed tx and converts it to its pooled form
    let decode_pooled = |mut raw: &[u8]| -> PooledTransaction {
        TransactionSigned::decode(&mut raw).unwrap().try_into().unwrap()
    };

    let raw_tx_1 = hex!(
        "02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598daa"
    );
    let raw_tx_2 = hex!(
        "02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76"
    );
    let tx_1 = decode_pooled(&raw_tx_1[..]);
    let tx_2 = decode_pooled(&raw_tx_2[..]);

    // the request holds tx 1's hash next to three other literal hashes; tx 2 is not requested
    let requested = [
        B256::from_str("0x3b9aca00f0671c9a2a1b817a0a78d3fe0c0f776cccb2a8c3c1b412a4f4e67890")
            .unwrap(),
        *tx_1.hash(),
        B256::from_str("0x3b9aca00f0671c9a2a1b817a0a78d3fe0c0f776cccb2a8c3c1b412a4f4e12345")
            .unwrap(),
        B256::from_str("0x3b9aca00f0671c9a2a1b817a0a78d3fe0c0f776cccb2a8c3c1b412a4f4edabe3")
            .unwrap(),
    ];

    // sanity check: none of the requested hashes is tx 2's hash
    requested.iter().for_each(|hash| assert_ne!(hash, tx_2.hash()));

    let requested = RequestTxHashes::new(requested.into_iter().collect());

    // the response nevertheless carries tx 1 plus the unrequested tx 2
    let (outcome, verified) =
        UnverifiedPooledTransactions::new(PooledTransactions(vec![tx_1.clone(), tx_2]))
            .verify(&requested, &PeerId::ZERO);

    assert_eq!(VerificationOutcome::ReportPeer, outcome);
    assert_eq!(1, verified.len());
    assert!(verified.contains(&tx_1));
}
1605}