1use super::{
29 config::TransactionFetcherConfig,
30 constants::{tx_fetcher::*, SOFT_LIMIT_COUNT_HASHES_IN_GET_POOLED_TRANSACTIONS_REQUEST},
31 PeerMetadata, PooledTransactions, SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE,
32};
33use crate::{
34 cache::{LruCache, LruMap},
35 duration_metered_exec,
36 metrics::TransactionFetcherMetrics,
37};
38use alloy_consensus::transaction::PooledTransaction;
39use alloy_primitives::TxHash;
40use derive_more::{Constructor, Deref};
41use futures::{stream::FuturesUnordered, Future, FutureExt, Stream, StreamExt};
42use pin_project::pin_project;
43use reth_eth_wire::{
44 DedupPayload, GetPooledTransactions, HandleMempoolData, HandleVersionedMempoolData,
45 PartiallyValidData, RequestTxHashes, ValidAnnouncementData,
46};
47use reth_eth_wire_types::{EthNetworkPrimitives, NetworkPrimitives};
48use reth_network_api::PeerRequest;
49use reth_network_p2p::error::{RequestError, RequestResult};
50use reth_network_peers::PeerId;
51use reth_primitives_traits::SignedTransaction;
52use schnellru::ByLength;
53use std::{
54 collections::HashMap,
55 pin::Pin,
56 task::{ready, Context, Poll},
57 time::Duration,
58};
59use tokio::sync::{mpsc::error::TrySendError, oneshot, oneshot::error::RecvError};
60use tracing::trace;
61
/// State for fetching announced transactions from peers.
///
/// Holds the futures of in-flight `GetPooledTransactions` requests together with the caches
/// that track which hashes are waiting to be fetched and which peers are busy.
#[derive(Debug)]
#[pin_project]
pub struct TransactionFetcher<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// Number of in-flight requests per peer. An entry is removed when a peer's count drops to
    /// zero after a request resolves.
    pub active_peers: LruMap<PeerId, u8, ByLength>,
    /// Futures of requests that were sent to a peer and are awaiting a response.
    #[pin]
    pub inflight_requests: FuturesUnordered<GetPooledTxRequestFut<N::PooledTransaction>>,
    /// Hashes buffered for a later fetch attempt.
    pub hashes_pending_fetch: LruCache<TxHash>,
    /// Fetch metadata for every hash tracked by the fetcher, whether it is buffered in
    /// `hashes_pending_fetch` or part of an in-flight request.
    pub hashes_fetch_inflight_and_pending_fetch: LruMap<TxHash, TxFetchMetadata, ByLength>,
    /// Limits this fetcher operates under.
    pub info: TransactionFetcherInfo,
    /// Metrics recorded by the fetcher.
    #[doc(hidden)]
    metrics: TransactionFetcherMetrics,
}
90
91impl<N: NetworkPrimitives> TransactionFetcher<N> {
92 pub(crate) fn remove_peer(&mut self, peer_id: &PeerId) {
94 self.active_peers.remove(peer_id);
95 }
96
97 #[inline]
99 pub fn update_metrics(&self) {
100 let metrics = &self.metrics;
101
102 metrics.inflight_transaction_requests.set(self.inflight_requests.len() as f64);
103
104 let hashes_pending_fetch = self.hashes_pending_fetch.len() as f64;
105 let total_hashes = self.hashes_fetch_inflight_and_pending_fetch.len() as f64;
106
107 metrics.hashes_pending_fetch.set(hashes_pending_fetch);
108 metrics.hashes_inflight_transaction_requests.set(total_hashes - hashes_pending_fetch);
109 }
110
111 #[inline]
112 fn update_pending_fetch_cache_search_metrics(&self, durations: TxFetcherSearchDurations) {
113 let metrics = &self.metrics;
114
115 let TxFetcherSearchDurations { find_idle_peer, fill_request } = durations;
116 metrics
117 .duration_find_idle_fallback_peer_for_any_pending_hash
118 .set(find_idle_peer.as_secs_f64());
119 metrics.duration_fill_request_from_hashes_pending_fetch.set(fill_request.as_secs_f64());
120 }
121
122 pub fn with_transaction_fetcher_config(config: &TransactionFetcherConfig) -> Self {
124 let TransactionFetcherConfig {
125 max_inflight_requests,
126 max_capacity_cache_txns_pending_fetch,
127 ..
128 } = *config;
129
130 let info = config.clone().into();
131
132 let metrics = TransactionFetcherMetrics::default();
133 metrics.capacity_inflight_requests.increment(max_inflight_requests as u64);
134
135 Self {
136 active_peers: LruMap::new(max_inflight_requests),
137 hashes_pending_fetch: LruCache::new(max_capacity_cache_txns_pending_fetch),
138 hashes_fetch_inflight_and_pending_fetch: LruMap::new(
139 max_inflight_requests + max_capacity_cache_txns_pending_fetch,
140 ),
141 info,
142 metrics,
143 ..Default::default()
144 }
145 }
146
147 #[inline]
149 pub fn remove_hashes_from_transaction_fetcher<I>(&mut self, hashes: I)
150 where
151 I: IntoIterator<Item = TxHash>,
152 {
153 for hash in hashes {
154 self.hashes_fetch_inflight_and_pending_fetch.remove(&hash);
155 self.hashes_pending_fetch.remove(&hash);
156 }
157 }
158
159 fn decrement_inflight_request_count_for(&mut self, peer_id: &PeerId) {
161 let remove = || -> bool {
162 if let Some(inflight_count) = self.active_peers.get(peer_id) {
163 *inflight_count = inflight_count.saturating_sub(1);
164 if *inflight_count == 0 {
165 return true
166 }
167 }
168 false
169 }();
170
171 if remove {
172 self.active_peers.remove(peer_id);
173 }
174 }
175
176 #[inline]
178 pub fn is_idle(&self, peer_id: &PeerId) -> bool {
179 let Some(inflight_count) = self.active_peers.peek(peer_id) else { return true };
180 if *inflight_count < self.info.max_inflight_requests_per_peer {
181 return true
182 }
183 false
184 }
185
186 pub fn get_idle_peer_for(&self, hash: TxHash) -> Option<&PeerId> {
188 let TxFetchMetadata { fallback_peers, .. } =
189 self.hashes_fetch_inflight_and_pending_fetch.peek(&hash)?;
190
191 for peer_id in fallback_peers.iter() {
192 if self.is_idle(peer_id) {
193 return Some(peer_id)
194 }
195 }
196
197 None
198 }
199
/// Looks through the hashes pending fetch for one that has an idle fallback peer.
///
/// On success the hash is inserted into `hashes_to_request`, a hash from that buffer is
/// removed from the cache of hashes pending fetch, and the idle peer is returned. Returns
/// `None` if the cache is exhausted or the search `budget` runs out first.
///
/// `budget` bounds how many hashes without an idle fallback peer are inspected; `None` means
/// the search is unbounded.
pub fn find_any_idle_fallback_peer_for_any_pending_hash(
    &mut self,
    hashes_to_request: &mut RequestTxHashes,
    mut budget: Option<usize>,
) -> Option<PeerId> {
    let mut hashes_pending_fetch_iter = self.hashes_pending_fetch.iter();

    let idle_peer = loop {
        let &hash = hashes_pending_fetch_iter.next()?;

        let idle_peer = self.get_idle_peer_for(hash);

        if idle_peer.is_some() {
            hashes_to_request.insert(hash);
            break idle_peer.copied()
        }

        // this hash has no idle fallback peer, charge it against the search budget
        if let Some(ref mut bud) = budget {
            *bud = bud.saturating_sub(1);
            if *bud == 0 {
                return None
            }
        }
    };
    // NOTE(review): takes the first hash yielded by the request buffer; presumably the buffer
    // was empty on entry so this is the hash inserted above — confirm against callers
    let hash = hashes_to_request.iter().next()?;

    // release the borrow of the cache before mutating it
    drop(hashes_pending_fetch_iter);
    _ = self.hashes_pending_fetch.remove(hash);

    idle_peer
}
237
238 pub fn pack_request(
243 &self,
244 hashes_to_request: &mut RequestTxHashes,
245 hashes_from_announcement: ValidAnnouncementData,
246 ) -> RequestTxHashes {
247 if hashes_from_announcement.msg_version().is_eth68() {
248 return self.pack_request_eth68(hashes_to_request, hashes_from_announcement)
249 }
250 self.pack_request_eth66(hashes_to_request, hashes_from_announcement)
251 }
252
253 pub fn pack_request_eth68(
264 &self,
265 hashes_to_request: &mut RequestTxHashes,
266 hashes_from_announcement: impl HandleMempoolData
267 + IntoIterator<Item = (TxHash, Option<(u8, usize)>)>,
268 ) -> RequestTxHashes {
269 let mut acc_size_response = 0;
270
271 let mut hashes_from_announcement_iter = hashes_from_announcement.into_iter();
272
273 if let Some((hash, Some((_ty, size)))) = hashes_from_announcement_iter.next() {
274 hashes_to_request.insert(hash);
275
276 if size >= self.info.soft_limit_byte_size_pooled_transactions_response_on_pack_request {
278 return hashes_from_announcement_iter.collect()
279 }
280 acc_size_response = size;
281 }
282
283 let mut surplus_hashes = RequestTxHashes::default();
284
285 loop {
288 let Some((hash, metadata)) = hashes_from_announcement_iter.next() else { break };
289
290 let Some((_ty, size)) = metadata else {
291 unreachable!("this method is called upon reception of an eth68 announcement")
292 };
293
294 let next_acc_size = acc_size_response + size;
295
296 if next_acc_size <=
297 self.info.soft_limit_byte_size_pooled_transactions_response_on_pack_request
298 {
299 acc_size_response = next_acc_size;
302 _ = hashes_to_request.insert(hash)
303 } else {
304 _ = surplus_hashes.insert(hash)
305 }
306
307 let free_space =
308 self.info.soft_limit_byte_size_pooled_transactions_response_on_pack_request -
309 acc_size_response;
310
311 if free_space < MEDIAN_BYTE_SIZE_SMALL_LEGACY_TX_ENCODED {
312 break
313 }
314 }
315
316 surplus_hashes.extend(hashes_from_announcement_iter.map(|(hash, _metadata)| hash));
317
318 surplus_hashes
319 }
320
321 pub fn pack_request_eth66(
328 &self,
329 hashes_to_request: &mut RequestTxHashes,
330 hashes_from_announcement: ValidAnnouncementData,
331 ) -> RequestTxHashes {
332 let (mut hashes, _version) = hashes_from_announcement.into_request_hashes();
333 if hashes.len() <= SOFT_LIMIT_COUNT_HASHES_IN_GET_POOLED_TRANSACTIONS_REQUEST {
334 *hashes_to_request = hashes;
335 hashes_to_request.shrink_to_fit();
336
337 RequestTxHashes::default()
338 } else {
339 let surplus_hashes =
340 hashes.retain_count(SOFT_LIMIT_COUNT_HASHES_IN_GET_POOLED_TRANSACTIONS_REQUEST);
341 *hashes_to_request = hashes;
342 hashes_to_request.shrink_to_fit();
343
344 surplus_hashes
345 }
346 }
347
348 pub fn try_buffer_hashes_for_retry(
350 &mut self,
351 mut hashes: RequestTxHashes,
352 peer_failed_to_serve: &PeerId,
353 ) {
354 hashes.retain(|hash| {
357 if let Some(entry) = self.hashes_fetch_inflight_and_pending_fetch.get(hash) {
358 entry.fallback_peers_mut().remove(peer_failed_to_serve);
359 return true
360 }
361 false
363 });
364
365 self.buffer_hashes(hashes, None)
366 }
367
368 pub fn num_pending_hashes(&self) -> usize {
370 self.hashes_pending_fetch.len()
371 }
372
373 pub fn num_all_hashes(&self) -> usize {
375 self.hashes_fetch_inflight_and_pending_fetch.len()
376 }
377
378 pub fn buffer_hashes(&mut self, hashes: RequestTxHashes, fallback_peer: Option<PeerId>) {
383 for hash in hashes {
384 if self.hashes_fetch_inflight_and_pending_fetch.peek(&hash).is_none() {
386 continue
387 }
388
389 let Some(TxFetchMetadata { retries, fallback_peers, .. }) =
390 self.hashes_fetch_inflight_and_pending_fetch.get(&hash)
391 else {
392 return
393 };
394
395 if let Some(peer_id) = fallback_peer {
396 fallback_peers.insert(peer_id);
398 } else {
399 if *retries >= DEFAULT_MAX_RETRIES {
400 trace!(target: "net::tx",
401 %hash,
402 retries,
403 "retry limit for `GetPooledTransactions` requests reached for hash, dropping hash"
404 );
405
406 self.hashes_fetch_inflight_and_pending_fetch.remove(&hash);
407 self.hashes_pending_fetch.remove(&hash);
408 continue
409 }
410 *retries += 1;
411 }
412
413 if let (_, Some(evicted_hash)) = self.hashes_pending_fetch.insert_and_get_evicted(hash)
414 {
415 self.hashes_fetch_inflight_and_pending_fetch.remove(&evicted_hash);
416 self.hashes_pending_fetch.remove(&evicted_hash);
417 }
418 }
419 }
420
/// Tries to request hashes that are pending fetch from an idle fallback peer.
///
/// Finds a pending hash with an idle fallback peer, fills the request with further pending
/// hashes that the peer has seen, and sends it. Both searches are bounded by budgets derived
/// from the fetcher's and the pool's capacity, and their durations are recorded as metrics.
/// Returns early without sending if no idle peer is found or the peer is not in `peers`.
///
/// If the request cannot be sent, the hashes are buffered again with the peer as fallback.
pub fn on_fetch_pending_hashes(
    &mut self,
    peers: &HashMap<PeerId, PeerMetadata<N>>,
    has_capacity_wrt_pending_pool_imports: impl Fn(usize) -> bool,
) {
    let mut hashes_to_request = RequestTxHashes::with_capacity(
        DEFAULT_MARGINAL_COUNT_HASHES_GET_POOLED_TRANSACTIONS_REQUEST,
    );
    let mut search_durations = TxFetcherSearchDurations::default();

    // budget for how many pending hashes to inspect while looking for an idle peer
    let budget_find_idle_fallback_peer = self
        .search_breadth_budget_find_idle_fallback_peer(&has_capacity_wrt_pending_pool_imports);

    let peer_id = duration_metered_exec!(
        {
            let Some(peer_id) = self.find_any_idle_fallback_peer_for_any_pending_hash(
                &mut hashes_to_request,
                budget_find_idle_fallback_peer,
            ) else {
                // no pending hash has an idle fallback peer
                return
            };

            peer_id
        },
        search_durations.find_idle_peer
    );

    // the peer may not be present in the caller's peer map
    let Some(peer) = peers.get(&peer_id) else { return };
    let conn_eth_version = peer.version;

    // budget for how many pending hashes to inspect while filling the request
    let budget_fill_request = self
        .search_breadth_budget_find_intersection_pending_hashes_and_hashes_seen_by_peer(
            &has_capacity_wrt_pending_pool_imports,
        );

    duration_metered_exec!(
        {
            self.fill_request_from_hashes_pending_fetch(
                &mut hashes_to_request,
                &peer.seen_transactions,
                budget_fill_request,
            )
        },
        search_durations.fill_request
    );

    self.update_pending_fetch_cache_search_metrics(search_durations);

    trace!(target: "net::tx",
        peer_id=format!("{peer_id:#}"),
        hashes=?*hashes_to_request,
        %conn_eth_version,
        "requesting hashes that were stored pending fetch from peer"
    );

    // a returned set of hashes means the request was not sent
    if let Some(failed_to_request_hashes) =
        self.request_transactions_from_peer(hashes_to_request, peer)
    {
        trace!(target: "net::tx",
            peer_id=format!("{peer_id:#}"),
            ?failed_to_request_hashes,
            %conn_eth_version,
            "failed sending request to peer's session, buffering hashes"
        );

        self.buffer_hashes(failed_to_request_hashes, Some(peer_id));
    }
}
501
/// Filters an announcement down to the hashes that should be requested now.
///
/// For each announced hash:
/// - if the hash is already tracked, its recorded encoded length is updated from the
///   announcement (a mismatch with a previously seen size is traced), and the hash is kept
///   only if it could be removed from the cache of hashes pending fetch;
/// - otherwise, if `is_tx_bad_import` returns `true`, the hash is dropped;
/// - otherwise the hash is counted as previously unseen and inserted into the map of tracked
///   hashes; it is dropped if the insert fails.
///
/// `peer_id` and `client_version` are only used for tracing.
pub fn filter_unseen_and_pending_hashes(
    &mut self,
    new_announced_hashes: &mut ValidAnnouncementData,
    is_tx_bad_import: impl Fn(&TxHash) -> bool,
    peer_id: &PeerId,
    client_version: &str,
) {
    let mut previously_unseen_hashes_count = 0;

    let msg_version = new_announced_hashes.msg_version();

    new_announced_hashes.retain(|hash, metadata| {

        // hash is already tracked by the fetcher
        if let Some(TxFetchMetadata{ tx_encoded_length: previously_seen_size, ..}) = self.hashes_fetch_inflight_and_pending_fetch.peek_mut(hash) {
            // the announcement carries metadata, record the announced size
            if let Some((_ty, size)) = metadata {
                if let Some(prev_size) = previously_seen_size {
                    if size != prev_size {
                        trace!(target: "net::tx",
                            peer_id=format!("{peer_id:#}"),
                            %hash,
                            size,
                            previously_seen_size,
                            %client_version,
                            "peer announced a different size for tx, this is especially worrying if one size is much bigger..."
                        );
                    }
                }
                *previously_seen_size = Some(*size);
            }

            // hash was buffered pending fetch, take it out of the buffer and request it
            if self.hashes_pending_fetch.remove(hash) {
                return true
            }

            // tracked but not pending fetch, don't request it again
            return false
        }

        if is_tx_bad_import(hash) {
            return false
        }

        previously_unseen_hashes_count += 1;

        if self.hashes_fetch_inflight_and_pending_fetch.get_or_insert(*hash, ||
            TxFetchMetadata{retries: 0, fallback_peers: LruCache::new(DEFAULT_MAX_COUNT_FALLBACK_PEERS as u32), tx_encoded_length: None}
        ).is_none() {

            trace!(target: "net::tx",
                peer_id=format!("{peer_id:#}"),
                %hash,
                ?msg_version,
                %client_version,
                "failed to cache new announced hash from peer in schnellru::LruMap, dropping hash"
            );

            return false
        }
        true
    });

    trace!(target: "net::tx",
        peer_id=format!("{peer_id:#}"),
        previously_unseen_hashes_count=previously_unseen_hashes_count,
        msg_version=?msg_version,
        client_version=%client_version,
        "received previously unseen hashes in announcement from peer"
    );
}
580
581 pub fn request_transactions_from_peer(
589 &mut self,
590 new_announced_hashes: RequestTxHashes,
591 peer: &PeerMetadata<N>,
592 ) -> Option<RequestTxHashes> {
593 let peer_id: PeerId = peer.request_tx.peer_id;
594 let conn_eth_version = peer.version;
595
596 if self.active_peers.len() >= self.info.max_inflight_requests {
597 trace!(target: "net::tx",
598 peer_id=format!("{peer_id:#}"),
599 hashes=?*new_announced_hashes,
600 %conn_eth_version,
601 max_inflight_transaction_requests=self.info.max_inflight_requests,
602 "limit for concurrent `GetPooledTransactions` requests reached, dropping request for hashes to peer"
603 );
604 return Some(new_announced_hashes)
605 }
606
607 let Some(inflight_count) = self.active_peers.get_or_insert(peer_id, || 0) else {
608 trace!(target: "net::tx",
609 peer_id=format!("{peer_id:#}"),
610 hashes=?*new_announced_hashes,
611 conn_eth_version=%conn_eth_version,
612 "failed to cache active peer in schnellru::LruMap, dropping request to peer"
613 );
614 return Some(new_announced_hashes)
615 };
616
617 if *inflight_count >= self.info.max_inflight_requests_per_peer {
618 trace!(target: "net::tx",
619 peer_id=format!("{peer_id:#}"),
620 hashes=?*new_announced_hashes,
621 %conn_eth_version,
622 max_concurrent_tx_reqs_per_peer=self.info.max_inflight_requests_per_peer,
623 "limit for concurrent `GetPooledTransactions` requests per peer reached"
624 );
625 return Some(new_announced_hashes)
626 }
627
628 #[cfg(debug_assertions)]
629 {
630 for hash in &new_announced_hashes {
631 if self.hashes_pending_fetch.contains(hash) {
632 tracing::debug!(target: "net::tx", "`{}` should have been taken out of buffer before packing in a request, breaks invariant `@hashes_pending_fetch` and `@inflight_requests`, `@hashes_fetch_inflight_and_pending_fetch` for `{}`: {:?}",
633 format!("{:?}", new_announced_hashes), format!("{:?}", new_announced_hashes),
635 new_announced_hashes.iter().map(|hash| {
636 let metadata = self.hashes_fetch_inflight_and_pending_fetch.get(hash);
637 (*hash, metadata.map(|m| (m.retries, m.tx_encoded_length)))
639 }).collect::<Vec<(TxHash, Option<(u8, Option<usize>)>)>>())
640 }
641 }
642 }
643
644 let (response, rx) = oneshot::channel();
645 let req = PeerRequest::GetPooledTransactions {
646 request: GetPooledTransactions(new_announced_hashes.iter().copied().collect()),
647 response,
648 };
649
650 if let Err(err) = peer.request_tx.try_send(req) {
652 return match err {
654 TrySendError::Full(_) | TrySendError::Closed(_) => {
655 self.metrics.egress_peer_channel_full.increment(1);
656 Some(new_announced_hashes)
657 }
658 }
659 }
660
661 *inflight_count += 1;
662 self.inflight_requests.push(GetPooledTxRequestFut::new(peer_id, new_announced_hashes, rx));
664
665 None
666 }
667
668 pub fn fill_request_from_hashes_pending_fetch(
688 &mut self,
689 hashes_to_request: &mut RequestTxHashes,
690 seen_hashes: &LruCache<TxHash>,
691 mut budget_fill_request: Option<usize>, ) {
693 let Some(hash) = hashes_to_request.iter().next() else { return };
694
695 let mut acc_size_response = self
696 .hashes_fetch_inflight_and_pending_fetch
697 .get(hash)
698 .and_then(|entry| entry.tx_encoded_len())
699 .unwrap_or(AVERAGE_BYTE_SIZE_TX_ENCODED);
700
701 if acc_size_response >=
703 DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE_ON_FETCH_PENDING_HASHES
704 {
705 return
706 }
707
708 for hash in self.hashes_pending_fetch.iter() {
711 if !seen_hashes.contains(hash) {
713 continue
714 };
715
716 hashes_to_request.insert(*hash);
718
719 let size = self
721 .hashes_fetch_inflight_and_pending_fetch
722 .get(hash)
723 .and_then(|entry| entry.tx_encoded_len())
724 .unwrap_or(AVERAGE_BYTE_SIZE_TX_ENCODED);
725
726 acc_size_response += size;
727
728 if acc_size_response >=
732 DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE_ON_FETCH_PENDING_HASHES ||
733 hashes_to_request.len() >
734 DEFAULT_SOFT_LIMIT_COUNT_HASHES_IN_GET_POOLED_TRANSACTIONS_REQUEST_ON_FETCH_PENDING_HASHES
735 {
736 break
737 }
738
739 if let Some(ref mut bud) = budget_fill_request {
740 *bud -= 1;
741 if *bud == 0 {
742 break
743 }
744 }
745 }
746
747 for hash in hashes_to_request.iter() {
749 self.hashes_pending_fetch.remove(hash);
750 }
751 }
752
753 pub fn has_capacity_for_fetching_pending_hashes(&self) -> bool {
756 let info = &self.info;
757
758 self.has_capacity(info.max_inflight_requests)
759 }
760
761 fn has_capacity(&self, max_inflight_requests: usize) -> bool {
763 self.inflight_requests.len() <= max_inflight_requests
764 }
765
766 pub fn search_breadth_budget_find_idle_fallback_peer(
772 &self,
773 has_capacity_wrt_pending_pool_imports: impl Fn(usize) -> bool,
774 ) -> Option<usize> {
775 let info = &self.info;
776
777 let tx_fetcher_has_capacity = self.has_capacity(
778 info.max_inflight_requests /
779 DEFAULT_DIVISOR_MAX_COUNT_INFLIGHT_REQUESTS_ON_FIND_IDLE_PEER,
780 );
781 let tx_pool_has_capacity = has_capacity_wrt_pending_pool_imports(
782 DEFAULT_DIVISOR_MAX_COUNT_PENDING_POOL_IMPORTS_ON_FIND_IDLE_PEER,
783 );
784
785 if tx_fetcher_has_capacity && tx_pool_has_capacity {
786 None
788 } else {
789 let limit = DEFAULT_BUDGET_FIND_IDLE_FALLBACK_PEER;
791
792 trace!(target: "net::tx",
793 inflight_requests=self.inflight_requests.len(),
794 max_inflight_transaction_requests=info.max_inflight_requests,
795 hashes_pending_fetch=self.hashes_pending_fetch.len(),
796 limit,
797 "search breadth limited in search for idle fallback peer for some hash pending fetch"
798 );
799
800 Some(limit)
801 }
802 }
803
804 pub fn search_breadth_budget_find_intersection_pending_hashes_and_hashes_seen_by_peer(
811 &self,
812 has_capacity_wrt_pending_pool_imports: impl Fn(usize) -> bool,
813 ) -> Option<usize> {
814 let info = &self.info;
815
816 let tx_fetcher_has_capacity = self.has_capacity(
817 info.max_inflight_requests /
818 DEFAULT_DIVISOR_MAX_COUNT_INFLIGHT_REQUESTS_ON_FIND_INTERSECTION,
819 );
820 let tx_pool_has_capacity = has_capacity_wrt_pending_pool_imports(
821 DEFAULT_DIVISOR_MAX_COUNT_PENDING_POOL_IMPORTS_ON_FIND_INTERSECTION,
822 );
823
824 if tx_fetcher_has_capacity && tx_pool_has_capacity {
825 None
827 } else {
828 let limit = DEFAULT_BUDGET_FIND_INTERSECTION_ANNOUNCED_BY_PEER_AND_PENDING_FETCH;
830
831 trace!(target: "net::tx",
832 inflight_requests=self.inflight_requests.len(),
833 max_inflight_transaction_requests=self.info.max_inflight_requests,
834 hashes_pending_fetch=self.hashes_pending_fetch.len(),
835 limit=limit,
836 "search breadth limited in search for intersection of hashes announced by peer and hashes pending fetch"
837 );
838
839 Some(limit)
840 }
841 }
842
/// Processes a resolved `GetPooledTransactions` request and turns it into a [`FetchEvent`].
///
/// The peer's in-flight request count is decremented first. Then:
/// - an empty response yields [`FetchEvent::EmptyResponse`];
/// - a non-empty response is verified against the requested hashes and deduplicated. If no
///   transaction survives verification, a [`FetchEvent::FetchError`] with
///   `RequestError::BadResponse` is returned. Otherwise requested hashes missing from the
///   response are buffered for retry and [`FetchEvent::TransactionsFetched`] is returned,
///   with `report_peer` set if the response contained transactions that did not verify;
/// - a request error or a closed response channel buffers all requested hashes for retry and
///   yields [`FetchEvent::FetchError`].
pub fn on_resolved_get_pooled_transactions_request_fut(
    &mut self,
    response: GetPooledTxResponse<N::PooledTransaction>,
) -> FetchEvent<N::PooledTransaction> {
    let GetPooledTxResponse { peer_id, mut requested_hashes, result } = response;

    self.decrement_inflight_request_count_for(&peer_id);

    match result {
        Ok(Ok(transactions)) => {
            if transactions.is_empty() {
                trace!(target: "net::tx",
                    peer_id=format!("{peer_id:#}"),
                    requested_hashes_len=requested_hashes.len(),
                    "received empty `PooledTransactions` response from peer, peer failed to serve hashes it announced"
                );

                return FetchEvent::EmptyResponse { peer_id }
            }

            let payload = UnverifiedPooledTransactions::new(transactions);

            // drop transactions that were not requested
            let unverified_len = payload.len();
            let (verification_outcome, verified_payload) =
                payload.verify(&requested_hashes, &peer_id);

            let unsolicited = unverified_len - verified_payload.len();
            if unsolicited > 0 {
                self.metrics.unsolicited_transactions.increment(unsolicited as u64);
            }

            let report_peer = if verification_outcome == VerificationOutcome::ReportPeer {
                trace!(target: "net::tx",
                    peer_id=format!("{peer_id:#}"),
                    unverified_len,
                    verified_payload_len=verified_payload.len(),
                    "received `PooledTransactions` response from peer with entries that didn't verify against request, filtered out transactions"
                );
                true
            } else {
                false
            };

            // nothing of what the peer sent was requested
            if verified_payload.is_empty() {
                return FetchEvent::FetchError { peer_id, error: RequestError::BadResponse }
            }

            let unvalidated_payload_len = verified_payload.len();

            let valid_payload = verified_payload.dedup();

            if valid_payload.len() != unvalidated_payload_len {
                trace!(target: "net::tx",
                    peer_id=format!("{peer_id:#}"),
                    unvalidated_payload_len,
                    valid_payload_len=valid_payload.len(),
                    "received `PooledTransactions` response from peer with duplicate entries, filtered them out"
                );
            }
            let requested_hashes_len = requested_hashes.len();
            let mut fetched = Vec::with_capacity(valid_payload.len());
            // split the requested hashes: fetched ones are collected, the rest stay in
            // `requested_hashes` to be retried
            requested_hashes.retain(|requested_hash| {
                if valid_payload.contains_key(requested_hash) {
                    fetched.push(*requested_hash);
                    return false
                }
                true
            });
            fetched.shrink_to_fit();
            self.metrics.fetched_transactions.increment(fetched.len() as u64);

            if fetched.len() < requested_hashes_len {
                trace!(target: "net::tx",
                    peer_id=format!("{peer_id:#}"),
                    requested_hashes_len=requested_hashes_len,
                    fetched_len=fetched.len(),
                    "peer failed to serve hashes it announced"
                );
            }

            // buffer the hashes that the peer did not serve
            self.try_buffer_hashes_for_retry(requested_hashes, &peer_id);

            let transactions = valid_payload.into_data().into_values().collect();

            FetchEvent::TransactionsFetched { peer_id, transactions, report_peer }
        }
        Ok(Err(req_err)) => {
            self.try_buffer_hashes_for_retry(requested_hashes, &peer_id);
            FetchEvent::FetchError { peer_id, error: req_err }
        }
        Err(_) => {
            self.try_buffer_hashes_for_retry(requested_hashes, &peer_id);
            FetchEvent::FetchError { peer_id, error: RequestError::ChannelClosed }
        }
    }
}
972}
973
974impl<N: NetworkPrimitives> Stream for TransactionFetcher<N> {
975 type Item = FetchEvent<N::PooledTransaction>;
976
977 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
979 if self.inflight_requests.is_empty() {
982 return Poll::Pending
983 }
984
985 if let Some(resp) = ready!(self.inflight_requests.poll_next_unpin(cx)) {
986 return Poll::Ready(Some(self.on_resolved_get_pooled_transactions_request_fut(resp)))
987 }
988
989 Poll::Pending
990 }
991}
992
993impl<T: NetworkPrimitives> Default for TransactionFetcher<T> {
994 fn default() -> Self {
995 Self {
996 active_peers: LruMap::new(DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS),
997 inflight_requests: Default::default(),
998 hashes_pending_fetch: LruCache::new(DEFAULT_MAX_CAPACITY_CACHE_PENDING_FETCH),
999 hashes_fetch_inflight_and_pending_fetch: LruMap::new(
1000 DEFAULT_MAX_CAPACITY_CACHE_INFLIGHT_AND_PENDING_FETCH,
1001 ),
1002 info: TransactionFetcherInfo::default(),
1003 metrics: Default::default(),
1004 }
1005 }
1006}
1007
/// Metadata the fetcher keeps for a transaction hash it is trying to fetch.
#[derive(Debug, Constructor)]
pub struct TxFetchMetadata {
    /// Number of times the hash has been buffered for retry without a fallback peer.
    retries: u8,
    /// Peers that the hash can be requested from.
    fallback_peers: LruCache<PeerId>,
    /// Encoded length of the transaction as announced by a peer, if any announcement carried
    /// a size.
    tx_encoded_length: Option<usize>,
}
1021
impl TxFetchMetadata {
    /// Returns a mutable reference to the fallback peers of the hash.
    pub const fn fallback_peers_mut(&mut self) -> &mut LruCache<PeerId> {
        &mut self.fallback_peers
    }

