1use super::{
29 config::TransactionFetcherConfig,
30 constants::{tx_fetcher::*, SOFT_LIMIT_COUNT_HASHES_IN_GET_POOLED_TRANSACTIONS_REQUEST},
31 MessageFilter, PeerMetadata, PooledTransactions,
32 SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE,
33};
34use crate::{
35 cache::{LruCache, LruMap},
36 duration_metered_exec,
37 metrics::TransactionFetcherMetrics,
38 transactions::{validation, PartiallyFilterMessage},
39};
40use alloy_consensus::transaction::PooledTransaction;
41use alloy_primitives::TxHash;
42use derive_more::{Constructor, Deref};
43use futures::{stream::FuturesUnordered, Future, FutureExt, Stream, StreamExt};
44use pin_project::pin_project;
45use reth_eth_wire::{
46 DedupPayload, EthVersion, GetPooledTransactions, HandleMempoolData, HandleVersionedMempoolData,
47 PartiallyValidData, RequestTxHashes, ValidAnnouncementData,
48};
49use reth_eth_wire_types::{EthNetworkPrimitives, NetworkPrimitives};
50use reth_network_api::PeerRequest;
51use reth_network_p2p::error::{RequestError, RequestResult};
52use reth_network_peers::PeerId;
53use reth_primitives_traits::SignedTransaction;
54use schnellru::ByLength;
55use std::{
56 collections::HashMap,
57 pin::Pin,
58 task::{ready, Context, Poll},
59 time::Duration,
60};
61use tokio::sync::{mpsc::error::TrySendError, oneshot, oneshot::error::RecvError};
62use tracing::trace;
63use validation::FilterOutcome;
64
65#[derive(Debug)]
70#[pin_project]
71pub struct TransactionFetcher<N: NetworkPrimitives = EthNetworkPrimitives> {
72 pub active_peers: LruMap<PeerId, u8, ByLength>,
74 #[pin]
80 pub inflight_requests: FuturesUnordered<GetPooledTxRequestFut<N::PooledTransaction>>,
81 pub hashes_pending_fetch: LruCache<TxHash>,
86 pub hashes_fetch_inflight_and_pending_fetch: LruMap<TxHash, TxFetchMetadata, ByLength>,
88 pub(super) filter_valid_message: MessageFilter,
90 pub info: TransactionFetcherInfo,
92 #[doc(hidden)]
93 metrics: TransactionFetcherMetrics,
94}
95
96impl<N: NetworkPrimitives> TransactionFetcher<N> {
97 pub(crate) fn remove_peer(&mut self, peer_id: &PeerId) {
99 self.active_peers.remove(peer_id);
100 }
101
102 #[inline]
104 pub fn update_metrics(&self) {
105 let metrics = &self.metrics;
106
107 metrics.inflight_transaction_requests.set(self.inflight_requests.len() as f64);
108
109 let hashes_pending_fetch = self.hashes_pending_fetch.len() as f64;
110 let total_hashes = self.hashes_fetch_inflight_and_pending_fetch.len() as f64;
111
112 metrics.hashes_pending_fetch.set(hashes_pending_fetch);
113 metrics.hashes_inflight_transaction_requests.set(total_hashes - hashes_pending_fetch);
114 }
115
116 #[inline]
117 fn update_pending_fetch_cache_search_metrics(&self, durations: TxFetcherSearchDurations) {
118 let metrics = &self.metrics;
119
120 let TxFetcherSearchDurations { find_idle_peer, fill_request } = durations;
121 metrics
122 .duration_find_idle_fallback_peer_for_any_pending_hash
123 .set(find_idle_peer.as_secs_f64());
124 metrics.duration_fill_request_from_hashes_pending_fetch.set(fill_request.as_secs_f64());
125 }
126
127 pub fn with_transaction_fetcher_config(config: &TransactionFetcherConfig) -> Self {
129 let TransactionFetcherConfig {
130 max_inflight_requests,
131 max_capacity_cache_txns_pending_fetch,
132 ..
133 } = *config;
134
135 let info = config.clone().into();
136
137 let metrics = TransactionFetcherMetrics::default();
138 metrics.capacity_inflight_requests.increment(max_inflight_requests as u64);
139
140 Self {
141 active_peers: LruMap::new(max_inflight_requests),
142 hashes_pending_fetch: LruCache::new(max_capacity_cache_txns_pending_fetch),
143 hashes_fetch_inflight_and_pending_fetch: LruMap::new(
144 max_inflight_requests + max_capacity_cache_txns_pending_fetch,
145 ),
146 info,
147 metrics,
148 ..Default::default()
149 }
150 }
151
152 #[inline]
154 pub fn remove_hashes_from_transaction_fetcher<I>(&mut self, hashes: I)
155 where
156 I: IntoIterator<Item = TxHash>,
157 {
158 for hash in hashes {
159 self.hashes_fetch_inflight_and_pending_fetch.remove(&hash);
160 self.hashes_pending_fetch.remove(&hash);
161 }
162 }
163
164 fn decrement_inflight_request_count_for(&mut self, peer_id: &PeerId) {
166 let remove = || -> bool {
167 if let Some(inflight_count) = self.active_peers.get(peer_id) {
168 *inflight_count = inflight_count.saturating_sub(1);
169 if *inflight_count == 0 {
170 return true
171 }
172 }
173 false
174 }();
175
176 if remove {
177 self.active_peers.remove(peer_id);
178 }
179 }
180
181 #[inline]
183 pub fn is_idle(&self, peer_id: &PeerId) -> bool {
184 let Some(inflight_count) = self.active_peers.peek(peer_id) else { return true };
185 if *inflight_count < self.info.max_inflight_requests_per_peer {
186 return true
187 }
188 false
189 }
190
191 pub fn get_idle_peer_for(&self, hash: TxHash) -> Option<&PeerId> {
193 let TxFetchMetadata { fallback_peers, .. } =
194 self.hashes_fetch_inflight_and_pending_fetch.peek(&hash)?;
195
196 for peer_id in fallback_peers.iter() {
197 if self.is_idle(peer_id) {
198 return Some(peer_id)
199 }
200 }
201
202 None
203 }
204
205 pub fn find_any_idle_fallback_peer_for_any_pending_hash(
211 &mut self,
212 hashes_to_request: &mut RequestTxHashes,
213 mut budget: Option<usize>, ) -> Option<PeerId> {
215 let mut hashes_pending_fetch_iter = self.hashes_pending_fetch.iter();
216
217 let idle_peer = loop {
218 let &hash = hashes_pending_fetch_iter.next()?;
219
220 let idle_peer = self.get_idle_peer_for(hash);
221
222 if idle_peer.is_some() {
223 hashes_to_request.insert(hash);
224 break idle_peer.copied()
225 }
226
227 if let Some(ref mut bud) = budget {
228 *bud = bud.saturating_sub(1);
229 if *bud == 0 {
230 return None
231 }
232 }
233 };
234 let hash = hashes_to_request.iter().next()?;
235
236 drop(hashes_pending_fetch_iter);
238 _ = self.hashes_pending_fetch.remove(hash);
239
240 idle_peer
241 }
242
243 pub fn pack_request(
248 &self,
249 hashes_to_request: &mut RequestTxHashes,
250 hashes_from_announcement: ValidAnnouncementData,
251 ) -> RequestTxHashes {
252 if hashes_from_announcement.msg_version().is_eth68() {
253 return self.pack_request_eth68(hashes_to_request, hashes_from_announcement)
254 }
255 self.pack_request_eth66(hashes_to_request, hashes_from_announcement)
256 }
257
258 pub fn pack_request_eth68(
269 &self,
270 hashes_to_request: &mut RequestTxHashes,
271 hashes_from_announcement: impl HandleMempoolData
272 + IntoIterator<Item = (TxHash, Option<(u8, usize)>)>,
273 ) -> RequestTxHashes {
274 let mut acc_size_response = 0;
275
276 let mut hashes_from_announcement_iter = hashes_from_announcement.into_iter();
277
278 if let Some((hash, Some((_ty, size)))) = hashes_from_announcement_iter.next() {
279 hashes_to_request.insert(hash);
280
281 if size >= self.info.soft_limit_byte_size_pooled_transactions_response_on_pack_request {
283 return hashes_from_announcement_iter.collect()
284 }
285 acc_size_response = size;
286 }
287
288 let mut surplus_hashes = RequestTxHashes::default();
289
290 loop {
293 let Some((hash, metadata)) = hashes_from_announcement_iter.next() else { break };
294
295 let Some((_ty, size)) = metadata else {
296 unreachable!("this method is called upon reception of an eth68 announcement")
297 };
298
299 let next_acc_size = acc_size_response + size;
300
301 if next_acc_size <=
302 self.info.soft_limit_byte_size_pooled_transactions_response_on_pack_request
303 {
304 acc_size_response = next_acc_size;
307 _ = hashes_to_request.insert(hash)
308 } else {
309 _ = surplus_hashes.insert(hash)
310 }
311
312 let free_space =
313 self.info.soft_limit_byte_size_pooled_transactions_response_on_pack_request -
314 acc_size_response;
315
316 if free_space < MEDIAN_BYTE_SIZE_SMALL_LEGACY_TX_ENCODED {
317 break
318 }
319 }
320
321 surplus_hashes.extend(hashes_from_announcement_iter.map(|(hash, _metadata)| hash));
322
323 surplus_hashes
324 }
325
326 pub fn pack_request_eth66(
333 &self,
334 hashes_to_request: &mut RequestTxHashes,
335 hashes_from_announcement: ValidAnnouncementData,
336 ) -> RequestTxHashes {
337 let (mut hashes, _version) = hashes_from_announcement.into_request_hashes();
338 if hashes.len() <= SOFT_LIMIT_COUNT_HASHES_IN_GET_POOLED_TRANSACTIONS_REQUEST {
339 *hashes_to_request = hashes;
340 hashes_to_request.shrink_to_fit();
341
342 RequestTxHashes::default()
343 } else {
344 let surplus_hashes =
345 hashes.retain_count(SOFT_LIMIT_COUNT_HASHES_IN_GET_POOLED_TRANSACTIONS_REQUEST);
346 *hashes_to_request = hashes;
347 hashes_to_request.shrink_to_fit();
348
349 surplus_hashes
350 }
351 }
352
353 pub fn try_buffer_hashes_for_retry(
355 &mut self,
356 mut hashes: RequestTxHashes,
357 peer_failed_to_serve: &PeerId,
358 ) {
359 hashes.retain(|hash| {
362 if let Some(entry) = self.hashes_fetch_inflight_and_pending_fetch.get(hash) {
363 entry.fallback_peers_mut().remove(peer_failed_to_serve);
364 return true
365 }
366 false
368 });
369
370 self.buffer_hashes(hashes, None)
371 }
372
373 pub fn num_pending_hashes(&self) -> usize {
375 self.hashes_pending_fetch.len()
376 }
377
378 pub fn num_all_hashes(&self) -> usize {
380 self.hashes_fetch_inflight_and_pending_fetch.len()
381 }
382
383 pub fn buffer_hashes(&mut self, hashes: RequestTxHashes, fallback_peer: Option<PeerId>) {
388 for hash in hashes {
389 if self.hashes_fetch_inflight_and_pending_fetch.peek(&hash).is_none() {
391 continue
392 }
393
394 let Some(TxFetchMetadata { retries, fallback_peers, .. }) =
395 self.hashes_fetch_inflight_and_pending_fetch.get(&hash)
396 else {
397 return
398 };
399
400 if let Some(peer_id) = fallback_peer {
401 fallback_peers.insert(peer_id);
403 } else {
404 if *retries >= DEFAULT_MAX_RETRIES {
405 trace!(target: "net::tx",
406 %hash,
407 retries,
408 "retry limit for `GetPooledTransactions` requests reached for hash, dropping hash"
409 );
410
411 self.hashes_fetch_inflight_and_pending_fetch.remove(&hash);
412 self.hashes_pending_fetch.remove(&hash);
413 continue
414 }
415 *retries += 1;
416 }
417
418 if let (_, Some(evicted_hash)) = self.hashes_pending_fetch.insert_and_get_evicted(hash)
419 {
420 self.hashes_fetch_inflight_and_pending_fetch.remove(&evicted_hash);
421 self.hashes_pending_fetch.remove(&evicted_hash);
422 }
423 }
424 }
425
426 pub fn on_fetch_pending_hashes(
431 &mut self,
432 peers: &HashMap<PeerId, PeerMetadata<N>>,
433 has_capacity_wrt_pending_pool_imports: impl Fn(usize) -> bool,
434 ) {
435 let mut hashes_to_request = RequestTxHashes::with_capacity(
436 DEFAULT_MARGINAL_COUNT_HASHES_GET_POOLED_TRANSACTIONS_REQUEST,
437 );
438 let mut search_durations = TxFetcherSearchDurations::default();
439
440 let budget_find_idle_fallback_peer = self
442 .search_breadth_budget_find_idle_fallback_peer(&has_capacity_wrt_pending_pool_imports);
443
444 let peer_id = duration_metered_exec!(
445 {
446 let Some(peer_id) = self.find_any_idle_fallback_peer_for_any_pending_hash(
447 &mut hashes_to_request,
448 budget_find_idle_fallback_peer,
449 ) else {
450 return
452 };
453
454 peer_id
455 },
456 search_durations.find_idle_peer
457 );
458
459 let Some(peer) = peers.get(&peer_id) else { return };
461 let conn_eth_version = peer.version;
462
463 let budget_fill_request = self
468 .search_breadth_budget_find_intersection_pending_hashes_and_hashes_seen_by_peer(
469 &has_capacity_wrt_pending_pool_imports,
470 );
471
472 duration_metered_exec!(
473 {
474 self.fill_request_from_hashes_pending_fetch(
475 &mut hashes_to_request,
476 &peer.seen_transactions,
477 budget_fill_request,
478 )
479 },
480 search_durations.fill_request
481 );
482
483 self.update_pending_fetch_cache_search_metrics(search_durations);
484
485 trace!(target: "net::tx",
486 peer_id=format!("{peer_id:#}"),
487 hashes=?*hashes_to_request,
488 %conn_eth_version,
489 "requesting hashes that were stored pending fetch from peer"
490 );
491
492 if let Some(failed_to_request_hashes) =
494 self.request_transactions_from_peer(hashes_to_request, peer)
495 {
496 trace!(target: "net::tx",
497 peer_id=format!("{peer_id:#}"),
498 ?failed_to_request_hashes,
499 %conn_eth_version,
500 "failed sending request to peer's session, buffering hashes"
501 );
502
503 self.buffer_hashes(failed_to_request_hashes, Some(peer_id));
504 }
505 }
506
507 pub fn filter_unseen_and_pending_hashes(
510 &mut self,
511 new_announced_hashes: &mut ValidAnnouncementData,
512 is_tx_bad_import: impl Fn(&TxHash) -> bool,
513 peer_id: &PeerId,
514 client_version: &str,
515 ) {
516 let mut previously_unseen_hashes_count = 0;
517
518 let msg_version = new_announced_hashes.msg_version();
519
520 new_announced_hashes.retain(|hash, metadata| {
522
523 if let Some(TxFetchMetadata{ tx_encoded_length: previously_seen_size, ..}) = self.hashes_fetch_inflight_and_pending_fetch.peek_mut(hash) {
525 if let Some((_ty, size)) = metadata {
527 if let Some(prev_size) = previously_seen_size {
528 if size != prev_size {
530 trace!(target: "net::tx",
531 peer_id=format!("{peer_id:#}"),
532 %hash,
533 size,
534 previously_seen_size,
535 %client_version,
536 "peer announced a different size for tx, this is especially worrying if one size is much bigger..."
537 );
538 }
539 }
540 *previously_seen_size = Some(*size);
542 }
543
544 if self.hashes_pending_fetch.remove(hash) {
546 return true
547 }
548
549 return false
550 }
551
552 if is_tx_bad_import(hash) {
555 return false
556 }
557
558 previously_unseen_hashes_count += 1;
559
560 if self.hashes_fetch_inflight_and_pending_fetch.get_or_insert(*hash, ||
561 TxFetchMetadata{retries: 0, fallback_peers: LruCache::new(DEFAULT_MAX_COUNT_FALLBACK_PEERS as u32), tx_encoded_length: None}
562 ).is_none() {
563
564 trace!(target: "net::tx",
565 peer_id=format!("{peer_id:#}"),
566 %hash,
567 ?msg_version,
568 %client_version,
569 "failed to cache new announced hash from peer in schnellru::LruMap, dropping hash"
570 );
571
572 return false
573 }
574 true
575 });
576
577 trace!(target: "net::tx",
578 peer_id=format!("{peer_id:#}"),
579 previously_unseen_hashes_count=previously_unseen_hashes_count,
580 msg_version=?msg_version,
581 client_version=%client_version,
582 "received previously unseen hashes in announcement from peer"
583 );
584 }
585
586 pub fn request_transactions_from_peer(
594 &mut self,
595 new_announced_hashes: RequestTxHashes,
596 peer: &PeerMetadata<N>,
597 ) -> Option<RequestTxHashes> {
598 let peer_id: PeerId = peer.request_tx.peer_id;
599 let conn_eth_version = peer.version;
600
601 if self.active_peers.len() >= self.info.max_inflight_requests {
602 trace!(target: "net::tx",
603 peer_id=format!("{peer_id:#}"),
604 hashes=?*new_announced_hashes,
605 %conn_eth_version,
606 max_inflight_transaction_requests=self.info.max_inflight_requests,
607 "limit for concurrent `GetPooledTransactions` requests reached, dropping request for hashes to peer"
608 );
609 return Some(new_announced_hashes)
610 }
611
612 let Some(inflight_count) = self.active_peers.get_or_insert(peer_id, || 0) else {
613 trace!(target: "net::tx",
614 peer_id=format!("{peer_id:#}"),
615 hashes=?*new_announced_hashes,
616 conn_eth_version=%conn_eth_version,
617 "failed to cache active peer in schnellru::LruMap, dropping request to peer"
618 );
619 return Some(new_announced_hashes)
620 };
621
622 if *inflight_count >= self.info.max_inflight_requests_per_peer {
623 trace!(target: "net::tx",
624 peer_id=format!("{peer_id:#}"),
625 hashes=?*new_announced_hashes,
626 %conn_eth_version,
627 max_concurrent_tx_reqs_per_peer=self.info.max_inflight_requests_per_peer,
628 "limit for concurrent `GetPooledTransactions` requests per peer reached"
629 );
630 return Some(new_announced_hashes)
631 }
632
633 #[cfg(debug_assertions)]
634 {
635 for hash in &new_announced_hashes {
636 if self.hashes_pending_fetch.contains(hash) {
637 tracing::debug!(target: "net::tx", "`{}` should have been taken out of buffer before packing in a request, breaks invariant `@hashes_pending_fetch` and `@inflight_requests`, `@hashes_fetch_inflight_and_pending_fetch` for `{}`: {:?}",
638 format!("{:?}", new_announced_hashes), format!("{:?}", new_announced_hashes),
640 new_announced_hashes.iter().map(|hash| {
641 let metadata = self.hashes_fetch_inflight_and_pending_fetch.get(hash);
642 (*hash, metadata.map(|m| (m.retries, m.tx_encoded_length)))
644 }).collect::<Vec<(TxHash, Option<(u8, Option<usize>)>)>>())
645 }
646 }
647 }
648
649 let (response, rx) = oneshot::channel();
650 let req = PeerRequest::GetPooledTransactions {
651 request: GetPooledTransactions(new_announced_hashes.iter().copied().collect()),
652 response,
653 };
654
655 if let Err(err) = peer.request_tx.try_send(req) {
657 return match err {
659 TrySendError::Full(_) | TrySendError::Closed(_) => {
660 self.metrics.egress_peer_channel_full.increment(1);
661 Some(new_announced_hashes)
662 }
663 }
664 }
665
666 *inflight_count += 1;
667 self.inflight_requests.push(GetPooledTxRequestFut::new(peer_id, new_announced_hashes, rx));
669
670 None
671 }
672
673 pub fn fill_request_from_hashes_pending_fetch(
693 &mut self,
694 hashes_to_request: &mut RequestTxHashes,
695 seen_hashes: &LruCache<TxHash>,
696 mut budget_fill_request: Option<usize>, ) {
698 let Some(hash) = hashes_to_request.iter().next() else { return };
699
700 let mut acc_size_response = self
701 .hashes_fetch_inflight_and_pending_fetch
702 .get(hash)
703 .and_then(|entry| entry.tx_encoded_len())
704 .unwrap_or(AVERAGE_BYTE_SIZE_TX_ENCODED);
705
706 if acc_size_response >=
708 DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE_ON_FETCH_PENDING_HASHES
709 {
710 return
711 }
712
713 for hash in self.hashes_pending_fetch.iter() {
716 if !seen_hashes.contains(hash) {
718 continue
719 };
720
721 hashes_to_request.insert(*hash);
723
724 let size = self
726 .hashes_fetch_inflight_and_pending_fetch
727 .get(hash)
728 .and_then(|entry| entry.tx_encoded_len())
729 .unwrap_or(AVERAGE_BYTE_SIZE_TX_ENCODED);
730
731 acc_size_response += size;
732
733 if acc_size_response >=
737 DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE_ON_FETCH_PENDING_HASHES ||
738 hashes_to_request.len() >
739 DEFAULT_SOFT_LIMIT_COUNT_HASHES_IN_GET_POOLED_TRANSACTIONS_REQUEST_ON_FETCH_PENDING_HASHES
740 {
741 break
742 }
743
744 if let Some(ref mut bud) = budget_fill_request {
745 *bud -= 1;
746 if *bud == 0 {
747 break
748 }
749 }
750 }
751
752 for hash in hashes_to_request.iter() {
754 self.hashes_pending_fetch.remove(hash);
755 }
756 }
757
758 pub fn has_capacity_for_fetching_pending_hashes(&self) -> bool {
761 let info = &self.info;
762
763 self.has_capacity(info.max_inflight_requests)
764 }
765
766 fn has_capacity(&self, max_inflight_requests: usize) -> bool {
768 self.inflight_requests.len() <= max_inflight_requests
769 }
770
771 pub fn search_breadth_budget_find_idle_fallback_peer(
777 &self,
778 has_capacity_wrt_pending_pool_imports: impl Fn(usize) -> bool,
779 ) -> Option<usize> {
780 let info = &self.info;
781
782 let tx_fetcher_has_capacity = self.has_capacity(
783 info.max_inflight_requests /
784 DEFAULT_DIVISOR_MAX_COUNT_INFLIGHT_REQUESTS_ON_FIND_IDLE_PEER,
785 );
786 let tx_pool_has_capacity = has_capacity_wrt_pending_pool_imports(
787 DEFAULT_DIVISOR_MAX_COUNT_PENDING_POOL_IMPORTS_ON_FIND_IDLE_PEER,
788 );
789
790 if tx_fetcher_has_capacity && tx_pool_has_capacity {
791 None
793 } else {
794 let limit = DEFAULT_BUDGET_FIND_IDLE_FALLBACK_PEER;
796
797 trace!(target: "net::tx",
798 inflight_requests=self.inflight_requests.len(),
799 max_inflight_transaction_requests=info.max_inflight_requests,
800 hashes_pending_fetch=self.hashes_pending_fetch.len(),
801 limit,
802 "search breadth limited in search for idle fallback peer for some hash pending fetch"
803 );
804
805 Some(limit)
806 }
807 }
808
809 pub fn search_breadth_budget_find_intersection_pending_hashes_and_hashes_seen_by_peer(
816 &self,
817 has_capacity_wrt_pending_pool_imports: impl Fn(usize) -> bool,
818 ) -> Option<usize> {
819 let info = &self.info;
820
821 let tx_fetcher_has_capacity = self.has_capacity(
822 info.max_inflight_requests /
823 DEFAULT_DIVISOR_MAX_COUNT_INFLIGHT_REQUESTS_ON_FIND_INTERSECTION,
824 );
825 let tx_pool_has_capacity = has_capacity_wrt_pending_pool_imports(
826 DEFAULT_DIVISOR_MAX_COUNT_PENDING_POOL_IMPORTS_ON_FIND_INTERSECTION,
827 );
828
829 if tx_fetcher_has_capacity && tx_pool_has_capacity {
830 None
832 } else {
833 let limit = DEFAULT_BUDGET_FIND_INTERSECTION_ANNOUNCED_BY_PEER_AND_PENDING_FETCH;
835
836 trace!(target: "net::tx",
837 inflight_requests=self.inflight_requests.len(),
838 max_inflight_transaction_requests=self.info.max_inflight_requests,
839 hashes_pending_fetch=self.hashes_pending_fetch.len(),
840 limit=limit,
841 "search breadth limited in search for intersection of hashes announced by peer and hashes pending fetch"
842 );
843
844 Some(limit)
845 }
846 }
847
848 pub const fn approx_capacity_get_pooled_transactions_req(
851 &self,
852 announcement_version: EthVersion,
853 ) -> usize {
854 if announcement_version.is_eth68() {
855 approx_capacity_get_pooled_transactions_req_eth68(&self.info)
856 } else {
857 approx_capacity_get_pooled_transactions_req_eth66()
858 }
859 }
860
861 pub fn on_resolved_get_pooled_transactions_request_fut(
865 &mut self,
866 response: GetPooledTxResponse<N::PooledTransaction>,
867 ) -> FetchEvent<N::PooledTransaction> {
868 let GetPooledTxResponse { peer_id, mut requested_hashes, result } = response;
871
872 self.decrement_inflight_request_count_for(&peer_id);
873
874 match result {
875 Ok(Ok(transactions)) => {
876 if transactions.is_empty() {
881 trace!(target: "net::tx",
882 peer_id=format!("{peer_id:#}"),
883 requested_hashes_len=requested_hashes.len(),
884 "received empty `PooledTransactions` response from peer, peer failed to serve hashes it announced"
885 );
886
887 return FetchEvent::EmptyResponse { peer_id }
888 }
889
890 let payload = UnverifiedPooledTransactions::new(transactions);
894
895 let unverified_len = payload.len();
896 let (verification_outcome, verified_payload) =
897 payload.verify(&requested_hashes, &peer_id);
898
899 let unsolicited = unverified_len - verified_payload.len();
900 if unsolicited > 0 {
901 self.metrics.unsolicited_transactions.increment(unsolicited as u64);
902 }
903 if verification_outcome == VerificationOutcome::ReportPeer {
904 trace!(target: "net::tx",
906 peer_id=format!("{peer_id:#}"),
907 unverified_len,
908 verified_payload_len=verified_payload.len(),
909 "received `PooledTransactions` response from peer with entries that didn't verify against request, filtered out transactions"
910 );
911 }
912 if verified_payload.is_empty() {
914 return FetchEvent::FetchError { peer_id, error: RequestError::BadResponse }
915 }
916
917 let unvalidated_payload_len = verified_payload.len();
921
922 let (validation_outcome, valid_payload) =
923 self.filter_valid_message.partially_filter_valid_entries(verified_payload);
924
925 if validation_outcome == FilterOutcome::ReportPeer {
931 trace!(target: "net::tx",
932 peer_id=format!("{peer_id:#}"),
933 unvalidated_payload_len,
934 valid_payload_len=valid_payload.len(),
935 "received invalid `PooledTransactions` response from peer, filtered out duplicate entries"
936 );
937 }
938 let requested_hashes_len = requested_hashes.len();
946 let mut fetched = Vec::with_capacity(valid_payload.len());
947 requested_hashes.retain(|requested_hash| {
948 if valid_payload.contains_key(requested_hash) {
949 fetched.push(*requested_hash);
951 return false
952 }
953 true
954 });
955 fetched.shrink_to_fit();
956 self.metrics.fetched_transactions.increment(fetched.len() as u64);
957
958 if fetched.len() < requested_hashes_len {
959 trace!(target: "net::tx",
960 peer_id=format!("{peer_id:#}"),
961 requested_hashes_len=requested_hashes_len,
962 fetched_len=fetched.len(),
963 "peer failed to serve hashes it announced"
964 );
965 }
966
967 self.try_buffer_hashes_for_retry(requested_hashes, &peer_id);
971
972 let transactions = valid_payload.into_data().into_values().collect();
973
974 FetchEvent::TransactionsFetched { peer_id, transactions }
975 }
976 Ok(Err(req_err)) => {
977 self.try_buffer_hashes_for_retry(requested_hashes, &peer_id);
978 FetchEvent::FetchError { peer_id, error: req_err }
979 }
980 Err(_) => {
981 self.try_buffer_hashes_for_retry(requested_hashes, &peer_id);
982 FetchEvent::FetchError { peer_id, error: RequestError::ChannelClosed }
984 }
985 }
986 }
987}
988
989impl<N: NetworkPrimitives> Stream for TransactionFetcher<N> {
990 type Item = FetchEvent<N::PooledTransaction>;
991
992 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
994 if self.inflight_requests.is_empty() {
997 return Poll::Pending
998 }
999
1000 if let Some(resp) = ready!(self.inflight_requests.poll_next_unpin(cx)) {
1001 return Poll::Ready(Some(self.on_resolved_get_pooled_transactions_request_fut(resp)))
1002 }
1003
1004 Poll::Pending
1005 }
1006}
1007
1008impl<T: NetworkPrimitives> Default for TransactionFetcher<T> {
1009 fn default() -> Self {
1010 Self {
1011 active_peers: LruMap::new(DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS),
1012 inflight_requests: Default::default(),
1013 hashes_pending_fetch: LruCache::new(DEFAULT_MAX_CAPACITY_CACHE_PENDING_FETCH),
1014 hashes_fetch_inflight_and_pending_fetch: LruMap::new(
1015 DEFAULT_MAX_CAPACITY_CACHE_INFLIGHT_AND_PENDING_FETCH,
1016 ),
1017 filter_valid_message: Default::default(),
1018 info: TransactionFetcherInfo::default(),
1019 metrics: Default::default(),
1020 }
1021 }
1022}
1023
1024#[derive(Debug, Constructor)]
1026pub struct TxFetchMetadata {
1027 retries: u8,
1029 fallback_peers: LruCache<PeerId>,
1031 tx_encoded_length: Option<usize>,
1036}
1037
1038impl TxFetchMetadata {
1039 pub const fn fallback_peers_mut(&mut self) -> &mut LruCache<PeerId> {
1041 &mut self.fallback_peers
1042 }
1043
1044 pub const fn tx_encoded_len(&self) -> Option<usize> {
1049 self.tx_encoded_length
1050 }
1051}
1052
1053#[derive(Debug)]
1055pub enum FetchEvent<T = PooledTransaction> {
1056 TransactionsFetched {
1058 peer_id: PeerId,
1060 transactions: PooledTransactions<T>,
1062 },
1063 FetchError {
1065 peer_id: PeerId,
1067 error: RequestError,
1069 },
1070 EmptyResponse {
1072 peer_id: PeerId,
1074 },
1075}
1076
1077#[derive(Debug)]
1079pub struct GetPooledTxRequest<T = PooledTransaction> {
1080 peer_id: PeerId,
1081 requested_hashes: RequestTxHashes,
1083 response: oneshot::Receiver<RequestResult<PooledTransactions<T>>>,
1084}
1085
1086#[derive(Debug)]
1089pub struct GetPooledTxResponse<T = PooledTransaction> {
1090 peer_id: PeerId,
1091 requested_hashes: RequestTxHashes,
1094 result: Result<RequestResult<PooledTransactions<T>>, RecvError>,
1095}
1096
1097#[must_use = "futures do nothing unless polled"]
1100#[pin_project::pin_project]
1101#[derive(Debug)]
1102pub struct GetPooledTxRequestFut<T = PooledTransaction> {
1103 #[pin]
1104 inner: Option<GetPooledTxRequest<T>>,
1105}
1106
1107impl<T> GetPooledTxRequestFut<T> {
1108 #[inline]
1109 const fn new(
1110 peer_id: PeerId,
1111 requested_hashes: RequestTxHashes,
1112 response: oneshot::Receiver<RequestResult<PooledTransactions<T>>>,
1113 ) -> Self {
1114 Self { inner: Some(GetPooledTxRequest { peer_id, requested_hashes, response }) }
1115 }
1116}
1117
1118impl<T> Future for GetPooledTxRequestFut<T> {
1119 type Output = GetPooledTxResponse<T>;
1120
1121 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1122 let mut req = self.as_mut().project().inner.take().expect("polled after completion");
1123 match req.response.poll_unpin(cx) {
1124 Poll::Ready(result) => Poll::Ready(GetPooledTxResponse {
1125 peer_id: req.peer_id,
1126 requested_hashes: req.requested_hashes,
1127 result,
1128 }),
1129 Poll::Pending => {
1130 self.project().inner.set(Some(req));
1131 Poll::Pending
1132 }
1133 }
1134 }
1135}
1136
1137#[derive(Debug, Constructor, Deref)]
1139pub struct UnverifiedPooledTransactions<T> {
1140 txns: PooledTransactions<T>,
1141}
1142
1143#[derive(Debug, Constructor, Deref)]
1145pub struct VerifiedPooledTransactions<T> {
1146 txns: PooledTransactions<T>,
1147}
1148
1149impl<T: SignedTransaction> DedupPayload for VerifiedPooledTransactions<T> {
1150 type Value = T;
1151
1152 fn is_empty(&self) -> bool {
1153 self.txns.is_empty()
1154 }
1155
1156 fn len(&self) -> usize {
1157 self.txns.len()
1158 }
1159
1160 fn dedup(self) -> PartiallyValidData<Self::Value> {
1161 PartiallyValidData::from_raw_data(
1162 self.txns.into_iter().map(|tx| (*tx.tx_hash(), tx)).collect(),
1163 None,
1164 )
1165 }
1166}
1167
1168trait VerifyPooledTransactionsResponse {
1169 type Transaction: SignedTransaction;
1170
1171 fn verify(
1172 self,
1173 requested_hashes: &RequestTxHashes,
1174 peer_id: &PeerId,
1175 ) -> (VerificationOutcome, VerifiedPooledTransactions<Self::Transaction>);
1176}
1177
1178impl<T: SignedTransaction> VerifyPooledTransactionsResponse for UnverifiedPooledTransactions<T> {
1179 type Transaction = T;
1180
1181 fn verify(
1182 self,
1183 requested_hashes: &RequestTxHashes,
1184 _peer_id: &PeerId,
1185 ) -> (VerificationOutcome, VerifiedPooledTransactions<T>) {
1186 let mut verification_outcome = VerificationOutcome::Ok;
1187
1188 let Self { mut txns } = self;
1189
1190 #[cfg(debug_assertions)]
1191 let mut tx_hashes_not_requested: smallvec::SmallVec<[TxHash; 16]> = smallvec::smallvec!();
1192 #[cfg(not(debug_assertions))]
1193 let mut tx_hashes_not_requested_count = 0;
1194
1195 txns.0.retain(|tx| {
1196 if !requested_hashes.contains(tx.tx_hash()) {
1197 verification_outcome = VerificationOutcome::ReportPeer;
1198
1199 #[cfg(debug_assertions)]
1200 tx_hashes_not_requested.push(*tx.tx_hash());
1201 #[cfg(not(debug_assertions))]
1202 {
1203 tx_hashes_not_requested_count += 1;
1204 }
1205
1206 return false
1207 }
1208 true
1209 });
1210
1211 #[cfg(debug_assertions)]
1212 if !tx_hashes_not_requested.is_empty() {
1213 trace!(target: "net::tx",
1214 peer_id=format!("{_peer_id:#}"),
1215 ?tx_hashes_not_requested,
1216 "transactions in `PooledTransactions` response from peer were not requested"
1217 );
1218 }
1219 #[cfg(not(debug_assertions))]
1220 if tx_hashes_not_requested_count != 0 {
1221 trace!(target: "net::tx",
1222 peer_id=format!("{_peer_id:#}"),
1223 tx_hashes_not_requested_count,
1224 "transactions in `PooledTransactions` response from peer were not requested"
1225 );
1226 }
1227
1228 (verification_outcome, VerifiedPooledTransactions::new(txns))
1229 }
1230}
1231
/// Outcome of verifying a response against its request.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum VerificationOutcome {
    /// The response held requested transactions only.
    Ok,
    /// The response held transactions that weren't requested.
    ReportPeer,
}
1242
1243#[derive(Debug, Constructor)]
1245pub struct TransactionFetcherInfo {
1246 pub max_inflight_requests: usize,
1248 pub max_inflight_requests_per_peer: u8,
1250 pub soft_limit_byte_size_pooled_transactions_response_on_pack_request: usize,
1254 pub soft_limit_byte_size_pooled_transactions_response: usize,
1257 pub max_capacity_cache_txns_pending_fetch: u32,
1261}
1262
1263impl Default for TransactionFetcherInfo {
1264 fn default() -> Self {
1265 Self::new(
1266 DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS as usize,
1267 DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS_PER_PEER,
1268 DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESP_ON_PACK_GET_POOLED_TRANSACTIONS_REQ,
1269 SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE,
1270 DEFAULT_MAX_CAPACITY_CACHE_PENDING_FETCH,
1271 )
1272 }
1273}
1274
1275impl From<TransactionFetcherConfig> for TransactionFetcherInfo {
1276 fn from(config: TransactionFetcherConfig) -> Self {
1277 let TransactionFetcherConfig {
1278 max_inflight_requests,
1279 max_inflight_requests_per_peer,
1280 soft_limit_byte_size_pooled_transactions_response,
1281 soft_limit_byte_size_pooled_transactions_response_on_pack_request,
1282 max_capacity_cache_txns_pending_fetch,
1283 } = config;
1284
1285 Self::new(
1286 max_inflight_requests as usize,
1287 max_inflight_requests_per_peer,
1288 soft_limit_byte_size_pooled_transactions_response_on_pack_request,
1289 soft_limit_byte_size_pooled_transactions_response,
1290 max_capacity_cache_txns_pending_fetch,
1291 )
1292 }
1293}
1294
/// Time spent in the two search phases of fetching hashes that are pending fetch.
#[derive(Debug, Default)]
struct TxFetcherSearchDurations {
    /// Time spent searching for an idle fallback peer.
    find_idle_peer: Duration,
    /// Time spent filling the request with further hashes.
    fill_request: Duration,
}
1300
// Unit tests for request packing, fetching of pending hashes and response verification.
#[cfg(test)]
mod test {
    use super::*;
    use crate::test_utils::transactions::{buffer_hash_to_tx_fetcher, new_mock_session};
    use alloy_primitives::{hex, B256};
    use alloy_rlp::Decodable;
    use derive_more::IntoIterator;
    use reth_ethereum_primitives::TransactionSigned;
    use std::{collections::HashSet, str::FromStr};

