Skip to main content

reth_network/transactions/
fetcher.rs

1//! `TransactionFetcher` is responsible for rate limiting and retry logic for fetching
2//! transactions. Upon receiving an announcement, functionality of the `TransactionFetcher` is
3//! used for filtering out hashes 1) for which the tx is already known and 2) unknown but the hash
4//! is already seen in a previous announcement. The hashes that remain from an announcement are
5//! then packed into a request with respect to the [`EthVersion`] of the announcement. Any hashes
6//! that don't fit into the request, are buffered in the `TransactionFetcher`. If on the other
7//! hand, space remains, hashes that the peer has previously announced are taken out of buffered
8//! hashes to fill the request up. The [`GetPooledTransactions`] request is then sent to the
9//! peer's session, this marks the peer as active with respect to
10//! `MAX_CONCURRENT_TX_REQUESTS_PER_PEER`.
11//!
12//! When a peer buffers hashes in the `TransactionsManager::on_new_pooled_transaction_hashes`
13//! pipeline, it is stored as fallback peer for those hashes. When [`TransactionsManager`] is
//! polled, it checks if any fallback peer is idle. If so, it packs a request for that peer,
15//! filling it from the buffered hashes. It does so until there are no more idle peers or until
16//! the hashes buffer is empty.
17//!
18//! If a [`GetPooledTransactions`] request resolves with an error, the hashes in the request are
19//! buffered with respect to `MAX_REQUEST_RETRIES_PER_TX_HASH`. So is the case if the request
20//! resolves with partial success, that is some of the requested hashes are not in the response,
21//! these are then buffered.
22//!
//! Most healthy peers will send the same hashes in their announcements, as RLPx is a gossip
//! protocol. This means it's unlikely that a valid hash will be buffered for very long
25//! before it's re-tried. Nonetheless, the capacity of the buffered hashes cache must be large
26//! enough to buffer many hashes during network failure, to allow for recovery.
27
28use super::{
29    config::TransactionFetcherConfig,
30    constants::{tx_fetcher::*, SOFT_LIMIT_COUNT_HASHES_IN_GET_POOLED_TRANSACTIONS_REQUEST},
31    PeerMetadata, PooledTransactions, SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE,
32};
33use crate::{
34    cache::{LruCache, LruMap},
35    duration_metered_exec,
36    metrics::TransactionFetcherMetrics,
37};
38use alloy_consensus::transaction::PooledTransaction;
39use alloy_primitives::TxHash;
40use derive_more::{Constructor, Deref};
41use futures::{stream::FuturesUnordered, Future, FutureExt, Stream, StreamExt};
42use pin_project::pin_project;
43use reth_eth_wire::{
44    DedupPayload, GetPooledTransactions, HandleMempoolData, HandleVersionedMempoolData,
45    PartiallyValidData, RequestTxHashes, ValidAnnouncementData,
46};
47use reth_eth_wire_types::{EthNetworkPrimitives, NetworkPrimitives};
48use reth_network_api::PeerRequest;
49use reth_network_p2p::error::{RequestError, RequestResult};
50use reth_network_peers::PeerId;
51use reth_primitives_traits::SignedTransaction;
52use schnellru::ByLength;
53use std::{
54    collections::HashMap,
55    pin::Pin,
56    task::{ready, Context, Poll},
57    time::Duration,
58};
59use tokio::sync::{mpsc::error::TrySendError, oneshot, oneshot::error::RecvError};
60use tracing::trace;
61
/// The type responsible for fetching missing transactions from peers.
///
/// This will keep track of unique transaction hashes that are currently being fetched and submits
/// new requests on announced hashes.
#[derive(Debug)]
#[pin_project]
pub struct TransactionFetcher<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// All peers to which a [`GetPooledTransactions`] request is inflight, mapped to the
    /// number of requests currently inflight to that peer.
    pub active_peers: LruMap<PeerId, u8, ByLength>,
    /// All currently active [`GetPooledTransactions`] requests.
    ///
    /// The set of hashes encompassed by these requests are a subset of all hashes in the fetcher.
    /// It's disjoint from the set of hashes which are awaiting an idle fallback peer in order to
    /// be fetched.
    #[pin]
    pub inflight_requests: FuturesUnordered<GetPooledTxRequestFut<N::PooledTransaction>>,
    /// Hashes that are awaiting an idle fallback peer so they can be fetched.
    ///
    /// This is a subset of all hashes in the fetcher, and is disjoint from the set of hashes for
    /// which a [`GetPooledTransactions`] request is inflight.
    pub hashes_pending_fetch: LruCache<TxHash>,
    /// Tracks all hashes in the transaction fetcher.
    pub hashes_fetch_inflight_and_pending_fetch: LruMap<TxHash, TxFetchMetadata, ByLength>,
    /// Info on capacity of the transaction fetcher.
    pub info: TransactionFetcherInfo,
    /// Metrics gauges for request/hash bookkeeping.
    #[doc(hidden)]
    metrics: TransactionFetcherMetrics,
}
90
91impl<N: NetworkPrimitives> TransactionFetcher<N> {
92    /// Removes the peer from the active set.
93    pub(crate) fn remove_peer(&mut self, peer_id: &PeerId) {
94        self.active_peers.remove(peer_id);
95    }
96
97    /// Updates metrics.
98    #[inline]
99    pub fn update_metrics(&self) {
100        let metrics = &self.metrics;
101
102        metrics.inflight_transaction_requests.set(self.inflight_requests.len() as f64);
103
104        let hashes_pending_fetch = self.hashes_pending_fetch.len() as f64;
105        let total_hashes = self.hashes_fetch_inflight_and_pending_fetch.len() as f64;
106
107        metrics.hashes_pending_fetch.set(hashes_pending_fetch);
108        metrics.hashes_inflight_transaction_requests.set(total_hashes - hashes_pending_fetch);
109    }
110
111    #[inline]
112    fn update_pending_fetch_cache_search_metrics(&self, durations: TxFetcherSearchDurations) {
113        let metrics = &self.metrics;
114
115        let TxFetcherSearchDurations { find_idle_peer, fill_request } = durations;
116        metrics
117            .duration_find_idle_fallback_peer_for_any_pending_hash
118            .set(find_idle_peer.as_secs_f64());
119        metrics.duration_fill_request_from_hashes_pending_fetch.set(fill_request.as_secs_f64());
120    }
121
122    /// Sets up transaction fetcher with config
123    pub fn with_transaction_fetcher_config(config: &TransactionFetcherConfig) -> Self {
124        let TransactionFetcherConfig {
125            max_inflight_requests,
126            max_capacity_cache_txns_pending_fetch,
127            ..
128        } = *config;
129
130        let info = config.clone().into();
131
132        let metrics = TransactionFetcherMetrics::default();
133        metrics.capacity_inflight_requests.increment(max_inflight_requests as u64);
134
135        Self {
136            active_peers: LruMap::new(max_inflight_requests),
137            hashes_pending_fetch: LruCache::new(max_capacity_cache_txns_pending_fetch),
138            hashes_fetch_inflight_and_pending_fetch: LruMap::new(
139                max_inflight_requests + max_capacity_cache_txns_pending_fetch,
140            ),
141            info,
142            metrics,
143            ..Default::default()
144        }
145    }
146
147    /// Removes the specified hashes from inflight tracking.
148    #[inline]
149    pub fn remove_hashes_from_transaction_fetcher<'a, I>(&mut self, hashes: I)
150    where
151        I: IntoIterator<Item = &'a TxHash>,
152    {
153        for hash in hashes {
154            self.hashes_fetch_inflight_and_pending_fetch.remove(hash);
155            self.hashes_pending_fetch.remove(hash);
156        }
157    }
158
159    /// Updates peer's activity status upon a resolved [`GetPooledTxRequest`].
160    fn decrement_inflight_request_count_for(&mut self, peer_id: &PeerId) {
161        let remove = || -> bool {
162            if let Some(inflight_count) = self.active_peers.get(peer_id) {
163                *inflight_count = inflight_count.saturating_sub(1);
164                if *inflight_count == 0 {
165                    return true
166                }
167            }
168            false
169        }();
170
171        if remove {
172            self.active_peers.remove(peer_id);
173        }
174    }
175
176    /// Returns `true` if peer is idle with respect to `self.inflight_requests`.
177    #[inline]
178    pub fn is_idle(&self, peer_id: &PeerId) -> bool {
179        let Some(inflight_count) = self.active_peers.peek(peer_id) else { return true };
180        if *inflight_count < self.info.max_inflight_requests_per_peer {
181            return true
182        }
183        false
184    }
185
186    /// Returns any idle peer for the given hash.
187    pub fn get_idle_peer_for(&self, hash: TxHash) -> Option<&PeerId> {
188        let TxFetchMetadata { fallback_peers, .. } =
189            self.hashes_fetch_inflight_and_pending_fetch.peek(&hash)?;
190
191        fallback_peers.iter().find(|peer_id| self.is_idle(peer_id))
192    }
193
    /// Returns any idle peer for any hash pending fetch. If one is found, the corresponding
    /// hash is written to the request buffer that is passed as parameter.
    ///
    /// Loops through the hashes pending fetch in lru order until one is found with an idle
    /// fallback peer, or the budget passed as parameter is depleted, whatever happens first.
    /// A `None` budget means the search is unbounded.
    pub fn find_any_idle_fallback_peer_for_any_pending_hash(
        &mut self,
        hashes_to_request: &mut RequestTxHashes,
        mut budget: Option<usize>, // search fallback peers for max `budget` lru pending hashes
    ) -> Option<PeerId> {
        let mut hashes_pending_fetch_iter = self.hashes_pending_fetch.iter();

        let idle_peer = loop {
            // exhausted all pending hashes without finding an idle fallback peer
            let &hash = hashes_pending_fetch_iter.next()?;

            let idle_peer = self.get_idle_peer_for(hash);

            if idle_peer.is_some() {
                // stash the hash in the request buffer; it is removed from the pending-fetch
                // cache below, after the iterator borrow has been released
                hashes_to_request.insert(hash);
                break idle_peer.copied()
            }

            // a depleted budget aborts the search without a result
            if let Some(ref mut bud) = budget {
                *bud = bud.saturating_sub(1);
                if *bud == 0 {
                    return None
                }
            }
        };
        // NOTE(review): reads the first hash in the request buffer, which assumes the buffer was
        // empty on entry — confirm callers always pass a fresh `RequestTxHashes`
        let hash = hashes_to_request.iter().next()?;

        // pop hash that is loaded in request buffer from cache of hashes pending fetch
        drop(hashes_pending_fetch_iter);
        _ = self.hashes_pending_fetch.remove(hash);

        idle_peer
    }
231
232    /// Packages hashes for a [`GetPooledTxRequest`] up to limit. Returns left over hashes. Takes
233    /// a [`RequestTxHashes`] buffer as parameter for filling with hashes to request.
234    ///
235    /// Returns left over hashes.
236    pub fn pack_request(
237        &self,
238        hashes_to_request: &mut RequestTxHashes,
239        hashes_from_announcement: ValidAnnouncementData,
240    ) -> RequestTxHashes {
241        if hashes_from_announcement.msg_version().is_eth68() {
242            return self.pack_request_eth68(hashes_to_request, hashes_from_announcement)
243        }
244        self.pack_request_eth66(hashes_to_request, hashes_from_announcement)
245    }
246
    /// Packages hashes for a [`GetPooledTxRequest`] from an
    /// [`Eth68`](reth_eth_wire::EthVersion::Eth68) announcement up to limit as defined by protocol
    /// version 68. Takes a [`RequestTxHashes`] buffer as parameter for filling with hashes to
    /// request.
    ///
    /// Returns left over hashes.
    ///
    /// Loops through hashes passed as parameter and checks if a hash fits in the expected
    /// response. If no, it's added to surplus hashes. If yes, it's added to hashes to the request
    /// and expected response size is accumulated.
    pub fn pack_request_eth68(
        &self,
        hashes_to_request: &mut RequestTxHashes,
        hashes_from_announcement: impl HandleMempoolData
            + IntoIterator<Item = (TxHash, Option<(u8, usize)>)>,
    ) -> RequestTxHashes {
        // running total of the expected byte size of the response
        let mut acc_size_response = 0;

        let mut hashes_from_announcement_iter = hashes_from_announcement.into_iter();

        // seed the request with the first announced hash (eth68 entries carry size metadata,
        // see the `unreachable!` in the loop below)
        if let Some((hash, Some((_ty, size)))) = hashes_from_announcement_iter.next() {
            hashes_to_request.insert(hash);

            // tx is really big, pack request with single tx
            if size >= self.info.soft_limit_byte_size_pooled_transactions_response_on_pack_request {
                return hashes_from_announcement_iter.collect()
            }
            acc_size_response = size;
        }

        let mut surplus_hashes = RequestTxHashes::default();

        // folds size based on expected response size  and adds selected hashes to the request
        // list and the other hashes to the surplus list
        for (hash, metadata) in hashes_from_announcement_iter.by_ref() {
            let Some((_ty, size)) = metadata else {
                unreachable!("this method is called upon reception of an eth68 announcement")
            };

            // `None` if including this tx would overflow or exceed the soft limit
            let next_acc_size = acc_size_response.checked_add(size).filter(|next_acc_size| {
                *next_acc_size <=
                    self.info.soft_limit_byte_size_pooled_transactions_response_on_pack_request
            });

            if let Some(next_acc_size) = next_acc_size {
                // only update accumulated size of tx response if tx will fit in without exceeding
                // soft limit
                acc_size_response = next_acc_size;
                _ = hashes_to_request.insert(hash)
            } else {
                _ = surplus_hashes.insert(hash)
            }

            let free_space =
                self.info.soft_limit_byte_size_pooled_transactions_response_on_pack_request -
                    acc_size_response;

            // stop early once even a small legacy tx is unlikely to fit in the remaining space
            if free_space < MEDIAN_BYTE_SIZE_SMALL_LEGACY_TX_ENCODED {
                break
            }
        }

        // any hashes not consumed by the loop are left over for a later request
        surplus_hashes.extend(hashes_from_announcement_iter.map(|(hash, _metadata)| hash));

        surplus_hashes
    }
313
314    /// Packages hashes for a [`GetPooledTxRequest`] from an
315    /// [`Eth66`](reth_eth_wire::EthVersion::Eth66) announcement up to limit as defined by
316    /// protocol version 66. Takes a [`RequestTxHashes`] buffer as parameter for filling with
317    /// hashes to request.
318    ///
319    /// Returns left over hashes.
320    pub fn pack_request_eth66(
321        &self,
322        hashes_to_request: &mut RequestTxHashes,
323        hashes_from_announcement: ValidAnnouncementData,
324    ) -> RequestTxHashes {
325        let (mut hashes, _version) = hashes_from_announcement.into_request_hashes();
326        if hashes.len() <= SOFT_LIMIT_COUNT_HASHES_IN_GET_POOLED_TRANSACTIONS_REQUEST {
327            *hashes_to_request = hashes;
328            hashes_to_request.shrink_to_fit();
329
330            RequestTxHashes::default()
331        } else {
332            let surplus_hashes =
333                hashes.retain_count(SOFT_LIMIT_COUNT_HASHES_IN_GET_POOLED_TRANSACTIONS_REQUEST);
334            *hashes_to_request = hashes;
335            hashes_to_request.shrink_to_fit();
336
337            surplus_hashes
338        }
339    }
340
341    /// Tries to buffer hashes for retry.
342    pub fn try_buffer_hashes_for_retry(
343        &mut self,
344        mut hashes: RequestTxHashes,
345        peer_failed_to_serve: &PeerId,
346    ) {
347        // It could be that the txns have been received over broadcast in the time being. Remove
348        // the peer as fallback peer so it isn't request again for these hashes.
349        hashes.retain(|hash| {
350            if let Some(entry) = self.hashes_fetch_inflight_and_pending_fetch.get(hash) {
351                entry.fallback_peers_mut().remove(peer_failed_to_serve);
352                return true
353            }
354            // tx has been seen over broadcast in the time it took for the request to resolve
355            false
356        });
357
358        self.buffer_hashes(hashes, None)
359    }
360
    /// Number of hashes pending fetch, i.e. hashes awaiting an idle fallback peer.
    pub fn num_pending_hashes(&self) -> usize {
        self.hashes_pending_fetch.len()
    }
365
    /// Number of all transaction hashes in the fetcher, both inflight and pending fetch.
    pub fn num_all_hashes(&self) -> usize {
        self.hashes_fetch_inflight_and_pending_fetch.len()
    }
370
371    /// Buffers hashes. Note: Only peers that haven't yet tried to request the hashes should be
372    /// passed as `fallback_peer` parameter! For re-buffering hashes on failed request, use
373    /// [`TransactionFetcher::try_buffer_hashes_for_retry`]. Hashes that have been re-requested
374    /// [`DEFAULT_MAX_RETRIES`], are dropped.
375    pub fn buffer_hashes(&mut self, hashes: RequestTxHashes, fallback_peer: Option<PeerId>) {
376        for hash in hashes {
377            // hash could have been evicted from bounded lru map
378            if self.hashes_fetch_inflight_and_pending_fetch.peek(&hash).is_none() {
379                continue
380            }
381
382            let Some(TxFetchMetadata { retries, fallback_peers, .. }) =
383                self.hashes_fetch_inflight_and_pending_fetch.get(&hash)
384            else {
385                continue
386            };
387
388            if let Some(peer_id) = fallback_peer {
389                // peer has not yet requested hash
390                fallback_peers.insert(peer_id);
391            } else {
392                if *retries >= DEFAULT_MAX_RETRIES {
393                    trace!(target: "net::tx",
394                        %hash,
395                        retries,
396                        "retry limit for `GetPooledTransactions` requests reached for hash, dropping hash"
397                    );
398
399                    self.hashes_fetch_inflight_and_pending_fetch.remove(&hash);
400                    self.hashes_pending_fetch.remove(&hash);
401                    continue
402                }
403                *retries += 1;
404            }
405
406            if let (_, Some(evicted_hash)) = self.hashes_pending_fetch.insert_and_get_evicted(hash)
407            {
408                self.hashes_fetch_inflight_and_pending_fetch.remove(&evicted_hash);
409            }
410        }
411    }
412
    /// Tries to request hashes pending fetch.
    ///
    /// Finds the first buffered hash with a fallback peer that is idle, if any. Fills the rest of
    /// the request by checking the transactions seen by the peer against the buffer.
    ///
    /// Returns `true` if a request was successfully handed to a peer's session, `false` if no
    /// idle peer was found, the peer disconnected, or the send to the session failed.
    pub fn on_fetch_pending_hashes(
        &mut self,
        peers: &HashMap<PeerId, PeerMetadata<N>>,
        has_capacity_wrt_pending_pool_imports: impl Fn(usize) -> bool,
    ) -> bool {
        let mut hashes_to_request = RequestTxHashes::with_capacity(
            DEFAULT_MARGINAL_COUNT_HASHES_GET_POOLED_TRANSACTIONS_REQUEST,
        );
        let mut search_durations = TxFetcherSearchDurations::default();

        // budget to look for an idle peer before giving up
        let budget_find_idle_fallback_peer = self
            .search_breadth_budget_find_idle_fallback_peer(&has_capacity_wrt_pending_pool_imports);

        // timed search for an idle fallback peer; seeds `hashes_to_request` with one hash
        let peer_id = duration_metered_exec!(
            {
                let Some(peer_id) = self.find_any_idle_fallback_peer_for_any_pending_hash(
                    &mut hashes_to_request,
                    budget_find_idle_fallback_peer,
                ) else {
                    // no peers are idle or budget is depleted
                    return false
                };

                peer_id
            },
            search_durations.find_idle_peer
        );

        // peer may have disconnected between idle check and here, re-buffer hashes so they
        // aren't lost from the pending fetch cache
        let Some(peer) = peers.get(&peer_id) else {
            self.buffer_hashes(hashes_to_request, None);
            return false
        };
        let conn_eth_version = peer.version;

        // fill the request with more hashes pending fetch that have been announced by the peer.
        // the search for more hashes is done with respect to the given budget, which determines
        // how many hashes to loop through before giving up. if no more hashes are found wrt to
        // the budget, the single hash that was taken out of the cache above is sent in a request.
        let budget_fill_request = self
            .search_breadth_budget_find_intersection_pending_hashes_and_hashes_seen_by_peer(
                &has_capacity_wrt_pending_pool_imports,
            );

        // timed fill of the remainder of the request from the pending-fetch cache
        duration_metered_exec!(
            {
                self.fill_request_from_hashes_pending_fetch(
                    &mut hashes_to_request,
                    &peer.seen_transactions,
                    budget_fill_request,
                )
            },
            search_durations.fill_request
        );

        self.update_pending_fetch_cache_search_metrics(search_durations);

        trace!(target: "net::tx",
            peer_id=format!("{peer_id:#}"),
            hashes=?*hashes_to_request,
            %conn_eth_version,
            "requesting hashes that were stored pending fetch from peer"
        );

        // request the buffered missing transactions
        if let Some(failed_to_request_hashes) =
            self.request_transactions_from_peer(hashes_to_request, peer)
        {
            trace!(target: "net::tx",
                peer_id=format!("{peer_id:#}"),
                ?failed_to_request_hashes,
                %conn_eth_version,
                "failed sending request to peer's session, buffering hashes"
            );

            self.buffer_hashes(failed_to_request_hashes, Some(peer_id));
            return false
        }

        true
    }
500
    /// Filters out hashes that have been seen before. For hashes that have already been seen, the
    /// peer is added as fallback peer.
    ///
    /// Hashes flagged by the `is_tx_bad_import` predicate are dropped from the announcement.
    /// `peer_id` and `client_version` are only used for trace logging.
    pub fn filter_unseen_and_pending_hashes(
        &mut self,
        new_announced_hashes: &mut ValidAnnouncementData,
        is_tx_bad_import: impl Fn(&TxHash) -> bool,
        peer_id: &PeerId,
        client_version: &str,
    ) {
        let mut previously_unseen_hashes_count = 0;

        let msg_version = new_announced_hashes.msg_version();

        // filter out inflight hashes, and register the peer as fallback for all inflight hashes
        new_announced_hashes.retain(|hash, metadata| {

            // occupied entry: the hash is already tracked by the fetcher
            if let Some(TxFetchMetadata{ tx_encoded_length: previously_seen_size, ..}) = self.hashes_fetch_inflight_and_pending_fetch.peek_mut(hash) {
                // update size metadata if available
                if let Some((_ty, size)) = metadata {
                    if let Some(prev_size) = previously_seen_size {
                        // check if this peer is announcing a different size than a previous peer
                        if size != prev_size {
                            trace!(target: "net::tx",
                                peer_id=format!("{peer_id:#}"),
                                %hash,
                                size,
                                previously_seen_size,
                                %client_version,
                                "peer announced a different size for tx, this is especially worrying if one size is much bigger..."
                            );
                        }
                    }
                    // believe the most recent peer to announce tx
                    *previously_seen_size = Some(*size);
                }

                // hash has been seen but is not inflight: keep it so it can be requested again
                if self.hashes_pending_fetch.remove(hash) {
                    return true
                }

                // hash is inflight, drop it from this announcement
                return false
            }

            // vacant entry: the hash is new to the fetcher

            // skip hashes flagged by the caller's bad-import predicate
            if is_tx_bad_import(hash) {
                return false
            }

            previously_unseen_hashes_count += 1;

            // start tracking the hash; if the lru map refuses the insert, drop the hash
            if self.hashes_fetch_inflight_and_pending_fetch.get_or_insert(*hash, ||
                TxFetchMetadata{retries: 0, fallback_peers: LruCache::new(DEFAULT_MAX_COUNT_FALLBACK_PEERS as u32), tx_encoded_length: None}
            ).is_none() {

                trace!(target: "net::tx",
                    peer_id=format!("{peer_id:#}"),
                    %hash,
                    ?msg_version,
                    %client_version,
                    "failed to cache new announced hash from peer in schnellru::LruMap, dropping hash"
                );

                return false
            }
            true
        });

        trace!(target: "net::tx",
            peer_id=format!("{peer_id:#}"),
            previously_unseen_hashes_count=previously_unseen_hashes_count,
            msg_version=?msg_version,
            client_version=%client_version,
            "received previously unseen hashes in announcement from peer"
        );
    }
579
580    /// Requests the missing transactions from the previously unseen announced hashes of the peer.
581    /// Returns the requested hashes if the request concurrency limit is reached or if the request
582    /// fails to send over the channel to the peer's session task.
583    ///
584    /// This filters all announced hashes that are already in flight, and requests the missing,
585    /// while marking the given peer as an alternative peer for the hashes that are already in
586    /// flight.
587    pub fn request_transactions_from_peer(
588        &mut self,
589        new_announced_hashes: RequestTxHashes,
590        peer: &PeerMetadata<N>,
591    ) -> Option<RequestTxHashes> {
592        let peer_id: PeerId = peer.request_tx.peer_id;
593        let conn_eth_version = peer.version;
594
595        if self.active_peers.len() >= self.info.max_inflight_requests {
596            trace!(target: "net::tx",
597                peer_id=format!("{peer_id:#}"),
598                hashes=?*new_announced_hashes,
599                %conn_eth_version,
600                max_inflight_transaction_requests=self.info.max_inflight_requests,
601                "limit for concurrent `GetPooledTransactions` requests reached, dropping request for hashes to peer"
602            );
603            return Some(new_announced_hashes)
604        }
605
606        let Some(inflight_count) = self.active_peers.get_or_insert(peer_id, || 0) else {
607            trace!(target: "net::tx",
608                peer_id=format!("{peer_id:#}"),
609                hashes=?*new_announced_hashes,
610                conn_eth_version=%conn_eth_version,
611                "failed to cache active peer in schnellru::LruMap, dropping request to peer"
612            );
613            return Some(new_announced_hashes)
614        };
615
616        if *inflight_count >= self.info.max_inflight_requests_per_peer {
617            trace!(target: "net::tx",
618                peer_id=format!("{peer_id:#}"),
619                hashes=?*new_announced_hashes,
620                %conn_eth_version,
621                max_concurrent_tx_reqs_per_peer=self.info.max_inflight_requests_per_peer,
622                "limit for concurrent `GetPooledTransactions` requests per peer reached"
623            );
624            return Some(new_announced_hashes)
625        }
626
627        #[cfg(debug_assertions)]
628        {
629            for hash in &new_announced_hashes {
630                if self.hashes_pending_fetch.contains(hash) {
631                    tracing::debug!(target: "net::tx", "`{}` should have been taken out of buffer before packing in a request, breaks invariant `@hashes_pending_fetch` and `@inflight_requests`, `@hashes_fetch_inflight_and_pending_fetch` for `{}`: {:?}",
632                        format!("{:?}", new_announced_hashes), // Assuming new_announced_hashes can be debug-printed directly
633                        format!("{:?}", new_announced_hashes),
634                        new_announced_hashes.iter().map(|hash| {
635                            let metadata = self.hashes_fetch_inflight_and_pending_fetch.get(hash);
636                            // Assuming you only need `retries` and `tx_encoded_length` for debugging
637                            (*hash, metadata.map(|m| (m.retries, m.tx_encoded_length)))
638                        }).collect::<Vec<(TxHash, Option<(u8, Option<usize>)>)>>())
639                }
640            }
641        }
642
643        let (response, rx) = oneshot::channel();
644        let req = PeerRequest::GetPooledTransactions {
645            request: GetPooledTransactions(new_announced_hashes.iter().copied().collect()),
646            response,
647        };
648
649        // try to send the request to the peer
650        if let Err(err) = peer.request_tx.try_send(req) {
651            // peer channel is full
652            return match err {
653                TrySendError::Full(_) | TrySendError::Closed(_) => {
654                    self.metrics.egress_peer_channel_full.increment(1);
655                    Some(new_announced_hashes)
656                }
657            }
658        }
659
660        *inflight_count += 1;
661        // stores a new request future for the request
662        self.inflight_requests.push(GetPooledTxRequestFut::new(peer_id, new_announced_hashes, rx));
663
664        None
665    }
666
667    /// Tries to fill request with hashes pending fetch so that the expected [`PooledTransactions`]
668    /// response is full enough. A mutable reference to a list of hashes to request is passed as
669    /// parameter. A budget is passed as parameter, this ensures that the node stops searching
670    /// for more hashes after the budget is depleted. Under bad network conditions, the cache of
671    /// hashes pending fetch may become very full for a while. As the node recovers, the hashes
672    /// pending fetch cache should get smaller. The budget should aim to be big enough to loop
673    /// through all buffered hashes in good network conditions.
674    ///
675    /// The request hashes buffer is filled as if it's an eth68 request, i.e. smartly assemble
676    /// the request based on expected response size. For any hash missing size metadata, it is
677    /// guessed at [`AVERAGE_BYTE_SIZE_TX_ENCODED`].
678    ///
679    /// Loops through hashes pending fetch and does:
680    ///
681    /// 1. Check if a hash pending fetch is seen by peer.
682    /// 2. Optimistically include the hash in the request.
683    /// 3. Accumulate expected total response size.
684    /// 4. Check if acc size and hashes count is at limit, if so stop looping.
685    /// 5. Remove hashes to request from cache of hashes pending fetch.
    pub fn fill_request_from_hashes_pending_fetch(
        &mut self,
        hashes_to_request: &mut RequestTxHashes,
        seen_hashes: &LruCache<TxHash>,
        mut budget_fill_request: Option<usize>, // check max `budget` lru pending hashes
    ) {
        // caller is expected to have seeded the request with one hash; bail if it's empty
        let Some(hash) = hashes_to_request.iter().next() else { return };

        // seed the expected response size from the first hash's eth68 size metadata, falling
        // back to the average encoded tx size if the hash was only seen over eth66
        let mut acc_size_response = self
            .hashes_fetch_inflight_and_pending_fetch
            .get(hash)
            .and_then(|entry| entry.tx_encoded_len())
            .unwrap_or(AVERAGE_BYTE_SIZE_TX_ENCODED);

        // if request full enough already, we're satisfied, send request for single tx
        if acc_size_response >=
            DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE_ON_FETCH_PENDING_HASHES
        {
            return
        }

        // try to fill request by checking if any other hashes pending fetch (in lru order) are
        // also seen by peer
        for hash in self.hashes_pending_fetch.iter() {
            // 1. Check if a hash pending fetch is seen by peer.
            if !seen_hashes.contains(hash) {
                continue
            };

            // 2. Optimistically include the hash in the request.
            hashes_to_request.insert(*hash);

            // 3. Accumulate expected total response size. Hashes without eth68 size metadata
            // are assumed to be of average encoded size.
            let size = self
                .hashes_fetch_inflight_and_pending_fetch
                .get(hash)
                .and_then(|entry| entry.tx_encoded_len())
                .unwrap_or(AVERAGE_BYTE_SIZE_TX_ENCODED);

            acc_size_response = acc_size_response.saturating_add(size);

            // 4. Check if acc size or hashes count is at limit, if so stop looping.
            // if expected response is full enough or the number of hashes in the request is
            // enough, we're satisfied
            if acc_size_response >=
                DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE_ON_FETCH_PENDING_HASHES ||
                hashes_to_request.len() >
                    DEFAULT_SOFT_LIMIT_COUNT_HASHES_IN_GET_POOLED_TRANSACTIONS_REQUEST_ON_FETCH_PENDING_HASHES
            {
                break
            }

            // the budget is spent per hash iterated over (whether or not it was included in
            // the request), capping the total search breadth
            // NOTE(review): assumes callers pass `None` or a non-zero budget; `Some(0)` would
            // underflow here in debug builds — confirm against call sites
            if let Some(ref mut bud) = budget_fill_request {
                *bud -= 1;
                if *bud == 0 {
                    break
                }
            }
        }

        // 5. Remove hashes to request from cache of hashes pending fetch. Their metadata stays
        // in `hashes_fetch_inflight_and_pending_fetch`, now tracking an inflight request.
        for hash in hashes_to_request.iter() {
            self.hashes_pending_fetch.remove(hash);
        }
    }
751
752    /// Returns `true` if [`TransactionFetcher`] has capacity to request pending hashes. Returns
753    /// `false` if [`TransactionFetcher`] is operating close to full capacity.
754    pub fn has_capacity_for_fetching_pending_hashes(&self) -> bool {
755        let info = &self.info;
756
757        self.has_capacity(info.max_inflight_requests)
758    }
759
760    /// Returns `true` if the number of inflight requests are under a given tolerated max.
761    fn has_capacity(&self, max_inflight_requests: usize) -> bool {
762        self.inflight_requests.len() <= max_inflight_requests
763    }
764
765    /// Returns the limit to enforce when looking for any pending hash with an idle fallback peer.
766    ///
767    /// Returns `Some(limit)` if [`TransactionFetcher`] and the
768    /// [`TransactionPool`](reth_transaction_pool::TransactionPool) are operating close to full
769    /// capacity. Returns `None`, unlimited, if they are not that busy.
770    pub fn search_breadth_budget_find_idle_fallback_peer(
771        &self,
772        has_capacity_wrt_pending_pool_imports: impl Fn(usize) -> bool,
773    ) -> Option<usize> {
774        let info = &self.info;
775
776        let tx_fetcher_has_capacity = self.has_capacity(
777            info.max_inflight_requests /
778                DEFAULT_DIVISOR_MAX_COUNT_INFLIGHT_REQUESTS_ON_FIND_IDLE_PEER,
779        );
780        let tx_pool_has_capacity = has_capacity_wrt_pending_pool_imports(
781            DEFAULT_DIVISOR_MAX_COUNT_PENDING_POOL_IMPORTS_ON_FIND_IDLE_PEER,
782        );
783
784        if tx_fetcher_has_capacity && tx_pool_has_capacity {
785            // unlimited search breadth
786            None
787        } else {
788            // limited breadth of search for idle peer
789            let limit = DEFAULT_BUDGET_FIND_IDLE_FALLBACK_PEER;
790
791            trace!(target: "net::tx",
792                inflight_requests=self.inflight_requests.len(),
793                max_inflight_transaction_requests=info.max_inflight_requests,
794                hashes_pending_fetch=self.hashes_pending_fetch.len(),
795                limit,
796                "search breadth limited in search for idle fallback peer for some hash pending fetch"
797            );
798
799            Some(limit)
800        }
801    }
802
803    /// Returns the limit to enforce when looking for the intersection between hashes announced by
804    /// peer and hashes pending fetch.
805    ///
806    /// Returns `Some(limit)` if [`TransactionFetcher`] and the
807    /// [`TransactionPool`](reth_transaction_pool::TransactionPool) are operating close to full
808    /// capacity. Returns `None`, unlimited, if they are not that busy.
809    pub fn search_breadth_budget_find_intersection_pending_hashes_and_hashes_seen_by_peer(
810        &self,
811        has_capacity_wrt_pending_pool_imports: impl Fn(usize) -> bool,
812    ) -> Option<usize> {
813        let info = &self.info;
814
815        let tx_fetcher_has_capacity = self.has_capacity(
816            info.max_inflight_requests /
817                DEFAULT_DIVISOR_MAX_COUNT_INFLIGHT_REQUESTS_ON_FIND_INTERSECTION,
818        );
819        let tx_pool_has_capacity = has_capacity_wrt_pending_pool_imports(
820            DEFAULT_DIVISOR_MAX_COUNT_PENDING_POOL_IMPORTS_ON_FIND_INTERSECTION,
821        );
822
823        if tx_fetcher_has_capacity && tx_pool_has_capacity {
824            // unlimited search breadth
825            None
826        } else {
827            // limited breadth of search for idle peer
828            let limit = DEFAULT_BUDGET_FIND_INTERSECTION_ANNOUNCED_BY_PEER_AND_PENDING_FETCH;
829
830            trace!(target: "net::tx",
831                inflight_requests=self.inflight_requests.len(),
832                max_inflight_transaction_requests=self.info.max_inflight_requests,
833                hashes_pending_fetch=self.hashes_pending_fetch.len(),
834                limit=limit,
835                "search breadth limited in search for intersection of hashes announced by peer and hashes pending fetch"
836            );
837
838            Some(limit)
839        }
840    }
841
    /// Processes a resolved [`GetPooledTransactions`] request. Queues the outcome as a
    /// [`FetchEvent`], which will then be streamed by
    /// [`TransactionsManager`](super::TransactionsManager).
    pub fn on_resolved_get_pooled_transactions_request_fut(
        &mut self,
        response: GetPooledTxResponse<N::PooledTransaction>,
    ) -> FetchEvent<N::PooledTransaction> {
        // update peer activity, requests for buffered hashes can only be made to idle
        // fallback peers
        let GetPooledTxResponse { peer_id, mut requested_hashes, result } = response;

        self.decrement_inflight_request_count_for(&peer_id);

        match result {
            Ok(Ok(transactions)) => {
                //
                // 1. peer has failed to serve any of the hashes it has announced to us that we,
                // as a follow-up, have requested
                //
                if transactions.is_empty() {
                    trace!(target: "net::tx",
                        peer_id=format!("{peer_id:#}"),
                        requested_hashes_len=requested_hashes.len(),
                        "received empty `PooledTransactions` response from peer, peer failed to serve hashes it announced"
                    );

                    return FetchEvent::EmptyResponse { peer_id }
                }

                //
                // 2. filter out hashes that we didn't request
                //
                let payload = UnverifiedPooledTransactions::new(transactions);

                let unverified_len = payload.len();
                let (verification_outcome, verified_payload) =
                    payload.verify(&requested_hashes, &peer_id);

                // cannot underflow: `verify` only ever removes entries from the payload
                let unsolicited = unverified_len - verified_payload.len();
                if unsolicited > 0 {
                    self.metrics.unsolicited_transactions.increment(unsolicited as u64);
                }

                let report_peer = if verification_outcome == VerificationOutcome::ReportPeer {
                    trace!(target: "net::tx",
                        peer_id=format!("{peer_id:#}"),
                        unverified_len,
                        verified_payload_len=verified_payload.len(),
                        "received `PooledTransactions` response from peer with entries that didn't verify against request, filtered out transactions"
                    );
                    true
                } else {
                    false
                };

                // peer has only sent hashes that we didn't request
                if verified_payload.is_empty() {
                    return FetchEvent::FetchError { peer_id, error: RequestError::BadResponse }
                }

                //
                // 3. stateless validation of payload, e.g. dedup
                //
                let unvalidated_payload_len = verified_payload.len();

                let valid_payload = verified_payload.dedup();

                // todo: validate based on announced tx size/type and report peer for sending
                // invalid response <https://github.com/paradigmxyz/reth/issues/6529>. requires
                // passing the rlp encoded length down from active session along with the decoded
                // tx.

                if valid_payload.len() != unvalidated_payload_len {
                    trace!(target: "net::tx",
                    peer_id=format!("{peer_id:#}"),
                    unvalidated_payload_len,
                    valid_payload_len=valid_payload.len(),
                    "received `PooledTransactions` response from peer with duplicate entries, filtered them out"
                    );
                }
                // valid payload will have at least one transaction at this point. even if the tx
                // size/type announced by the peer is different to the actual tx size/type, pass on
                // to pending pool imports pipeline for validation.

                //
                // 4. clear received hashes
                //
                // split the requested hashes into fetched (served by the peer) and left-over
                // (announced but not served)
                let requested_hashes_len = requested_hashes.len();
                let mut fetched = Vec::with_capacity(valid_payload.len());
                requested_hashes.retain(|requested_hash| {
                    if valid_payload.contains_key(requested_hash) {
                        // hash is now known, stop tracking
                        fetched.push(*requested_hash);
                        return false
                    }
                    true
                });
                fetched.shrink_to_fit();
                self.metrics.fetched_transactions.increment(fetched.len() as u64);

                if fetched.len() < requested_hashes_len {
                    trace!(target: "net::tx",
                        peer_id=format!("{peer_id:#}"),
                        requested_hashes_len=requested_hashes_len,
                        fetched_len=fetched.len(),
                        "peer failed to serve hashes it announced"
                    );
                }

                //
                // 5. buffer left over hashes
                //
                self.try_buffer_hashes_for_retry(requested_hashes, &peer_id);

                let transactions = valid_payload.into_data().into_values().collect();

                FetchEvent::TransactionsFetched { peer_id, transactions, report_peer }
            }
            Ok(Err(req_err)) => {
                // request resolved with an error; re-buffer the hashes for retry with a
                // fallback peer
                self.try_buffer_hashes_for_retry(requested_hashes, &peer_id);
                FetchEvent::FetchError { peer_id, error: req_err }
            }
            Err(_) => {
                // response channel closed/dropped before resolving; re-buffer for retry
                self.try_buffer_hashes_for_retry(requested_hashes, &peer_id);
                // request channel closed/dropped
                FetchEvent::FetchError { peer_id, error: RequestError::ChannelClosed }
            }
        }
    }
971}
972
973impl<N: NetworkPrimitives> Stream for TransactionFetcher<N> {
974    type Item = FetchEvent<N::PooledTransaction>;
975
976    /// Advances all inflight requests and returns the next event.
977    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
978        // `FuturesUnordered` doesn't close when `None` is returned. so just return pending.
979        // <https://play.rust-lang.org/?version=stable&mode=debug&edition=2021&gist=815be2b6c8003303757c3ced135f363e>
980        if self.inflight_requests.is_empty() {
981            return Poll::Pending
982        }
983
984        if let Some(resp) = ready!(self.inflight_requests.poll_next_unpin(cx)) {
985            return Poll::Ready(Some(self.on_resolved_get_pooled_transactions_request_fut(resp)))
986        }
987
988        Poll::Pending
989    }
990}
991
992impl<T: NetworkPrimitives> Default for TransactionFetcher<T> {
993    fn default() -> Self {
994        Self {
995            active_peers: LruMap::new(DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS),
996            inflight_requests: Default::default(),
997            hashes_pending_fetch: LruCache::new(DEFAULT_MAX_CAPACITY_CACHE_PENDING_FETCH),
998            hashes_fetch_inflight_and_pending_fetch: LruMap::new(
999                DEFAULT_MAX_CAPACITY_CACHE_INFLIGHT_AND_PENDING_FETCH,
1000            ),
1001            info: TransactionFetcherInfo::default(),
1002            metrics: Default::default(),
1003        }
1004    }
1005}
1006
/// Metadata of a transaction hash that is yet to be fetched.
#[derive(Debug, Constructor)]
pub struct TxFetchMetadata {
    /// The number of times a request attempt has been made for the hash.
    retries: u8,
    /// Peers that have announced the hash, but to which a request attempt has not yet been made.
    fallback_peers: LruCache<PeerId>,
    /// Size metadata of the transaction if it has been seen in an eth68 announcement.
    ///
    /// `None` if the hash has so far only been seen in announcements without size metadata
    /// (eth66).
    // todo: store all seen sizes as a `(size, peer_id)` tuple to catch peers that respond with
    // another size tx than they announced. alt enter in request (won't catch peers announcing
    // wrong size for requests assembled from hashes pending fetch if stored in request fut)
    tx_encoded_length: Option<usize>,
}
1020
impl TxFetchMetadata {
    /// Returns a mutable reference to the fallback peers cache for this transaction hash.
    /// Callers use this to add/remove peers that may be asked for the transaction later.
    pub const fn fallback_peers_mut(&mut self) -> &mut LruCache<PeerId> {
        &mut self.fallback_peers
    }