    /// Returns the recorded encoded length of the transaction, or `None` if no size has been
    /// recorded for the hash.
    pub const fn tx_encoded_len(&self) -> Option<usize> {
        self.tx_encoded_length
    }
}
1036
/// Outcome of a resolved `GetPooledTransactions` request, as yielded by the fetcher's stream.
#[derive(Debug)]
pub enum FetchEvent<T = PooledTransaction> {
    /// The peer served at least one of the requested transactions.
    TransactionsFetched {
        /// The peer that served the transactions.
        peer_id: PeerId,
        /// The requested transactions that were received, without duplicates.
        transactions: PooledTransactions<T>,
        /// Whether the response also contained transactions that were not requested.
        report_peer: bool,
    },
    /// The request failed, or none of the received transactions were requested.
    FetchError {
        /// The peer the request was sent to.
        peer_id: PeerId,
        /// The reason the fetch failed.
        error: RequestError,
    },
    /// The peer responded without any transactions.
    EmptyResponse {
        /// The peer that sent the empty response.
        peer_id: PeerId,
    },
}
1063
/// An in-flight `GetPooledTransactions` request to a peer.
#[derive(Debug)]
pub struct GetPooledTxRequest<T = PooledTransaction> {
    /// The peer the request was sent to.
    peer_id: PeerId,
    /// The hashes that were requested.
    requested_hashes: RequestTxHashes,
    /// Receiving end of the channel on which the response is delivered.
    response: oneshot::Receiver<RequestResult<PooledTransactions<T>>>,
}
1072
/// A resolved `GetPooledTransactions` request together with its outcome.
#[derive(Debug)]
pub struct GetPooledTxResponse<T = PooledTransaction> {
    /// The peer the request was sent to.
    peer_id: PeerId,
    /// The hashes that were requested.
    requested_hashes: RequestTxHashes,
    /// The response, a request error, or a receive error if the response channel was closed.
    result: Result<RequestResult<PooledTransactions<T>>, RecvError>,
}
1083
/// Future that resolves to a [`GetPooledTxResponse`] once the peer's response arrives or the
/// response channel is closed.
#[must_use = "futures do nothing unless polled"]
#[pin_project::pin_project]
#[derive(Debug)]
pub struct GetPooledTxRequestFut<T = PooledTransaction> {
    /// The pending request; taken out while the future is polled and put back if the response
    /// is not ready yet.
    #[pin]
    inner: Option<GetPooledTxRequest<T>>,
}
1093
impl<T> GetPooledTxRequestFut<T> {
    /// Creates a future for a request of `requested_hashes` sent to `peer_id`, whose response
    /// is delivered on `response`.
    #[inline]
    const fn new(
        peer_id: PeerId,
        requested_hashes: RequestTxHashes,
        response: oneshot::Receiver<RequestResult<PooledTransactions<T>>>,
    ) -> Self {
        Self { inner: Some(GetPooledTxRequest { peer_id, requested_hashes, response }) }
    }
}
1104
1105impl<T> Future for GetPooledTxRequestFut<T> {
1106 type Output = GetPooledTxResponse<T>;
1107
1108 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1109 let mut req = self.as_mut().project().inner.take().expect("polled after completion");
1110 match req.response.poll_unpin(cx) {
1111 Poll::Ready(result) => Poll::Ready(GetPooledTxResponse {
1112 peer_id: req.peer_id,
1113 requested_hashes: req.requested_hashes,
1114 result,
1115 }),
1116 Poll::Pending => {
1117 self.project().inner.set(Some(req));
1118 Poll::Pending
1119 }
1120 }
1121 }
1122}
1123
/// Transactions from a `PooledTransactions` response that have not yet been checked against
/// the hashes that were requested.
#[derive(Debug, Constructor, Deref)]
pub struct UnverifiedPooledTransactions<T> {
    /// The transactions as received.
    txns: PooledTransactions<T>,
}
1129
/// Transactions from a `PooledTransactions` response that were all part of the request. May
/// still contain duplicates.
#[derive(Debug, Constructor, Deref)]
pub struct VerifiedPooledTransactions<T> {
    /// The transactions that matched a requested hash.
    txns: PooledTransactions<T>,
}
1135
1136impl<T: SignedTransaction> DedupPayload for VerifiedPooledTransactions<T> {
1137 type Value = T;
1138
1139 fn is_empty(&self) -> bool {
1140 self.txns.is_empty()
1141 }
1142
1143 fn len(&self) -> usize {
1144 self.txns.len()
1145 }
1146
1147 fn dedup(self) -> PartiallyValidData<Self::Value> {
1148 PartiallyValidData::from_raw_data(
1149 self.txns.into_iter().map(|tx| (*tx.tx_hash(), tx)).collect(),
1150 None,
1151 )
1152 }
1153}
1154
/// Verification of a `PooledTransactions` response against the request it answers.
trait VerifyPooledTransactionsResponse {
    /// The transaction type contained in the response.
    type Transaction: SignedTransaction;