    /// Minimal stand-in for announcement data: a list of `(hash, optional (u8, size))` entries.
    // NOTE(review): the `u8` is presumably the transaction type; the tests only ever pass `0`.
    #[derive(IntoIterator)]
    struct TestValidAnnouncementData(Vec<(TxHash, Option<(u8, usize)>)>);

    impl HandleMempoolData for TestValidAnnouncementData {
        /// True when no entries are held.
        fn is_empty(&self) -> bool {
            self.0.is_empty()
        }

        /// Number of entries held.
        fn len(&self) -> usize {
            self.0.len()
        }

        /// Keeps only the entries whose hash satisfies `f`; the metadata is ignored.
        fn retain_by_hash(&mut self, mut f: impl FnMut(&TxHash) -> bool) {
            self.0.retain(|(hash, _)| f(hash))
        }
    }

    impl HandleVersionedMempoolData for TestValidAnnouncementData {
        /// The test data always reports itself as an `eth68` message.
        fn msg_version(&self) -> EthVersion {
            EthVersion::Eth68
        }
    }

    /// Packs an `eth68` request from five announced hashes and checks which hashes end up in
    /// the request and which are returned as surplus.
    #[test]
    fn pack_eth68_request() {
        reth_tracing::init_test_tracing();

        let tx_fetcher = &mut TransactionFetcher::<EthNetworkPrimitives>::default();

        let eth68_hashes = [
            B256::from_slice(&[1; 32]),
            B256::from_slice(&[2; 32]),
            B256::from_slice(&[3; 32]),
            B256::from_slice(&[4; 32]),
            B256::from_slice(&[5; 32]),
        ];
        // Announced sizes, in the same order as the hashes above:
        //   0: soft limit - median small legacy tx size - 1 (expected to be packed)
        //   1: exactly the soft limit (expected to be surplus)
        //   2: 2 (expected to be packed)
        //   3: 9 and 4: 0 (both expected to be surplus)
        // NOTE(review): presumably packing stops once the remaining space drops below
        // `MEDIAN_BYTE_SIZE_SMALL_LEGACY_TX_ENCODED`, which would explain why the tiny entries 3
        // and 4 are left out; `pack_request_eth68` is defined outside this part of the file.
        let eth68_sizes = [
            DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESP_ON_PACK_GET_POOLED_TRANSACTIONS_REQ - MEDIAN_BYTE_SIZE_SMALL_LEGACY_TX_ENCODED - 1, DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESP_ON_PACK_GET_POOLED_TRANSACTIONS_REQ, 2, 9,
            0,
        ];

        let expected_request_hashes =
            [eth68_hashes[0], eth68_hashes[2]].into_iter().collect::<HashSet<_>>();

        let expected_surplus_hashes =
            [eth68_hashes[1], eth68_hashes[3], eth68_hashes[4]].into_iter().collect::<HashSet<_>>();

        let mut eth68_hashes_to_request = RequestTxHashes::with_capacity(3);

        // Pair every hash with `Some((0, size))` metadata.
        let valid_announcement_data = TestValidAnnouncementData(
            eth68_hashes
                .into_iter()
                .zip(eth68_sizes)
                .map(|(hash, size)| (hash, Some((0u8, size))))
                .collect::<Vec<_>>(),
        );

        // The call fills `eth68_hashes_to_request` and returns the hashes it did not pack.
        let surplus_eth68_hashes =
            tx_fetcher.pack_request_eth68(&mut eth68_hashes_to_request, valid_announcement_data);

        // Compare as sets so the assertions do not depend on ordering.
        let eth68_hashes_to_request = eth68_hashes_to_request.into_iter().collect::<HashSet<_>>();
        let surplus_eth68_hashes = surplus_eth68_hashes.into_iter().collect::<HashSet<_>>();

        assert_eq!(expected_request_hashes, eth68_hashes_to_request);
        assert_eq!(expected_surplus_hashes, surplus_eth68_hashes);
    }

