1use super::{
29 config::TransactionFetcherConfig,
30 constants::{tx_fetcher::*, SOFT_LIMIT_COUNT_HASHES_IN_GET_POOLED_TRANSACTIONS_REQUEST},
31 MessageFilter, PeerMetadata, PooledTransactions,
32 SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE,
33};
34use crate::{
35 cache::{LruCache, LruMap},
36 duration_metered_exec,
37 metrics::TransactionFetcherMetrics,
38 transactions::{validation, PartiallyFilterMessage},
39};
40use alloy_primitives::TxHash;
41use derive_more::{Constructor, Deref};
42use futures::{stream::FuturesUnordered, Future, FutureExt, Stream, StreamExt};
43use pin_project::pin_project;
44use reth_eth_wire::{
45 DedupPayload, EthVersion, GetPooledTransactions, HandleMempoolData, HandleVersionedMempoolData,
46 PartiallyValidData, RequestTxHashes, ValidAnnouncementData,
47};
48use reth_eth_wire_types::{EthNetworkPrimitives, NetworkPrimitives};
49use reth_network_api::PeerRequest;
50use reth_network_p2p::error::{RequestError, RequestResult};
51use reth_network_peers::PeerId;
52use reth_primitives::PooledTransaction;
53use reth_primitives_traits::SignedTransaction;
54use schnellru::ByLength;
55#[cfg(debug_assertions)]
56use smallvec::{smallvec, SmallVec};
57use std::{
58 collections::HashMap,
59 pin::Pin,
60 task::{ready, Context, Poll},
61 time::Duration,
62};
63use tokio::sync::{mpsc::error::TrySendError, oneshot, oneshot::error::RecvError};
64use tracing::trace;
65use validation::FilterOutcome;
66
67#[derive(Debug)]
72#[pin_project]
73pub struct TransactionFetcher<N: NetworkPrimitives = EthNetworkPrimitives> {
74 pub active_peers: LruMap<PeerId, u8, ByLength>,
76 #[pin]
82 pub inflight_requests: FuturesUnordered<GetPooledTxRequestFut<N::PooledTransaction>>,
83 pub hashes_pending_fetch: LruCache<TxHash>,
88 pub(super) hashes_fetch_inflight_and_pending_fetch: LruMap<TxHash, TxFetchMetadata, ByLength>,
90 pub(super) filter_valid_message: MessageFilter,
92 pub info: TransactionFetcherInfo,
94 #[doc(hidden)]
95 metrics: TransactionFetcherMetrics,
96}
97
98impl<N: NetworkPrimitives> TransactionFetcher<N> {
99 pub(crate) fn remove_peer(&mut self, peer_id: &PeerId) {
101 self.active_peers.remove(peer_id);
102 }
103
104 #[inline]
106 pub fn update_metrics(&self) {
107 let metrics = &self.metrics;
108
109 metrics.inflight_transaction_requests.set(self.inflight_requests.len() as f64);
110
111 let hashes_pending_fetch = self.hashes_pending_fetch.len() as f64;
112 let total_hashes = self.hashes_fetch_inflight_and_pending_fetch.len() as f64;
113
114 metrics.hashes_pending_fetch.set(hashes_pending_fetch);
115 metrics.hashes_inflight_transaction_requests.set(total_hashes - hashes_pending_fetch);
116 }
117
118 #[inline]
119 fn update_pending_fetch_cache_search_metrics(&self, durations: TxFetcherSearchDurations) {
120 let metrics = &self.metrics;
121
122 let TxFetcherSearchDurations { find_idle_peer, fill_request } = durations;
123 metrics
124 .duration_find_idle_fallback_peer_for_any_pending_hash
125 .set(find_idle_peer.as_secs_f64());
126 metrics.duration_fill_request_from_hashes_pending_fetch.set(fill_request.as_secs_f64());
127 }
128
129 pub fn with_transaction_fetcher_config(config: &TransactionFetcherConfig) -> Self {
131 let TransactionFetcherConfig {
132 max_inflight_requests,
133 max_capacity_cache_txns_pending_fetch,
134 ..
135 } = *config;
136
137 let info = config.clone().into();
138
139 let metrics = TransactionFetcherMetrics::default();
140 metrics.capacity_inflight_requests.increment(max_inflight_requests as u64);
141
142 Self {
143 active_peers: LruMap::new(max_inflight_requests),
144 hashes_pending_fetch: LruCache::new(max_capacity_cache_txns_pending_fetch),
145 hashes_fetch_inflight_and_pending_fetch: LruMap::new(
146 max_inflight_requests + max_capacity_cache_txns_pending_fetch,
147 ),
148 info,
149 metrics,
150 ..Default::default()
151 }
152 }
153
154 #[inline]
156 pub fn remove_hashes_from_transaction_fetcher<I>(&mut self, hashes: I)
157 where
158 I: IntoIterator<Item = TxHash>,
159 {
160 for hash in hashes {
161 self.hashes_fetch_inflight_and_pending_fetch.remove(&hash);
162 self.hashes_pending_fetch.remove(&hash);
163 }
164 }
165
166 fn decrement_inflight_request_count_for(&mut self, peer_id: &PeerId) {
168 let remove = || -> bool {
169 if let Some(inflight_count) = self.active_peers.get(peer_id) {
170 *inflight_count = inflight_count.saturating_sub(1);
171 if *inflight_count == 0 {
172 return true
173 }
174 }
175 false
176 }();
177
178 if remove {
179 self.active_peers.remove(peer_id);
180 }
181 }
182
183 #[inline]
185 pub fn is_idle(&self, peer_id: &PeerId) -> bool {
186 let Some(inflight_count) = self.active_peers.peek(peer_id) else { return true };
187 if *inflight_count < self.info.max_inflight_requests_per_peer {
188 return true
189 }
190 false
191 }
192
193 pub fn get_idle_peer_for(&self, hash: TxHash) -> Option<&PeerId> {
195 let TxFetchMetadata { fallback_peers, .. } =
196 self.hashes_fetch_inflight_and_pending_fetch.peek(&hash)?;
197
198 for peer_id in fallback_peers.iter() {
199 if self.is_idle(peer_id) {
200 return Some(peer_id)
201 }
202 }
203
204 None
205 }
206
207 pub fn find_any_idle_fallback_peer_for_any_pending_hash(
213 &mut self,
214 hashes_to_request: &mut RequestTxHashes,
215 mut budget: Option<usize>, ) -> Option<PeerId> {
217 let mut hashes_pending_fetch_iter = self.hashes_pending_fetch.iter();
218
219 let idle_peer = loop {
220 let &hash = hashes_pending_fetch_iter.next()?;
221
222 let idle_peer = self.get_idle_peer_for(hash);
223
224 if idle_peer.is_some() {
225 hashes_to_request.insert(hash);
226 break idle_peer.copied()
227 }
228
229 if let Some(ref mut bud) = budget {
230 *bud = bud.saturating_sub(1);
231 if *bud == 0 {
232 return None
233 }
234 }
235 };
236 let hash = hashes_to_request.iter().next()?;
237
238 drop(hashes_pending_fetch_iter);
240 _ = self.hashes_pending_fetch.remove(hash);
241
242 idle_peer
243 }
244
245 pub fn pack_request(
250 &self,
251 hashes_to_request: &mut RequestTxHashes,
252 hashes_from_announcement: ValidAnnouncementData,
253 ) -> RequestTxHashes {
254 if hashes_from_announcement.msg_version().is_eth68() {
255 return self.pack_request_eth68(hashes_to_request, hashes_from_announcement)
256 }
257 self.pack_request_eth66(hashes_to_request, hashes_from_announcement)
258 }
259
260 pub fn pack_request_eth68(
271 &self,
272 hashes_to_request: &mut RequestTxHashes,
273 hashes_from_announcement: impl HandleMempoolData
274 + IntoIterator<Item = (TxHash, Option<(u8, usize)>)>,
275 ) -> RequestTxHashes {
276 let mut acc_size_response = 0;
277
278 let mut hashes_from_announcement_iter = hashes_from_announcement.into_iter();
279
280 if let Some((hash, Some((_ty, size)))) = hashes_from_announcement_iter.next() {
281 hashes_to_request.insert(hash);
282
283 if size >= self.info.soft_limit_byte_size_pooled_transactions_response_on_pack_request {
285 return hashes_from_announcement_iter.collect()
286 }
287 acc_size_response = size;
288 }
289
290 let mut surplus_hashes = RequestTxHashes::default();
291
292 loop {
295 let Some((hash, metadata)) = hashes_from_announcement_iter.next() else { break };
296
297 let Some((_ty, size)) = metadata else {
298 unreachable!("this method is called upon reception of an eth68 announcement")
299 };
300
301 let next_acc_size = acc_size_response + size;
302
303 if next_acc_size <=
304 self.info.soft_limit_byte_size_pooled_transactions_response_on_pack_request
305 {
306 acc_size_response = next_acc_size;
309 _ = hashes_to_request.insert(hash)
310 } else {
311 _ = surplus_hashes.insert(hash)
312 }
313
314 let free_space =
315 self.info.soft_limit_byte_size_pooled_transactions_response_on_pack_request -
316 acc_size_response;
317
318 if free_space < MEDIAN_BYTE_SIZE_SMALL_LEGACY_TX_ENCODED {
319 break
320 }
321 }
322
323 surplus_hashes.extend(hashes_from_announcement_iter.map(|(hash, _metadata)| hash));
324
325 surplus_hashes
326 }
327
328 pub fn pack_request_eth66(
335 &self,
336 hashes_to_request: &mut RequestTxHashes,
337 hashes_from_announcement: ValidAnnouncementData,
338 ) -> RequestTxHashes {
339 let (mut hashes, _version) = hashes_from_announcement.into_request_hashes();
340 if hashes.len() <= SOFT_LIMIT_COUNT_HASHES_IN_GET_POOLED_TRANSACTIONS_REQUEST {
341 *hashes_to_request = hashes;
342 hashes_to_request.shrink_to_fit();
343
344 RequestTxHashes::default()
345 } else {
346 let surplus_hashes =
347 hashes.retain_count(SOFT_LIMIT_COUNT_HASHES_IN_GET_POOLED_TRANSACTIONS_REQUEST);
348 *hashes_to_request = hashes;
349 hashes_to_request.shrink_to_fit();
350
351 surplus_hashes
352 }
353 }
354
355 pub fn try_buffer_hashes_for_retry(
357 &mut self,
358 mut hashes: RequestTxHashes,
359 peer_failed_to_serve: &PeerId,
360 ) {
361 hashes.retain(|hash| {
364 if let Some(entry) = self.hashes_fetch_inflight_and_pending_fetch.get(hash) {
365 entry.fallback_peers_mut().remove(peer_failed_to_serve);
366 return true
367 }
368 false
370 });
371
372 self.buffer_hashes(hashes, None)
373 }
374
375 pub fn buffer_hashes(&mut self, hashes: RequestTxHashes, fallback_peer: Option<PeerId>) {
380 for hash in hashes {
381 if self.hashes_fetch_inflight_and_pending_fetch.peek(&hash).is_none() {
383 continue
384 }
385
386 let Some(TxFetchMetadata { retries, fallback_peers, .. }) =
387 self.hashes_fetch_inflight_and_pending_fetch.get(&hash)
388 else {
389 return
390 };
391
392 if let Some(peer_id) = fallback_peer {
393 fallback_peers.insert(peer_id);
395 } else {
396 if *retries >= DEFAULT_MAX_RETRIES {
397 trace!(target: "net::tx",
398 %hash,
399 retries,
400 "retry limit for `GetPooledTransactions` requests reached for hash, dropping hash"
401 );
402
403 self.hashes_fetch_inflight_and_pending_fetch.remove(&hash);
404 self.hashes_pending_fetch.remove(&hash);
405 continue
406 }
407 *retries += 1;
408 }
409
410 if let (_, Some(evicted_hash)) = self.hashes_pending_fetch.insert_and_get_evicted(hash)
411 {
412 self.hashes_fetch_inflight_and_pending_fetch.remove(&evicted_hash);
413 self.hashes_pending_fetch.remove(&evicted_hash);
414 }
415 }
416 }
417
418 pub fn on_fetch_pending_hashes(
423 &mut self,
424 peers: &HashMap<PeerId, PeerMetadata<N>>,
425 has_capacity_wrt_pending_pool_imports: impl Fn(usize) -> bool,
426 ) {
427 let mut hashes_to_request = RequestTxHashes::with_capacity(
428 DEFAULT_MARGINAL_COUNT_HASHES_GET_POOLED_TRANSACTIONS_REQUEST,
429 );
430 let mut search_durations = TxFetcherSearchDurations::default();
431
432 let budget_find_idle_fallback_peer = self
434 .search_breadth_budget_find_idle_fallback_peer(&has_capacity_wrt_pending_pool_imports);
435
436 let peer_id = duration_metered_exec!(
437 {
438 let Some(peer_id) = self.find_any_idle_fallback_peer_for_any_pending_hash(
439 &mut hashes_to_request,
440 budget_find_idle_fallback_peer,
441 ) else {
442 return
444 };
445
446 peer_id
447 },
448 search_durations.find_idle_peer
449 );
450
451 let Some(peer) = peers.get(&peer_id) else { return };
453 let conn_eth_version = peer.version;
454
455 let budget_fill_request = self
460 .search_breadth_budget_find_intersection_pending_hashes_and_hashes_seen_by_peer(
461 &has_capacity_wrt_pending_pool_imports,
462 );
463
464 duration_metered_exec!(
465 {
466 self.fill_request_from_hashes_pending_fetch(
467 &mut hashes_to_request,
468 &peer.seen_transactions,
469 budget_fill_request,
470 )
471 },
472 search_durations.fill_request
473 );
474
475 self.update_pending_fetch_cache_search_metrics(search_durations);
476
477 trace!(target: "net::tx",
478 peer_id=format!("{peer_id:#}"),
479 hashes=?*hashes_to_request,
480 %conn_eth_version,
481 "requesting hashes that were stored pending fetch from peer"
482 );
483
484 if let Some(failed_to_request_hashes) =
486 self.request_transactions_from_peer(hashes_to_request, peer)
487 {
488 trace!(target: "net::tx",
489 peer_id=format!("{peer_id:#}"),
490 ?failed_to_request_hashes,
491 %conn_eth_version,
492 "failed sending request to peer's session, buffering hashes"
493 );
494
495 self.buffer_hashes(failed_to_request_hashes, Some(peer_id));
496 }
497 }
498
499 pub fn filter_unseen_and_pending_hashes(
502 &mut self,
503 new_announced_hashes: &mut ValidAnnouncementData,
504 is_tx_bad_import: impl Fn(&TxHash) -> bool,
505 peer_id: &PeerId,
506 client_version: &str,
507 ) {
508 let mut previously_unseen_hashes_count = 0;
509
510 let msg_version = new_announced_hashes.msg_version();
511
512 new_announced_hashes.retain(|hash, metadata| {
514
515 if let Some(TxFetchMetadata{ tx_encoded_length: ref mut previously_seen_size, ..}) = self.hashes_fetch_inflight_and_pending_fetch.peek_mut(hash) {
517 if let Some((_ty, size)) = metadata {
519 if let Some(prev_size) = previously_seen_size {
520 if size != prev_size {
522 trace!(target: "net::tx",
523 peer_id=format!("{peer_id:#}"),
524 %hash,
525 size,
526 previously_seen_size,
527 %client_version,
528 "peer announced a different size for tx, this is especially worrying if one size is much bigger..."
529 );
530 }
531 }
532 *previously_seen_size = Some(*size);
534 }
535
536 if self.hashes_pending_fetch.remove(hash) {
538 return true
539 }
540
541 return false
542 }
543
544 if is_tx_bad_import(hash) {
547 return false
548 }
549
550 previously_unseen_hashes_count += 1;
551
552 if self.hashes_fetch_inflight_and_pending_fetch.get_or_insert(*hash, ||
553 TxFetchMetadata{retries: 0, fallback_peers: LruCache::new(DEFAULT_MAX_COUNT_FALLBACK_PEERS as u32), tx_encoded_length: None}
554 ).is_none() {
555
556 trace!(target: "net::tx",
557 peer_id=format!("{peer_id:#}"),
558 %hash,
559 ?msg_version,
560 %client_version,
561 "failed to cache new announced hash from peer in schnellru::LruMap, dropping hash"
562 );
563
564 return false
565 }
566 true
567 });
568
569 trace!(target: "net::tx",
570 peer_id=format!("{peer_id:#}"),
571 previously_unseen_hashes_count=previously_unseen_hashes_count,
572 msg_version=?msg_version,
573 client_version=%client_version,
574 "received previously unseen hashes in announcement from peer"
575 );
576 }
577
578 pub fn request_transactions_from_peer(
586 &mut self,
587 new_announced_hashes: RequestTxHashes,
588 peer: &PeerMetadata<N>,
589 ) -> Option<RequestTxHashes> {
590 let peer_id: PeerId = peer.request_tx.peer_id;
591 let conn_eth_version = peer.version;
592
593 if self.active_peers.len() >= self.info.max_inflight_requests {
594 trace!(target: "net::tx",
595 peer_id=format!("{peer_id:#}"),
596 hashes=?*new_announced_hashes,
597 %conn_eth_version,
598 max_inflight_transaction_requests=self.info.max_inflight_requests,
599 "limit for concurrent `GetPooledTransactions` requests reached, dropping request for hashes to peer"
600 );
601 return Some(new_announced_hashes)
602 }
603
604 let Some(inflight_count) = self.active_peers.get_or_insert(peer_id, || 0) else {
605 trace!(target: "net::tx",
606 peer_id=format!("{peer_id:#}"),
607 hashes=?*new_announced_hashes,
608 conn_eth_version=%conn_eth_version,
609 "failed to cache active peer in schnellru::LruMap, dropping request to peer"
610 );
611 return Some(new_announced_hashes)
612 };
613
614 if *inflight_count >= self.info.max_inflight_requests_per_peer {
615 trace!(target: "net::tx",
616 peer_id=format!("{peer_id:#}"),
617 hashes=?*new_announced_hashes,
618 %conn_eth_version,
619 max_concurrent_tx_reqs_per_peer=self.info.max_inflight_requests_per_peer,
620 "limit for concurrent `GetPooledTransactions` requests per peer reached"
621 );
622 return Some(new_announced_hashes)
623 }
624
625 #[cfg(debug_assertions)]
626 {
627 for hash in &new_announced_hashes {
628 if self.hashes_pending_fetch.contains(hash) {
629 tracing::debug!(target: "net::tx", "`{}` should have been taken out of buffer before packing in a request, breaks invariant `@hashes_pending_fetch` and `@inflight_requests`, `@hashes_fetch_inflight_and_pending_fetch` for `{}`: {:?}",
630 format!("{:?}", new_announced_hashes), format!("{:?}", new_announced_hashes),
632 new_announced_hashes.iter().map(|hash| {
633 let metadata = self.hashes_fetch_inflight_and_pending_fetch.get(hash);
634 (*hash, metadata.map(|m| (m.retries, m.tx_encoded_length)))
636 }).collect::<Vec<(TxHash, Option<(u8, Option<usize>)>)>>())
637 }
638 }
639 }
640
641 let (response, rx) = oneshot::channel();
642 let req = PeerRequest::GetPooledTransactions {
643 request: GetPooledTransactions(new_announced_hashes.iter().copied().collect()),
644 response,
645 };
646
647 if let Err(err) = peer.request_tx.try_send(req) {
649 return match err {
651 TrySendError::Full(_) | TrySendError::Closed(_) => {
652 self.metrics.egress_peer_channel_full.increment(1);
653 Some(new_announced_hashes)
654 }
655 }
656 }
657
658 *inflight_count += 1;
659 self.inflight_requests.push(GetPooledTxRequestFut::new(peer_id, new_announced_hashes, rx));
661
662 None
663 }
664
665 pub fn fill_request_from_hashes_pending_fetch(
685 &mut self,
686 hashes_to_request: &mut RequestTxHashes,
687 seen_hashes: &LruCache<TxHash>,
688 mut budget_fill_request: Option<usize>, ) {
690 let Some(hash) = hashes_to_request.iter().next() else { return };
691
692 let mut acc_size_response = self
693 .hashes_fetch_inflight_and_pending_fetch
694 .get(hash)
695 .and_then(|entry| entry.tx_encoded_len())
696 .unwrap_or(AVERAGE_BYTE_SIZE_TX_ENCODED);
697
698 if acc_size_response >=
700 DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE_ON_FETCH_PENDING_HASHES
701 {
702 return
703 }
704
705 for hash in self.hashes_pending_fetch.iter() {
708 if !seen_hashes.contains(hash) {
710 continue
711 };
712
713 hashes_to_request.insert(*hash);
715
716 let size = self
718 .hashes_fetch_inflight_and_pending_fetch
719 .get(hash)
720 .and_then(|entry| entry.tx_encoded_len())
721 .unwrap_or(AVERAGE_BYTE_SIZE_TX_ENCODED);
722
723 acc_size_response += size;
724
725 if acc_size_response >=
729 DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE_ON_FETCH_PENDING_HASHES ||
730 hashes_to_request.len() >
731 DEFAULT_SOFT_LIMIT_COUNT_HASHES_IN_GET_POOLED_TRANSACTIONS_REQUEST_ON_FETCH_PENDING_HASHES
732 {
733 break
734 }
735
736 if let Some(ref mut bud) = budget_fill_request {
737 *bud -= 1;
738 if *bud == 0 {
739 break
740 }
741 }
742 }
743
744 for hash in hashes_to_request.iter() {
746 self.hashes_pending_fetch.remove(hash);
747 }
748 }
749
750 pub fn has_capacity_for_fetching_pending_hashes(&self) -> bool {
753 let info = &self.info;
754
755 self.has_capacity(info.max_inflight_requests)
756 }
757
758 fn has_capacity(&self, max_inflight_requests: usize) -> bool {
760 self.inflight_requests.len() <= max_inflight_requests
761 }
762
763 pub fn search_breadth_budget_find_idle_fallback_peer(
769 &self,
770 has_capacity_wrt_pending_pool_imports: impl Fn(usize) -> bool,
771 ) -> Option<usize> {
772 let info = &self.info;
773
774 let tx_fetcher_has_capacity = self.has_capacity(
775 info.max_inflight_requests /
776 DEFAULT_DIVISOR_MAX_COUNT_INFLIGHT_REQUESTS_ON_FIND_IDLE_PEER,
777 );
778 let tx_pool_has_capacity = has_capacity_wrt_pending_pool_imports(
779 DEFAULT_DIVISOR_MAX_COUNT_PENDING_POOL_IMPORTS_ON_FIND_IDLE_PEER,
780 );
781
782 if tx_fetcher_has_capacity && tx_pool_has_capacity {
783 None
785 } else {
786 let limit = DEFAULT_BUDGET_FIND_IDLE_FALLBACK_PEER;
788
789 trace!(target: "net::tx",
790 inflight_requests=self.inflight_requests.len(),
791 max_inflight_transaction_requests=info.max_inflight_requests,
792 hashes_pending_fetch=self.hashes_pending_fetch.len(),
793 limit,
794 "search breadth limited in search for idle fallback peer for some hash pending fetch"
795 );
796
797 Some(limit)
798 }
799 }
800
801 pub fn search_breadth_budget_find_intersection_pending_hashes_and_hashes_seen_by_peer(
808 &self,
809 has_capacity_wrt_pending_pool_imports: impl Fn(usize) -> bool,
810 ) -> Option<usize> {
811 let info = &self.info;
812
813 let tx_fetcher_has_capacity = self.has_capacity(
814 info.max_inflight_requests /
815 DEFAULT_DIVISOR_MAX_COUNT_INFLIGHT_REQUESTS_ON_FIND_INTERSECTION,
816 );
817 let tx_pool_has_capacity = has_capacity_wrt_pending_pool_imports(
818 DEFAULT_DIVISOR_MAX_COUNT_PENDING_POOL_IMPORTS_ON_FIND_INTERSECTION,
819 );
820
821 if tx_fetcher_has_capacity && tx_pool_has_capacity {
822 None
824 } else {
825 let limit = DEFAULT_BUDGET_FIND_INTERSECTION_ANNOUNCED_BY_PEER_AND_PENDING_FETCH;
827
828 trace!(target: "net::tx",
829 inflight_requests=self.inflight_requests.len(),
830 max_inflight_transaction_requests=self.info.max_inflight_requests,
831 hashes_pending_fetch=self.hashes_pending_fetch.len(),
832 limit=limit,
833 "search breadth limited in search for intersection of hashes announced by peer and hashes pending fetch"
834 );
835
836 Some(limit)
837 }
838 }
839
840 pub const fn approx_capacity_get_pooled_transactions_req(
843 &self,
844 announcement_version: EthVersion,
845 ) -> usize {
846 if announcement_version.is_eth68() {
847 approx_capacity_get_pooled_transactions_req_eth68(&self.info)
848 } else {
849 approx_capacity_get_pooled_transactions_req_eth66()
850 }
851 }
852
853 pub fn on_resolved_get_pooled_transactions_request_fut(
857 &mut self,
858 response: GetPooledTxResponse<N::PooledTransaction>,
859 ) -> FetchEvent<N::PooledTransaction> {
860 let GetPooledTxResponse { peer_id, mut requested_hashes, result } = response;
863
864 self.decrement_inflight_request_count_for(&peer_id);
865
866 match result {
867 Ok(Ok(transactions)) => {
868 if transactions.is_empty() {
873 trace!(target: "net::tx",
874 peer_id=format!("{peer_id:#}"),
875 requested_hashes_len=requested_hashes.len(),
876 "received empty `PooledTransactions` response from peer, peer failed to serve hashes it announced"
877 );
878
879 return FetchEvent::EmptyResponse { peer_id }
880 }
881
882 let payload = UnverifiedPooledTransactions::new(transactions);
886
887 let unverified_len = payload.len();
888 let (verification_outcome, verified_payload) =
889 payload.verify(&requested_hashes, &peer_id);
890
891 let unsolicited = unverified_len - verified_payload.len();
892 if unsolicited > 0 {
893 self.metrics.unsolicited_transactions.increment(unsolicited as u64);
894 }
895 if verification_outcome == VerificationOutcome::ReportPeer {
896 trace!(target: "net::tx",
898 peer_id=format!("{peer_id:#}"),
899 unverified_len,
900 verified_payload_len=verified_payload.len(),
901 "received `PooledTransactions` response from peer with entries that didn't verify against request, filtered out transactions"
902 );
903 }
904 if verified_payload.is_empty() {
906 return FetchEvent::FetchError { peer_id, error: RequestError::BadResponse }
907 }
908
909 let unvalidated_payload_len = verified_payload.len();
913
914 let (validation_outcome, valid_payload) =
915 self.filter_valid_message.partially_filter_valid_entries(verified_payload);
916
917 if validation_outcome == FilterOutcome::ReportPeer {
923 trace!(target: "net::tx",
924 peer_id=format!("{peer_id:#}"),
925 unvalidated_payload_len,
926 valid_payload_len=valid_payload.len(),
927 "received invalid `PooledTransactions` response from peer, filtered out duplicate entries"
928 );
929 }
930 let requested_hashes_len = requested_hashes.len();
938 let mut fetched = Vec::with_capacity(valid_payload.len());
939 requested_hashes.retain(|requested_hash| {
940 if valid_payload.contains_key(requested_hash) {
941 fetched.push(*requested_hash);
943 return false
944 }
945 true
946 });
947 fetched.shrink_to_fit();
948 self.metrics.fetched_transactions.increment(fetched.len() as u64);
949
950 if fetched.len() < requested_hashes_len {
951 trace!(target: "net::tx",
952 peer_id=format!("{peer_id:#}"),
953 requested_hashes_len=requested_hashes_len,
954 fetched_len=fetched.len(),
955 "peer failed to serve hashes it announced"
956 );
957 }
958
959 self.try_buffer_hashes_for_retry(requested_hashes, &peer_id);
963
964 let transactions = valid_payload.into_data().into_values().collect();
965
966 FetchEvent::TransactionsFetched { peer_id, transactions }
967 }
968 Ok(Err(req_err)) => {
969 self.try_buffer_hashes_for_retry(requested_hashes, &peer_id);
970 FetchEvent::FetchError { peer_id, error: req_err }
971 }
972 Err(_) => {
973 self.try_buffer_hashes_for_retry(requested_hashes, &peer_id);
974 FetchEvent::FetchError { peer_id, error: RequestError::ChannelClosed }
976 }
977 }
978 }
979}
980
981impl<N: NetworkPrimitives> Stream for TransactionFetcher<N> {
982 type Item = FetchEvent<N::PooledTransaction>;
983
984 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
986 if self.inflight_requests.is_empty() {
989 return Poll::Pending
990 }
991
992 if let Some(resp) = ready!(self.inflight_requests.poll_next_unpin(cx)) {
993 return Poll::Ready(Some(self.on_resolved_get_pooled_transactions_request_fut(resp)))
994 }
995
996 Poll::Pending
997 }
998}
999
1000impl<T: NetworkPrimitives> Default for TransactionFetcher<T> {
1001 fn default() -> Self {
1002 Self {
1003 active_peers: LruMap::new(DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS),
1004 inflight_requests: Default::default(),
1005 hashes_pending_fetch: LruCache::new(DEFAULT_MAX_CAPACITY_CACHE_PENDING_FETCH),
1006 hashes_fetch_inflight_and_pending_fetch: LruMap::new(
1007 DEFAULT_MAX_CAPACITY_CACHE_INFLIGHT_AND_PENDING_FETCH,
1008 ),
1009 filter_valid_message: Default::default(),
1010 info: TransactionFetcherInfo::default(),
1011 metrics: Default::default(),
1012 }
1013 }
1014}
1015
1016#[derive(Debug, Constructor)]
1018pub struct TxFetchMetadata {
1019 retries: u8,
1021 fallback_peers: LruCache<PeerId>,
1023 tx_encoded_length: Option<usize>,
1028}
1029
1030impl TxFetchMetadata {
1031 pub fn fallback_peers_mut(&mut self) -> &mut LruCache<PeerId> {
1033 &mut self.fallback_peers
1034 }
1035
1036 pub const fn tx_encoded_len(&self) -> Option<usize> {
1041 self.tx_encoded_length
1042 }
1043}
1044
1045#[derive(Debug)]
1047pub enum FetchEvent<T = PooledTransaction> {
1048 TransactionsFetched {
1050 peer_id: PeerId,
1052 transactions: PooledTransactions<T>,
1054 },
1055 FetchError {
1057 peer_id: PeerId,
1059 error: RequestError,
1061 },
1062 EmptyResponse {
1064 peer_id: PeerId,
1066 },
1067}
1068
1069#[derive(Debug)]
1071pub struct GetPooledTxRequest<T = PooledTransaction> {
1072 peer_id: PeerId,
1073 requested_hashes: RequestTxHashes,
1075 response: oneshot::Receiver<RequestResult<PooledTransactions<T>>>,
1076}
1077
1078#[derive(Debug)]
1081pub struct GetPooledTxResponse<T = PooledTransaction> {
1082 peer_id: PeerId,
1083 requested_hashes: RequestTxHashes,
1086 result: Result<RequestResult<PooledTransactions<T>>, RecvError>,
1087}
1088
1089#[must_use = "futures do nothing unless polled"]
1092#[pin_project::pin_project]
1093#[derive(Debug)]
1094pub struct GetPooledTxRequestFut<T = PooledTransaction> {
1095 #[pin]
1096 inner: Option<GetPooledTxRequest<T>>,
1097}
1098
1099impl<T> GetPooledTxRequestFut<T> {
1100 #[inline]
1101 const fn new(
1102 peer_id: PeerId,
1103 requested_hashes: RequestTxHashes,
1104 response: oneshot::Receiver<RequestResult<PooledTransactions<T>>>,
1105 ) -> Self {
1106 Self { inner: Some(GetPooledTxRequest { peer_id, requested_hashes, response }) }
1107 }
1108}
1109
1110impl<T> Future for GetPooledTxRequestFut<T> {
1111 type Output = GetPooledTxResponse<T>;
1112
1113 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1114 let mut req = self.as_mut().project().inner.take().expect("polled after completion");
1115 match req.response.poll_unpin(cx) {
1116 Poll::Ready(result) => Poll::Ready(GetPooledTxResponse {
1117 peer_id: req.peer_id,
1118 requested_hashes: req.requested_hashes,
1119 result,
1120 }),
1121 Poll::Pending => {
1122 self.project().inner.set(Some(req));
1123 Poll::Pending
1124 }
1125 }
1126 }
1127}
1128
1129#[derive(Debug, Constructor, Deref)]
1131pub struct UnverifiedPooledTransactions<T> {
1132 txns: PooledTransactions<T>,
1133}
1134
1135#[derive(Debug, Constructor, Deref)]
1137pub struct VerifiedPooledTransactions<T> {
1138 txns: PooledTransactions<T>,
1139}
1140
1141impl<T: SignedTransaction> DedupPayload for VerifiedPooledTransactions<T> {
1142 type Value = T;
1143
1144 fn is_empty(&self) -> bool {
1145 self.txns.is_empty()
1146 }
1147
1148 fn len(&self) -> usize {
1149 self.txns.len()
1150 }
1151
1152 fn dedup(self) -> PartiallyValidData<Self::Value> {
1153 PartiallyValidData::from_raw_data(
1154 self.txns.into_iter().map(|tx| (*tx.tx_hash(), tx)).collect(),
1155 None,
1156 )
1157 }
1158}
1159
1160trait VerifyPooledTransactionsResponse {
1161 type Transaction: SignedTransaction;
1162
1163 fn verify(
1164 self,
1165 requested_hashes: &RequestTxHashes,
1166 peer_id: &PeerId,
1167 ) -> (VerificationOutcome, VerifiedPooledTransactions<Self::Transaction>);
1168}
1169
1170impl<T: SignedTransaction> VerifyPooledTransactionsResponse for UnverifiedPooledTransactions<T> {
1171 type Transaction = T;
1172
1173 fn verify(
1174 self,
1175 requested_hashes: &RequestTxHashes,
1176 _peer_id: &PeerId,
1177 ) -> (VerificationOutcome, VerifiedPooledTransactions<T>) {
1178 let mut verification_outcome = VerificationOutcome::Ok;
1179
1180 let Self { mut txns } = self;
1181
1182 #[cfg(debug_assertions)]
1183 let mut tx_hashes_not_requested: SmallVec<[TxHash; 16]> = smallvec!();
1184 #[cfg(not(debug_assertions))]
1185 let mut tx_hashes_not_requested_count = 0;
1186
1187 txns.0.retain(|tx| {
1188 if !requested_hashes.contains(tx.tx_hash()) {
1189 verification_outcome = VerificationOutcome::ReportPeer;
1190
1191 #[cfg(debug_assertions)]
1192 tx_hashes_not_requested.push(*tx.tx_hash());
1193 #[cfg(not(debug_assertions))]
1194 {
1195 tx_hashes_not_requested_count += 1;
1196 }
1197
1198 return false
1199 }
1200 true
1201 });
1202
1203 #[cfg(debug_assertions)]
1204 if !tx_hashes_not_requested.is_empty() {
1205 trace!(target: "net::tx",
1206 peer_id=format!("{_peer_id:#}"),
1207 ?tx_hashes_not_requested,
1208 "transactions in `PooledTransactions` response from peer were not requested"
1209 );
1210 }
1211 #[cfg(not(debug_assertions))]
1212 if tx_hashes_not_requested_count != 0 {
1213 trace!(target: "net::tx",
1214 peer_id=format!("{_peer_id:#}"),
1215 tx_hashes_not_requested_count,
1216 "transactions in `PooledTransactions` response from peer were not requested"
1217 );
1218 }
1219
1220 (verification_outcome, VerifiedPooledTransactions::new(txns))
1221 }
1222}
1223
1224#[derive(Debug, Clone, Copy, PartialEq, Eq)]
1227pub enum VerificationOutcome {
1228 Ok,
1230 ReportPeer,
1233}
1234
1235#[derive(Debug, Constructor)]
1237pub struct TransactionFetcherInfo {
1238 pub max_inflight_requests: usize,
1240 pub max_inflight_requests_per_peer: u8,
1242 pub soft_limit_byte_size_pooled_transactions_response_on_pack_request: usize,
1246 pub soft_limit_byte_size_pooled_transactions_response: usize,
1249 pub max_capacity_cache_txns_pending_fetch: u32,
1253}
1254
1255impl Default for TransactionFetcherInfo {
1256 fn default() -> Self {
1257 Self::new(
1258 DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS as usize,
1259 DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS_PER_PEER,
1260 DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESP_ON_PACK_GET_POOLED_TRANSACTIONS_REQ,
1261 SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE,
1262 DEFAULT_MAX_CAPACITY_CACHE_PENDING_FETCH,
1263 )
1264 }
1265}
1266
1267impl From<TransactionFetcherConfig> for TransactionFetcherInfo {
1268 fn from(config: TransactionFetcherConfig) -> Self {
1269 let TransactionFetcherConfig {
1270 max_inflight_requests,
1271 max_inflight_requests_per_peer,
1272 soft_limit_byte_size_pooled_transactions_response,
1273 soft_limit_byte_size_pooled_transactions_response_on_pack_request,
1274 max_capacity_cache_txns_pending_fetch,
1275 } = config;
1276
1277 Self::new(
1278 max_inflight_requests as usize,
1279 max_inflight_requests_per_peer,
1280 soft_limit_byte_size_pooled_transactions_response_on_pack_request,
1281 soft_limit_byte_size_pooled_transactions_response,
1282 max_capacity_cache_txns_pending_fetch,
1283 )
1284 }
1285}
1286
1287#[derive(Debug, Default)]
1288struct TxFetcherSearchDurations {
1289 find_idle_peer: Duration,
1290 fill_request: Duration,
1291}
1292
1293#[cfg(test)]
1294mod test {
1295 use super::*;
1296 use crate::transactions::tests::{default_cache, new_mock_session};
1297 use alloy_primitives::{hex, B256};
1298 use alloy_rlp::Decodable;
1299 use derive_more::IntoIterator;
1300 use reth_primitives::TransactionSigned;
1301 use std::{collections::HashSet, str::FromStr};
1302
1303 #[derive(IntoIterator)]
1304 struct TestValidAnnouncementData(Vec<(TxHash, Option<(u8, usize)>)>);
1305
1306 impl HandleMempoolData for TestValidAnnouncementData {
1307 fn is_empty(&self) -> bool {
1308 self.0.is_empty()
1309 }
1310
1311 fn len(&self) -> usize {
1312 self.0.len()
1313 }
1314
1315 fn retain_by_hash(&mut self, mut f: impl FnMut(&TxHash) -> bool) {
1316 self.0.retain(|(hash, _)| f(hash))
1317 }
1318 }
1319
1320 impl HandleVersionedMempoolData for TestValidAnnouncementData {
1321 fn msg_version(&self) -> EthVersion {
1322 EthVersion::Eth68
1323 }
1324 }
1325
1326 #[test]
1327 fn pack_eth68_request() {
1328 reth_tracing::init_test_tracing();
1329
1330 let tx_fetcher = &mut TransactionFetcher::<EthNetworkPrimitives>::default();
1333
1334 let eth68_hashes = [
1335 B256::from_slice(&[1; 32]),
1336 B256::from_slice(&[2; 32]),
1337 B256::from_slice(&[3; 32]),
1338 B256::from_slice(&[4; 32]),
1339 B256::from_slice(&[5; 32]),
1340 ];
1341 let eth68_sizes = [
1342 DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESP_ON_PACK_GET_POOLED_TRANSACTIONS_REQ - MEDIAN_BYTE_SIZE_SMALL_LEGACY_TX_ENCODED - 1, DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESP_ON_PACK_GET_POOLED_TRANSACTIONS_REQ, 2, 9,
1346 0,
1347 ];
1348
1349 let expected_request_hashes =
1350 [eth68_hashes[0], eth68_hashes[2]].into_iter().collect::<HashSet<_>>();
1351
1352 let expected_surplus_hashes =
1353 [eth68_hashes[1], eth68_hashes[3], eth68_hashes[4]].into_iter().collect::<HashSet<_>>();
1354
1355 let mut eth68_hashes_to_request = RequestTxHashes::with_capacity(3);
1356
1357 let valid_announcement_data = TestValidAnnouncementData(
1358 eth68_hashes
1359 .into_iter()
1360 .zip(eth68_sizes)
1361 .map(|(hash, size)| (hash, Some((0u8, size))))
1362 .collect::<Vec<_>>(),
1363 );
1364
1365 let surplus_eth68_hashes =
1368 tx_fetcher.pack_request_eth68(&mut eth68_hashes_to_request, valid_announcement_data);
1369
1370 let eth68_hashes_to_request = eth68_hashes_to_request.into_iter().collect::<HashSet<_>>();
1371 let surplus_eth68_hashes = surplus_eth68_hashes.into_iter().collect::<HashSet<_>>();
1372
1373 assert_eq!(expected_request_hashes, eth68_hashes_to_request);
1374 assert_eq!(expected_surplus_hashes, surplus_eth68_hashes);
1375 }
1376
1377 #[tokio::test]
1378 async fn test_on_fetch_pending_hashes() {
1379 reth_tracing::init_test_tracing();
1380
1381 let tx_fetcher = &mut TransactionFetcher::default();
1382
1383 let seen_hashes = [
1387 B256::from_slice(&[1; 32]),
1388 B256::from_slice(&[2; 32]),
1389 B256::from_slice(&[3; 32]),
1390 B256::from_slice(&[4; 32]),
1391 ];
1392 let seen_eth68_hashes_sizes = [120, 158, 116];
1397
1398 let peer_1 = PeerId::new([1; 64]);
1400 let peer_2 = PeerId::new([2; 64]);
1402
1403 let (mut peer_1_data, mut peer_1_mock_session_rx) =
1407 new_mock_session(peer_1, EthVersion::Eth66);
1408 for hash in &seen_hashes {
1409 peer_1_data.seen_transactions.insert(*hash);
1410 }
1411 let (mut peer_2_data, _) = new_mock_session(peer_2, EthVersion::Eth66);
1412 for hash in &seen_hashes {
1413 peer_2_data.seen_transactions.insert(*hash);
1414 }
1415 let mut peers = HashMap::default();
1416 peers.insert(peer_1, peer_1_data);
1417 peers.insert(peer_2, peer_2_data);
1418
1419 let mut backups = default_cache();
1420 backups.insert(peer_2);
1421 for i in 0..3 {
1423 let mut backups = default_cache();
1425 backups.insert(peer_2);
1426 let meta = TxFetchMetadata::new(0, backups, Some(seen_eth68_hashes_sizes[i]));
1427 tx_fetcher.hashes_fetch_inflight_and_pending_fetch.insert(seen_hashes[i], meta);
1428 }
1429 let meta = TxFetchMetadata::new(0, backups, None);
1430 tx_fetcher.hashes_fetch_inflight_and_pending_fetch.insert(seen_hashes[3], meta);
1431
1432 let mut backups = default_cache();
1433 backups.insert(peer_2);
1434 let hash_other = B256::from_slice(&[5; 32]);
1436 tx_fetcher
1437 .hashes_fetch_inflight_and_pending_fetch
1438 .insert(hash_other, TxFetchMetadata::new(0, backups, None));
1439 tx_fetcher.hashes_pending_fetch.insert(hash_other);
1440
1441 for hash in &seen_hashes {
1443 tx_fetcher
1444 .hashes_fetch_inflight_and_pending_fetch
1445 .get(hash)
1446 .unwrap()
1447 .fallback_peers_mut()
1448 .insert(peer_1);
1449 }
1450
1451 for hash in &seen_hashes {
1453 tx_fetcher.hashes_pending_fetch.insert(*hash);
1454 }
1455
1456 assert_eq!(tx_fetcher.hashes_pending_fetch.len(), 5);
1458
1459 tx_fetcher.on_fetch_pending_hashes(&peers, |_| true);
1462
1463 let req = peer_1_mock_session_rx
1465 .recv()
1466 .await
1467 .expect("peer session should receive request with buffered hashes");
1468 let PeerRequest::GetPooledTransactions { request, .. } = req else { unreachable!() };
1469 let GetPooledTransactions(requested_hashes) = request;
1470
1471 assert_eq!(
1472 requested_hashes.into_iter().collect::<HashSet<_>>(),
1473 seen_hashes.into_iter().collect::<HashSet<_>>()
1474 )
1475 }
1476
1477 #[test]
1478 fn verify_response_hashes() {
1479 let input = hex!("02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598daa");
1480 let signed_tx_1: PooledTransaction =
1481 TransactionSigned::decode(&mut &input[..]).unwrap().try_into().unwrap();
1482 let input = hex!("02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76");
1483 let signed_tx_2: PooledTransaction =
1484 TransactionSigned::decode(&mut &input[..]).unwrap().try_into().unwrap();
1485
1486 let request_hashes = [
1488 B256::from_str("0x3b9aca00f0671c9a2a1b817a0a78d3fe0c0f776cccb2a8c3c1b412a4f4e67890")
1489 .unwrap(),
1490 *signed_tx_1.hash(),
1491 B256::from_str("0x3b9aca00f0671c9a2a1b817a0a78d3fe0c0f776cccb2a8c3c1b412a4f4e12345")
1492 .unwrap(),
1493 B256::from_str("0x3b9aca00f0671c9a2a1b817a0a78d3fe0c0f776cccb2a8c3c1b412a4f4edabe3")
1494 .unwrap(),
1495 ];
1496
1497 for hash in &request_hashes {
1498 assert_ne!(hash, signed_tx_2.hash())
1499 }
1500
1501 let request_hashes = RequestTxHashes::new(request_hashes.into_iter().collect());
1502
1503 let response_txns = PooledTransactions(vec![signed_tx_1.clone(), signed_tx_2]);
1505 let payload = UnverifiedPooledTransactions::new(response_txns);
1506
1507 let (outcome, verified_payload) = payload.verify(&request_hashes, &PeerId::ZERO);
1508
1509 assert_eq!(VerificationOutcome::ReportPeer, outcome);
1510 assert_eq!(1, verified_payload.len());
1511 assert!(verified_payload.contains(&signed_tx_1));
1512 }
1513}