    /// Checks the response against `requested_hashes` and returns the verification outcome
    /// together with the verified transactions. `peer_id` identifies the responding peer.
    fn verify(
        self,
        requested_hashes: &RequestTxHashes,
        peer_id: &PeerId,
    ) -> (VerificationOutcome, VerifiedPooledTransactions<Self::Transaction>);
}
1164
impl<T: SignedTransaction> VerifyPooledTransactionsResponse for UnverifiedPooledTransactions<T> {
    type Transaction = T;

    /// Filters out every transaction whose hash is not in `requested_hashes`.
    ///
    /// The outcome is [`VerificationOutcome::ReportPeer`] if at least one transaction was
    /// filtered out, otherwise [`VerificationOutcome::Ok`]. Filtered-out transactions are
    /// traced: with their hashes in debug builds, with only their count otherwise.
    fn verify(
        self,
        requested_hashes: &RequestTxHashes,
        _peer_id: &PeerId,
    ) -> (VerificationOutcome, VerifiedPooledTransactions<T>) {
        let mut verification_outcome = VerificationOutcome::Ok;

        let Self { mut txns } = self;

        // debug builds collect the offending hashes, other builds only count them
        #[cfg(debug_assertions)]
        let mut tx_hashes_not_requested: smallvec::SmallVec<[TxHash; 16]> = smallvec::smallvec!();
        #[cfg(not(debug_assertions))]
        let mut tx_hashes_not_requested_count = 0;

        txns.0.retain(|tx| {
            if !requested_hashes.contains(tx.tx_hash()) {
                verification_outcome = VerificationOutcome::ReportPeer;

                #[cfg(debug_assertions)]
                tx_hashes_not_requested.push(*tx.tx_hash());
                #[cfg(not(debug_assertions))]
                {
                    tx_hashes_not_requested_count += 1;
                }

                return false
            }
            true
        });

        #[cfg(debug_assertions)]
        if !tx_hashes_not_requested.is_empty() {
            trace!(target: "net::tx",
                peer_id=format!("{_peer_id:#}"),
                ?tx_hashes_not_requested,
                "transactions in `PooledTransactions` response from peer were not requested"
            );
        }
        #[cfg(not(debug_assertions))]
        if tx_hashes_not_requested_count != 0 {
            trace!(target: "net::tx",
                peer_id=format!("{_peer_id:#}"),
                tx_hashes_not_requested_count,
                "transactions in `PooledTransactions` response from peer were not requested"
            );
        }

        (verification_outcome, VerifiedPooledTransactions::new(txns))
    }
}
1218
/// Outcome of verifying a `PooledTransactions` response against its request.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum VerificationOutcome {
    /// Every transaction in the response was requested.
    Ok,
    /// The response contained at least one transaction that was not requested.
    ReportPeer,
}
1229
/// Limits that the [`TransactionFetcher`] operates under.
#[derive(Debug, Constructor)]
pub struct TransactionFetcherInfo {
    /// Limit that the number of active peers is checked against before sending a request,
    /// and that the number of in-flight requests is compared to when checking capacity.
    pub max_inflight_requests: usize,
    /// Maximum number of concurrent in-flight requests to a single peer.
    pub max_inflight_requests_per_peer: u8,
    /// Soft limit for the expected byte size of a response, applied when packing a request
    /// from an eth68 announcement.
    pub soft_limit_byte_size_pooled_transactions_response_on_pack_request: usize,
    /// Soft limit for the byte size of a `PooledTransactions` response.
    // NOTE(review): not read anywhere in this file — presumably enforced by the code that
    // assembles responses; confirm against its users
    pub soft_limit_byte_size_pooled_transactions_response: usize,
    /// Configured capacity of the cache of hashes pending fetch.
    pub max_capacity_cache_txns_pending_fetch: u32,
}
1249
1250impl Default for TransactionFetcherInfo {
1251 fn default() -> Self {
1252 Self::new(
1253 DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS as usize,
1254 DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS_PER_PEER,
1255 DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESP_ON_PACK_GET_POOLED_TRANSACTIONS_REQ,
1256 SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE,
1257 DEFAULT_MAX_CAPACITY_CACHE_PENDING_FETCH,
1258 )
1259 }
1260}
1261
1262impl From<TransactionFetcherConfig> for TransactionFetcherInfo {
1263 fn from(config: TransactionFetcherConfig) -> Self {
1264 let TransactionFetcherConfig {
1265 max_inflight_requests,
1266 max_inflight_requests_per_peer,
1267 soft_limit_byte_size_pooled_transactions_response,
1268 soft_limit_byte_size_pooled_transactions_response_on_pack_request,
1269 max_capacity_cache_txns_pending_fetch,
1270 } = config;
1271
1272 Self::new(
1273 max_inflight_requests as usize,
1274 max_inflight_requests_per_peer,
1275 soft_limit_byte_size_pooled_transactions_response_on_pack_request,
1276 soft_limit_byte_size_pooled_transactions_response,
1277 max_capacity_cache_txns_pending_fetch,
1278 )
1279 }
1280}
1281
/// Durations of the two searches performed when fetching hashes that are pending fetch.
#[derive(Debug, Default)]
struct TxFetcherSearchDurations {
    /// Time spent looking for an idle fallback peer for any pending hash.
    find_idle_peer: Duration,
    /// Time spent filling the request with further pending hashes.
    fill_request: Duration,
}
1287
#[cfg(test)]
mod test {
    use super::*;
    use crate::test_utils::transactions::{buffer_hash_to_tx_fetcher, new_mock_session};
    use alloy_primitives::{hex, B256};
    use alloy_rlp::Decodable;
    use derive_more::IntoIterator;
    use reth_eth_wire_types::EthVersion;
    use reth_ethereum_primitives::TransactionSigned;
    use std::{collections::HashSet, str::FromStr};