    /// Buffers hashes as pending fetch and checks that `on_fetch_pending_hashes` sends peer 1 a
    /// `GetPooledTransactions` request containing exactly the hashes that peer has seen.
    ///
    /// The order of the `buffer_hash_to_tx_fetcher` calls below is part of the test setup and
    /// must be kept as is.
    #[tokio::test]
    async fn test_on_fetch_pending_hashes() {
        reth_tracing::init_test_tracing();

        let tx_fetcher = &mut TransactionFetcher::default();

        // Hashes expected in the request at the end of the test.
        let seen_hashes = [
            B256::from_slice(&[1; 32]),
            B256::from_slice(&[2; 32]),
            B256::from_slice(&[3; 32]),
            B256::from_slice(&[4; 32]),
        ];
        // Sizes for the first three hashes only; the fourth hash is buffered without a size.
        let seen_eth68_hashes_sizes = [120, 158, 116];

        // Peer whose mock session is expected to receive the request.
        let peer_1 = PeerId::new([1; 64]);
        // Second peer; its session receiver is dropped, so nothing is observed on it.
        let peer_2 = PeerId::new([2; 64]);

        // Mark all seen hashes as seen by both peers and keep the receiving end of peer 1's
        // mock session.
        let (mut peer_1_data, mut peer_1_mock_session_rx) =
            new_mock_session(peer_1, EthVersion::Eth66);
        for hash in &seen_hashes {
            peer_1_data.seen_transactions.insert(*hash);
        }
        let (mut peer_2_data, _) = new_mock_session(peer_2, EthVersion::Eth66);
        for hash in &seen_hashes {
            peer_2_data.seen_transactions.insert(*hash);
        }
        let mut peers = HashMap::default();
        peers.insert(peer_1, peer_1_data);
        peers.insert(peer_2, peer_2_data);

        // Buffer the first three hashes for peer 2 together with their sizes.
        // NOTE(review): the `0` argument is presumably a retry count; confirm against
        // `buffer_hash_to_tx_fetcher`.
        for i in 0..3 {
            buffer_hash_to_tx_fetcher(
                tx_fetcher,
                seen_hashes[i],
                peer_2,
                0,
                Some(seen_eth68_hashes_sizes[i]),
            );
        }
        // Fourth hash: buffered for peer 2 without size metadata.
        buffer_hash_to_tx_fetcher(tx_fetcher, seen_hashes[3], peer_2, 0, None);

        // An extra hash that is only ever buffered for peer 2, never for peer 1.
        let hash_other = B256::from_slice(&[5; 32]);
        buffer_hash_to_tx_fetcher(tx_fetcher, hash_other, peer_2, 0, None);

        // Buffer the seen hashes again, this time for peer 1.
        for hash in &seen_hashes {
            buffer_hash_to_tx_fetcher(tx_fetcher, *hash, peer_1, 0, None);
        }

        // Four seen hashes plus `hash_other` are pending fetch.
        assert_eq!(tx_fetcher.num_pending_hashes(), 5);

        // The closure argument accepts everything it is asked about.
        tx_fetcher.on_fetch_pending_hashes(&peers, |_| true);

        // Peer 1's mock session must have received a request.
        let req = peer_1_mock_session_rx
            .recv()
            .await
            .expect("peer session should receive request with buffered hashes");
        let PeerRequest::GetPooledTransactions { request, .. } = req else { unreachable!() };
        let GetPooledTransactions(requested_hashes) = request;

        // The request holds exactly the seen hashes; `hash_other` is not part of it.
        assert_eq!(
            requested_hashes.into_iter().collect::<HashSet<_>>(),
            seen_hashes.into_iter().collect::<HashSet<_>>()
        )
    }