    /// Returns the size of the transaction, if its hash has been received in any
    /// [`Eth68`](reth_eth_wire::EthVersion::Eth68) announcement. If the transaction hash has only
    /// been seen in [`Eth66`](reth_eth_wire::EthVersion::Eth66) announcements so far, this will
    /// return `None`.
    pub const fn tx_encoded_len(&self) -> Option<usize> {
        self.tx_encoded_length
    }
}
1035
/// Represents possible events from fetching transactions.
#[derive(Debug)]
pub enum FetchEvent<T = PooledTransaction> {
    /// Triggered when transactions are successfully fetched.
    TransactionsFetched {
        /// The ID of the peer from which transactions were fetched.
        peer_id: PeerId,
        /// The transactions that were fetched, if available.
        transactions: PooledTransactions<T>,
        /// Whether the peer should be penalized for sending unsolicited transactions or for
        /// misbehavior.
        report_peer: bool,
    },
    /// Triggered when there is an error in fetching transactions.
    FetchError {
        /// The ID of the peer from which an attempt to fetch transactions resulted in an error.
        peer_id: PeerId,
        /// The specific error that occurred while fetching.
        error: RequestError,
    },
    /// An empty response was received, i.e. the peer announced hashes but then served none of
    /// them.
    EmptyResponse {
        /// The ID of the sender.
        peer_id: PeerId,
    },
}
1062
/// An inflight request for [`PooledTransactions`] from a peer.
#[derive(Debug)]
pub struct GetPooledTxRequest<T = PooledTransaction> {
    // the peer the request was sent to
    peer_id: PeerId,
    /// Transaction hashes that were requested, for cleanup purposes
    requested_hashes: RequestTxHashes,
    // channel on which the peer's session delivers the response
    response: oneshot::Receiver<RequestResult<PooledTransactions<T>>>,
}
1071
/// Upon reception of a response, a [`GetPooledTxRequest`] is deconstructed to form a
/// [`GetPooledTxResponse`].
#[derive(Debug)]
pub struct GetPooledTxResponse<T = PooledTransaction> {
    // the peer that served (or failed to serve) the response
    peer_id: PeerId,
    /// Transaction hashes that were requested, for cleanup purposes, since peer may only return a
    /// subset of requested hashes.
    requested_hashes: RequestTxHashes,
    // outer error: response channel closed/dropped; inner error: request-level failure
    result: Result<RequestResult<PooledTransactions<T>>, RecvError>,
}
1082
/// Stores the response receiver made by sending a [`GetPooledTransactions`] request to a peer's
/// session.
///
/// Resolves to a [`GetPooledTxResponse`] via its [`Future`] impl.
#[must_use = "futures do nothing unless polled"]
#[pin_project::pin_project]
#[derive(Debug)]
pub struct GetPooledTxRequestFut<T = PooledTransaction> {
    // `None` only transiently while being polled; set back to `Some` if still pending
    #[pin]
    inner: Option<GetPooledTxRequest<T>>,
}
1092
impl<T> GetPooledTxRequestFut<T> {
    /// Wraps a request's peer, requested hashes, and response receiver into a pollable future.
    #[inline]
    const fn new(
        peer_id: PeerId,
        requested_hashes: RequestTxHashes,
        response: oneshot::Receiver<RequestResult<PooledTransactions<T>>>,
    ) -> Self {
        Self { inner: Some(GetPooledTxRequest { peer_id, requested_hashes, response }) }
    }
}
1103
impl<T> Future for GetPooledTxRequestFut<T> {
    type Output = GetPooledTxResponse<T>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // move the request out of the pinned `Option` for the duration of this poll; it is put
        // back below if the response channel isn't ready yet
        let mut req = self.as_mut().project().inner.take().expect("polled after completion");
        match req.response.poll_unpin(cx) {
            // resolved: deconstruct the request into a response, leaving `inner` as `None` so a
            // subsequent poll panics with "polled after completion"
            Poll::Ready(result) => Poll::Ready(GetPooledTxResponse {
                peer_id: req.peer_id,
                requested_hashes: req.requested_hashes,
                result,
            }),
            Poll::Pending => {
                // restore state so the future can be polled again
                self.project().inner.set(Some(req));
                Poll::Pending
            }
        }
    }
}
1122
/// Wrapper of unverified [`PooledTransactions`], i.e. a response payload that has not yet been
/// checked against the hashes that were actually requested.
#[derive(Debug, Constructor, Deref)]
pub struct UnverifiedPooledTransactions<T> {
    txns: PooledTransactions<T>,
}
1128
/// [`PooledTransactions`] that have been successfully verified, i.e. every entry matches a
/// requested hash.
#[derive(Debug, Constructor, Deref)]
pub struct VerifiedPooledTransactions<T> {
    txns: PooledTransactions<T>,
}
1134
1135impl<T: SignedTransaction> DedupPayload for VerifiedPooledTransactions<T> {
1136    type Value = T;
1137
1138    fn is_empty(&self) -> bool {
1139        self.txns.is_empty()
1140    }
1141
1142    fn len(&self) -> usize {
1143        self.txns.len()
1144    }
1145
1146    fn dedup(self) -> PartiallyValidData<Self::Value> {
1147        PartiallyValidData::from_raw_data(
1148            self.txns.into_iter().map(|tx| (*tx.tx_hash(), tx)).collect(),
1149            None,
1150        )
1151    }
1152}
1153
/// Verification of a [`PooledTransactions`] response payload against the hashes that were
/// requested from the peer.
trait VerifyPooledTransactionsResponse {
    /// The signed transaction type carried in the payload.
    type Transaction: SignedTransaction;