    /// Minimal announcement payload for tests: `(hash, optional (tx type, encoded size))` pairs.
    #[derive(IntoIterator)]
    struct TestValidAnnouncementData(Vec<(TxHash, Option<(u8, usize)>)>);

    impl HandleMempoolData for TestValidAnnouncementData {
        fn is_empty(&self) -> bool {
            self.0.is_empty()
        }

        fn len(&self) -> usize {
            self.0.len()
        }

        fn retain_by_hash(&mut self, mut f: impl FnMut(&TxHash) -> bool) {
            // Only the hash half of each entry takes part in the decision.
            self.0.retain(|entry| f(&entry.0))
        }
    }

    impl HandleVersionedMempoolData for TestValidAnnouncementData {
        /// The test payload always reports itself as an `eth68` message.
        fn msg_version(&self) -> EthVersion {
            EthVersion::Eth68
        }
    }

    /// Returns the 32-byte hash whose every byte equals `byte`.
    fn repeated_byte_hash(byte: u8) -> B256 {
        B256::from_slice(&[byte; 32])
    }

    /// Packs five announced hashes into a request and checks which ones are requested and which
    /// ones are returned as surplus.
    #[test]
    fn pack_eth68_request() {
        reth_tracing::init_test_tracing();

        let tx_fetcher = &mut TransactionFetcher::<EthNetworkPrimitives>::default();

        let eth68_hashes = [1u8, 2, 3, 4, 5].map(repeated_byte_hash);

        let soft_limit =
            DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESP_ON_PACK_GET_POOLED_TRANSACTIONS_REQ;
        // Announced sizes, index-aligned with `eth68_hashes`. Per the expectations below, the
        // entries at index 0 and 2 end up in the request; the remaining three are surplus.
        let eth68_sizes =
            [soft_limit - MEDIAN_BYTE_SIZE_SMALL_LEGACY_TX_ENCODED - 1, soft_limit, 2, 9, 0];

        let expected_request_hashes = HashSet::from([eth68_hashes[0], eth68_hashes[2]]);
        let expected_surplus_hashes =
            HashSet::from([eth68_hashes[1], eth68_hashes[3], eth68_hashes[4]]);

        let mut eth68_hashes_to_request = RequestTxHashes::with_capacity(3);

        let announced = eth68_hashes
            .into_iter()
            .zip(eth68_sizes)
            .map(|(hash, size)| (hash, Some((0u8, size))))
            .collect::<Vec<_>>();
        let valid_announcement_data = TestValidAnnouncementData(announced);

        let surplus_eth68_hashes =
            tx_fetcher.pack_request_eth68(&mut eth68_hashes_to_request, valid_announcement_data);

        let eth68_hashes_to_request = eth68_hashes_to_request.into_iter().collect::<HashSet<_>>();
        let surplus_eth68_hashes = surplus_eth68_hashes.into_iter().collect::<HashSet<_>>();

        assert_eq!(expected_request_hashes, eth68_hashes_to_request);
        assert_eq!(expected_surplus_hashes, surplus_eth68_hashes);
    }