    /// Verifies a response that contains one requested and one unrequested transaction: the
    /// outcome must be `ReportPeer` and only the requested transaction may remain.
    #[test]
    fn verify_response_hashes() {
        // First transaction; its hash is part of the request below.
        let input = hex!(
            "02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598daa"
        );
        let signed_tx_1: PooledTransaction =
            TransactionSigned::decode(&mut &input[..]).unwrap().try_into().unwrap();
        // Second transaction; the encoding appears to differ from the first only in its final
        // byte, and its hash is not requested.
        let input = hex!(
            "02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76"
        );
        let signed_tx_2: PooledTransaction =
            TransactionSigned::decode(&mut &input[..]).unwrap().try_into().unwrap();

        // The request covers the hash of tx 1 plus three unrelated hashes.
        let request_hashes = [
            B256::from_str("0x3b9aca00f0671c9a2a1b817a0a78d3fe0c0f776cccb2a8c3c1b412a4f4e67890")
                .unwrap(),
            *signed_tx_1.hash(),
            B256::from_str("0x3b9aca00f0671c9a2a1b817a0a78d3fe0c0f776cccb2a8c3c1b412a4f4e12345")
                .unwrap(),
            B256::from_str("0x3b9aca00f0671c9a2a1b817a0a78d3fe0c0f776cccb2a8c3c1b412a4f4edabe3")
                .unwrap(),
        ];

        // Sanity check: the hash of tx 2 is not among the requested hashes.
        for hash in &request_hashes {
            assert_ne!(hash, signed_tx_2.hash())
        }

        let request_hashes = RequestTxHashes::new(request_hashes.into_iter().collect());

        // The response carries tx 1 and the unrequested tx 2.
        let response_txns = PooledTransactions(vec![signed_tx_1.clone(), signed_tx_2]);
        let payload = UnverifiedPooledTransactions::new(response_txns);

        let (outcome, verified_payload) = payload.verify(&request_hashes, &PeerId::ZERO);

        // The peer is flagged and only tx 1 is kept in the verified payload.
        assert_eq!(VerificationOutcome::ReportPeer, outcome);
        assert_eq!(1, verified_payload.len());
        assert!(verified_payload.contains(&signed_tx_1));
    }
}