    /// Filters the payload down to requested entries only, returning the surviving payload and
    /// an outcome signalling whether the peer should be reported.
    fn verify(
        self,
        requested_hashes: &RequestTxHashes,
        peer_id: &PeerId,
    ) -> (VerificationOutcome, VerifiedPooledTransactions<Self::Transaction>);
}
1163
impl<T: SignedTransaction> VerifyPooledTransactionsResponse for UnverifiedPooledTransactions<T> {
    type Transaction = T;

    fn verify(
        self,
        requested_hashes: &RequestTxHashes,
        _peer_id: &PeerId,
    ) -> (VerificationOutcome, VerifiedPooledTransactions<T>) {
        let mut verification_outcome = VerificationOutcome::Ok;

        let Self { mut txns } = self;

        // in debug builds collect the offending hashes for richer logs; in release builds only
        // count them to keep this path cheap
        #[cfg(debug_assertions)]
        let mut tx_hashes_not_requested: smallvec::SmallVec<[TxHash; 16]> = smallvec::smallvec!();
        #[cfg(not(debug_assertions))]
        let mut tx_hashes_not_requested_count = 0;

        // drop every transaction whose hash wasn't requested; a single unsolicited entry flags
        // the peer for reporting
        txns.0.retain(|tx| {
            if !requested_hashes.contains(tx.tx_hash()) {
                verification_outcome = VerificationOutcome::ReportPeer;

                #[cfg(debug_assertions)]
                tx_hashes_not_requested.push(*tx.tx_hash());
                #[cfg(not(debug_assertions))]
                {
                    tx_hashes_not_requested_count += 1;
                }

                return false
            }
            true
        });

        #[cfg(debug_assertions)]
        if !tx_hashes_not_requested.is_empty() {
            trace!(target: "net::tx",
                peer_id=format!("{_peer_id:#}"),
                ?tx_hashes_not_requested,
                "transactions in `PooledTransactions` response from peer were not requested"
            );
        }
        #[cfg(not(debug_assertions))]
        if tx_hashes_not_requested_count != 0 {
            trace!(target: "net::tx",
                peer_id=format!("{_peer_id:#}"),
                tx_hashes_not_requested_count,
                "transactions in `PooledTransactions` response from peer were not requested"
            );
        }

        (verification_outcome, VerifiedPooledTransactions::new(txns))
    }
}
1217
/// Outcome from verifying a [`PooledTransactions`] response. Signals to caller whether to penalize
/// the sender of the response or not.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum VerificationOutcome {
    /// Peer behaves appropriately.
    Ok,
    /// A penalty should be flagged for the peer. Peer sent a response with unacceptably
    /// invalid entries, e.g. transactions that were never requested.
    ReportPeer,
}
1228
/// Tracks stats about the [`TransactionFetcher`].
#[derive(Debug, Constructor)]
pub struct TransactionFetcherInfo {
    /// Max inflight [`GetPooledTransactions`] requests.
    pub max_inflight_requests: usize,
    /// Max inflight [`GetPooledTransactions`] requests per peer.
    pub max_inflight_requests_per_peer: u8,
    /// Soft limit for the byte size of the expected [`PooledTransactions`] response, upon packing
    /// a [`GetPooledTransactions`] request with hashes (by default less than 2 MiB worth of
    /// transactions is requested).
    pub soft_limit_byte_size_pooled_transactions_response_on_pack_request: usize,
    /// Soft limit for the byte size of a [`PooledTransactions`] response, upon assembling the
    /// response. Spec'd at 2 MiB, but can be adjusted for research purpose.
    pub soft_limit_byte_size_pooled_transactions_response: usize,
    /// Max capacity of the cache of transaction hashes, for transactions that weren't yet fetched.
    /// A transaction is pending fetch if its hash didn't fit into a [`GetPooledTransactions`] yet,
    /// or it wasn't returned upon request to peers.
    pub max_capacity_cache_txns_pending_fetch: u32,
}
1248
1249impl Default for TransactionFetcherInfo {
1250    fn default() -> Self {
1251        Self::new(
1252            DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS as usize,
1253            DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS_PER_PEER,
1254            DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESP_ON_PACK_GET_POOLED_TRANSACTIONS_REQ,
1255            SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE,
1256            DEFAULT_MAX_CAPACITY_CACHE_PENDING_FETCH,
1257        )
1258    }
1259}
1260
1261impl From<TransactionFetcherConfig> for TransactionFetcherInfo {
1262    fn from(config: TransactionFetcherConfig) -> Self {
1263        let TransactionFetcherConfig {
1264            max_inflight_requests,
1265            max_inflight_requests_per_peer,
1266            soft_limit_byte_size_pooled_transactions_response,
1267            soft_limit_byte_size_pooled_transactions_response_on_pack_request,
1268            max_capacity_cache_txns_pending_fetch,
1269        } = config;
1270
1271        Self::new(
1272            max_inflight_requests as usize,
1273            max_inflight_requests_per_peer,
1274            soft_limit_byte_size_pooled_transactions_response_on_pack_request,
1275            soft_limit_byte_size_pooled_transactions_response,
1276            max_capacity_cache_txns_pending_fetch,
1277        )
1278    }
1279}
1280
/// Durations spent in the two search phases when fetching pending hashes: finding an idle
/// fallback peer, and filling the request from the hashes that peer has seen.
#[derive(Debug, Default)]
struct TxFetcherSearchDurations {
    // time spent searching for an idle fallback peer
    find_idle_peer: Duration,
    // time spent filling the request from hashes pending fetch
    fill_request: Duration,
}
1286
1287#[cfg(test)]
1288mod test {
1289    use super::*;
1290    use crate::test_utils::transactions::{buffer_hash_to_tx_fetcher, new_mock_session};
1291    use alloy_primitives::{hex, B256};
1292    use alloy_rlp::Decodable;
1293    use derive_more::IntoIterator;
1294    use reth_eth_wire_types::EthVersion;
1295    use reth_ethereum_primitives::TransactionSigned;
1296    use std::{collections::HashSet, str::FromStr};
1297
    /// Announcement stub used in tests: `(hash, Some((tx_type, size)))` entries mimicking
    /// eth68 announcement data.
    #[derive(IntoIterator)]
    struct TestValidAnnouncementData(Vec<(TxHash, Option<(u8, usize)>)>);
1300
1301    impl HandleMempoolData for TestValidAnnouncementData {
1302        fn is_empty(&self) -> bool {
1303            self.0.is_empty()
1304        }
1305
1306        fn len(&self) -> usize {
1307            self.0.len()
1308        }
1309
1310        fn retain_by_hash(&mut self, mut f: impl FnMut(&TxHash) -> bool) {
1311            self.0.retain(|(hash, _)| f(hash))
1312        }
1313    }
1314
    impl HandleVersionedMempoolData for TestValidAnnouncementData {
        /// The stub always reports eth68, matching the size metadata it carries.
        fn msg_version(&self) -> EthVersion {
            EthVersion::Eth68
        }
    }
1320
    /// Packs an eth68 request from announced hashes and checks that exactly the hashes fitting
    /// under the response byte-size soft limit are requested, the rest returned as surplus.
    #[test]
    fn pack_eth68_request() {
        reth_tracing::init_test_tracing();

        // RIG TEST

        let tx_fetcher = &mut TransactionFetcher::<EthNetworkPrimitives>::default();

        let eth68_hashes = [
            B256::from_slice(&[1; 32]),
            B256::from_slice(&[2; 32]),
            B256::from_slice(&[3; 32]),
            B256::from_slice(&[4; 32]),
            B256::from_slice(&[5; 32]),
        ];
        // announced sizes chosen around the soft limit so only hashes 0 and 2 fit
        let eth68_sizes = [
            DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESP_ON_PACK_GET_POOLED_TRANSACTIONS_REQ - MEDIAN_BYTE_SIZE_SMALL_LEGACY_TX_ENCODED - 1, // first will fit
            DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESP_ON_PACK_GET_POOLED_TRANSACTIONS_REQ, // second won't
            2, // free space > `MEDIAN_BYTE_SIZE_SMALL_LEGACY_TX_ENCODED`, third will fit, no more after this
            9,
            0,
        ];

        let expected_request_hashes =
            [eth68_hashes[0], eth68_hashes[2]].into_iter().collect::<HashSet<_>>();

        let expected_surplus_hashes =
            [eth68_hashes[1], eth68_hashes[3], eth68_hashes[4]].into_iter().collect::<HashSet<_>>();

        let mut eth68_hashes_to_request = RequestTxHashes::with_capacity(3);

        let valid_announcement_data = TestValidAnnouncementData(
            eth68_hashes
                .into_iter()
                .zip(eth68_sizes)
                .map(|(hash, size)| (hash, Some((0u8, size))))
                .collect::<Vec<_>>(),
        );

        // TEST

        let surplus_eth68_hashes =
            tx_fetcher.pack_request_eth68(&mut eth68_hashes_to_request, valid_announcement_data);

        // compare as sets since the packing order is not part of the contract
        let eth68_hashes_to_request = eth68_hashes_to_request.into_iter().collect::<HashSet<_>>();
        let surplus_eth68_hashes = surplus_eth68_hashes.into_iter().collect::<HashSet<_>>();

        assert_eq!(expected_request_hashes, eth68_hashes_to_request);
        assert_eq!(expected_surplus_hashes, surplus_eth68_hashes);
    }
1371
    /// Regression test: an announced size of `usize::MAX` must not overflow the accumulated
    /// response size while packing; the oversized entry goes to surplus, smaller ones still fit.
    #[test]
    fn pack_eth68_request_does_not_overflow_announced_size() {
        reth_tracing::init_test_tracing();

        let tx_fetcher = &mut TransactionFetcher::<EthNetworkPrimitives>::default();

        let eth68_hashes =
            [B256::from_slice(&[1; 32]), B256::from_slice(&[2; 32]), B256::from_slice(&[3; 32])];
        // second entry would overflow naive size accumulation
        let eth68_sizes = [
            DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESP_ON_PACK_GET_POOLED_TRANSACTIONS_REQ - MEDIAN_BYTE_SIZE_SMALL_LEGACY_TX_ENCODED - 1,
            usize::MAX,
            2,
        ];

        let expected_request_hashes =
            [eth68_hashes[0], eth68_hashes[2]].into_iter().collect::<HashSet<_>>();
        let expected_surplus_hashes = std::iter::once(eth68_hashes[1]).collect::<HashSet<_>>();

        let mut eth68_hashes_to_request = RequestTxHashes::with_capacity(3);
        let valid_announcement_data = TestValidAnnouncementData(
            eth68_hashes
                .into_iter()
                .zip(eth68_sizes)
                .map(|(hash, size)| (hash, Some((0u8, size))))
                .collect::<Vec<_>>(),
        );

        let surplus_eth68_hashes =
            tx_fetcher.pack_request_eth68(&mut eth68_hashes_to_request, valid_announcement_data);

        // compare as sets since the packing order is not part of the contract
        let eth68_hashes_to_request = eth68_hashes_to_request.into_iter().collect::<HashSet<_>>();
        let surplus_eth68_hashes = surplus_eth68_hashes.into_iter().collect::<HashSet<_>>();

        assert_eq!(expected_request_hashes, eth68_hashes_to_request);
        assert_eq!(expected_surplus_hashes, surplus_eth68_hashes);
    }
1408
    /// End-to-end check of the pending-hashes fetch path: hashes buffered with idle fallback
    /// peers are packed into a single request and sent to that peer's session.
    #[tokio::test]
    async fn test_on_fetch_pending_hashes() {
        reth_tracing::init_test_tracing();

        let tx_fetcher = &mut TransactionFetcher::default();

        // RIG TEST

        // hashes that will be fetched because they are stored as pending fetch
        let seen_hashes = [
            B256::from_slice(&[1; 32]),
            B256::from_slice(&[2; 32]),
            B256::from_slice(&[3; 32]),
            B256::from_slice(&[4; 32]),
        ];
        //
        // txns 1-3 are small, all will fit in request. no metadata has been made available for
        // hash 4, it has only been seen over eth66 conn, so average tx size will be assumed in
        // filling request.
        let seen_eth68_hashes_sizes = [120, 158, 116];

        // peer that will fetch seen hashes because they are pending fetch
        let peer_1 = PeerId::new([1; 64]);
        // second peer, won't do anything in this test
        let peer_2 = PeerId::new([2; 64]);

        // add seen hashes to peers seen transactions
        //
        // get handle for peer_1's session to receive request for pending hashes
        let (mut peer_1_data, mut peer_1_mock_session_rx) =
            new_mock_session(peer_1, EthVersion::Eth66);
        for hash in &seen_hashes {
            peer_1_data.seen_transactions.insert(*hash);
        }
        let (mut peer_2_data, _) = new_mock_session(peer_2, EthVersion::Eth66);
        for hash in &seen_hashes {
            peer_2_data.seen_transactions.insert(*hash);
        }
        let mut peers = HashMap::default();
        peers.insert(peer_1, peer_1_data);
        peers.insert(peer_2, peer_2_data);

        // insert seen_hashes into tx fetcher
        for i in 0..3 {
            // insert peer_2 as fallback peer for seen_hashes
            buffer_hash_to_tx_fetcher(
                tx_fetcher,
                seen_hashes[i],
                peer_2,
                0,
                Some(seen_eth68_hashes_sizes[i]),
            );
        }
        // hash 4 has no eth68 size metadata (eth66-only)
        buffer_hash_to_tx_fetcher(tx_fetcher, seen_hashes[3], peer_2, 0, None);

        // insert pending hash without peer_1 as fallback peer, only with peer_2 as fallback peer
        let hash_other = B256::from_slice(&[5; 32]);
        buffer_hash_to_tx_fetcher(tx_fetcher, hash_other, peer_2, 0, None);

        // add peer_1 as lru fallback peer for seen hashes
        for hash in &seen_hashes {
            buffer_hash_to_tx_fetcher(tx_fetcher, *hash, peer_1, 0, None);
        }

        // seen hashes and the random hash from peer_2 are pending fetch
        assert_eq!(tx_fetcher.num_pending_hashes(), 5);

        // TEST

        tx_fetcher.on_fetch_pending_hashes(&peers, |_| true);

        // mock session of peer_1 receives request
        let req = peer_1_mock_session_rx
            .recv()
            .await
            .expect("peer session should receive request with buffered hashes");
        let PeerRequest::GetPooledTransactions { request, .. } = req else { unreachable!() };
        let GetPooledTransactions(requested_hashes) = request;

        // exactly the hashes peer_1 has seen are requested; `hash_other` (peer_2-only) is not
        assert_eq!(
            requested_hashes.into_iter().collect::<HashSet<_>>(),
            seen_hashes.into_iter().collect::<HashSet<_>>()
        )
    }
1493
1494    #[test]
1495    fn on_fetch_pending_hashes_rebuffers_on_disconnected_peer() {
1496        let tx_fetcher = &mut TransactionFetcher::default();
1497        let peer_1 = PeerId::new([1; 64]);
1498        let peer_2 = PeerId::new([2; 64]);
1499        let hash_1 = B256::from_slice(&[1; 32]);
1500
1501        buffer_hash_to_tx_fetcher(tx_fetcher, hash_1, peer_1, 0, Some(128));
1502        buffer_hash_to_tx_fetcher(tx_fetcher, hash_1, peer_2, 0, Some(128));
1503
1504        assert_eq!(tx_fetcher.num_pending_hashes(), 1);
1505
1506        // pass empty peers map — both peers are "disconnected"
1507        let peers = HashMap::new();
1508        tx_fetcher.on_fetch_pending_hashes(&peers, |_| true);
1509
1510        // hash should be re-buffered, not lost
1511        assert_eq!(tx_fetcher.num_pending_hashes(), 1);
1512    }
1513
1514    #[test]
1515    fn verify_response_hashes() {
1516        let input = hex!(
1517            "02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598daa"
1518        );
1519        let signed_tx_1: PooledTransaction =
1520            TransactionSigned::decode(&mut &input[..]).unwrap().try_into().unwrap();
1521        let input = hex!(
1522            "02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76"
1523        );
1524        let signed_tx_2: PooledTransaction =
1525            TransactionSigned::decode(&mut &input[..]).unwrap().try_into().unwrap();
1526
1527        // only tx 1 is requested
1528        let request_hashes = [
1529            B256::from_str("0x3b9aca00f0671c9a2a1b817a0a78d3fe0c0f776cccb2a8c3c1b412a4f4e67890")
1530                .unwrap(),
1531            *signed_tx_1.hash(),
1532            B256::from_str("0x3b9aca00f0671c9a2a1b817a0a78d3fe0c0f776cccb2a8c3c1b412a4f4e12345")
1533                .unwrap(),
1534            B256::from_str("0x3b9aca00f0671c9a2a1b817a0a78d3fe0c0f776cccb2a8c3c1b412a4f4edabe3")
1535                .unwrap(),
1536        ];
1537
1538        for hash in &request_hashes {
1539            assert_ne!(hash, signed_tx_2.hash())
1540        }
1541
1542        let request_hashes = RequestTxHashes::new(request_hashes.into_iter().collect());
1543
1544        // but response contains tx 1 + another tx
1545        let response_txns = PooledTransactions(vec![signed_tx_1.clone(), signed_tx_2]);
1546        let payload = UnverifiedPooledTransactions::new(response_txns);
1547
1548        let (outcome, verified_payload) = payload.verify(&request_hashes, &PeerId::ZERO);
1549
1550        assert_eq!(VerificationOutcome::ReportPeer, outcome);
1551        assert_eq!(1, verified_payload.len());
1552        assert!(verified_payload.contains(&signed_tx_1));
1553    }
1554}