    /// Buffers hashes as pending fetch and checks that peer 1's session receives a
    /// `GetPooledTransactions` request for exactly the four hashes it has seen.
    #[tokio::test]
    async fn test_on_fetch_pending_hashes() {
        reth_tracing::init_test_tracing();

        let tx_fetcher = &mut TransactionFetcher::default();

        let seen_hashes = [1u8, 2, 3, 4].map(repeated_byte_hash);
        // Sizes exist for the first three hashes only; the fourth is buffered without one.
        let seen_eth68_hashes_sizes = [120, 158, 116];

        let peer_1 = PeerId::new([1; 64]);
        let peer_2 = PeerId::new([2; 64]);

        // Only peer 1's session receiver is kept: it is where the request is expected to arrive.
        let (mut peer_1_data, mut peer_1_mock_session_rx) =
            new_mock_session(peer_1, EthVersion::Eth66);
        let (mut peer_2_data, _) = new_mock_session(peer_2, EthVersion::Eth66);
        for hash in seen_hashes {
            peer_1_data.seen_transactions.insert(hash);
            peer_2_data.seen_transactions.insert(hash);
        }
        let peers = HashMap::from([(peer_1, peer_1_data), (peer_2, peer_2_data)]);

        // Buffer every seen hash for peer 2 first; `zip` stops after the three sized hashes.
        for (hash, size) in seen_hashes.iter().zip(seen_eth68_hashes_sizes) {
            buffer_hash_to_tx_fetcher(tx_fetcher, *hash, peer_2, 0, Some(size));
        }
        buffer_hash_to_tx_fetcher(tx_fetcher, seen_hashes[3], peer_2, 0, None);

        // A fifth hash that is buffered for peer 2 only and is absent from peer 1's seen set.
        let hash_other = repeated_byte_hash(5);
        buffer_hash_to_tx_fetcher(tx_fetcher, hash_other, peer_2, 0, None);

        // Peer 1 is buffered for the seen hashes after peer 2.
        for hash in seen_hashes {
            buffer_hash_to_tx_fetcher(tx_fetcher, hash, peer_1, 0, None);
        }

        assert_eq!(tx_fetcher.num_pending_hashes(), 5);

        tx_fetcher.on_fetch_pending_hashes(&peers, |_| true);

        let req = peer_1_mock_session_rx
            .recv()
            .await
            .expect("peer session should receive request with buffered hashes");
        let PeerRequest::GetPooledTransactions {
            request: GetPooledTransactions(requested_hashes),
            ..
        } = req
        else {
            unreachable!()
        };

        assert_eq!(
            requested_hashes.into_iter().collect::<HashSet<_>>(),
            seen_hashes.into_iter().collect::<HashSet<_>>()
        )
    }

    /// Verifies a response holding one requested and one unrequested transaction: the outcome is
    /// `ReportPeer` and only the requested transaction remains in the verified payload.
    #[test]
    fn verify_response_hashes() {
        /// Decodes RLP bytes into a signed transaction and converts it into its pooled form.
        fn decode_pooled(raw: &[u8]) -> PooledTransaction {
            let mut buf = raw;
            TransactionSigned::decode(&mut buf).unwrap().try_into().unwrap()
        }

        let signed_tx_1 = decode_pooled(&hex!(
            "02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598daa"
        ));
        // Same encoding as above except for the final byte.
        let signed_tx_2 = decode_pooled(&hex!(
            "02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76"
        ));

        // Of the two transactions, only the hash of the first is part of the request.
        let request_hashes = [
            B256::from_str("0x3b9aca00f0671c9a2a1b817a0a78d3fe0c0f776cccb2a8c3c1b412a4f4e67890")
                .unwrap(),
            *signed_tx_1.hash(),
            B256::from_str("0x3b9aca00f0671c9a2a1b817a0a78d3fe0c0f776cccb2a8c3c1b412a4f4e12345")
                .unwrap(),
            B256::from_str("0x3b9aca00f0671c9a2a1b817a0a78d3fe0c0f776cccb2a8c3c1b412a4f4edabe3")
                .unwrap(),
        ];

        // Sanity check of the fixture: the second transaction really is unrequested.
        request_hashes.iter().for_each(|hash| assert_ne!(hash, signed_tx_2.hash()));

        let request_hashes = RequestTxHashes::new(request_hashes.into_iter().collect());

        let response_txns = PooledTransactions(vec![signed_tx_1.clone(), signed_tx_2]);
        let payload = UnverifiedPooledTransactions::new(response_txns);

        let (outcome, verified_payload) = payload.verify(&request_hashes, &PeerId::ZERO);

        assert_eq!(VerificationOutcome::ReportPeer, outcome);
        assert_eq!(1, verified_payload.len());
        assert!(verified_payload.contains(&signed_tx_1));
    }
}
1498}