1use super::{
29 config::TransactionFetcherConfig,
30 constants::{tx_fetcher::*, SOFT_LIMIT_COUNT_HASHES_IN_GET_POOLED_TRANSACTIONS_REQUEST},
31 PeerMetadata, PooledTransactions, SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE,
32};
33use crate::{
34 cache::{LruCache, LruMap},
35 duration_metered_exec,
36 metrics::TransactionFetcherMetrics,
37};
38use alloy_consensus::transaction::PooledTransaction;
39use alloy_primitives::TxHash;
40use derive_more::{Constructor, Deref};
41use futures::{stream::FuturesUnordered, Future, FutureExt, Stream, StreamExt};
42use pin_project::pin_project;
43use reth_eth_wire::{
44 DedupPayload, GetPooledTransactions, HandleMempoolData, HandleVersionedMempoolData,
45 PartiallyValidData, RequestTxHashes, ValidAnnouncementData,
46};
47use reth_eth_wire_types::{EthNetworkPrimitives, NetworkPrimitives};
48use reth_network_api::PeerRequest;
49use reth_network_p2p::error::{RequestError, RequestResult};
50use reth_network_peers::PeerId;
51use reth_primitives_traits::SignedTransaction;
52use schnellru::ByLength;
53use std::{
54 collections::HashMap,
55 pin::Pin,
56 task::{ready, Context, Poll},
57 time::Duration,
58};
59use tokio::sync::{mpsc::error::TrySendError, oneshot, oneshot::error::RecvError};
60use tracing::trace;
61
/// Tracks `GetPooledTransactions` requests that are in flight and transaction hashes that are
/// buffered until they can be requested.
///
/// The fetcher is polled as a [`Stream`]; each resolved request is turned into a [`FetchEvent`].
#[derive(Debug)]
#[pin_project]
pub struct TransactionFetcher<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// Peers with requests in flight, mapped to the number of their in-flight requests.
    pub active_peers: LruMap<PeerId, u8, ByLength>,
    /// Futures of the requests that were sent and are waiting for a response.
    #[pin]
    pub inflight_requests: FuturesUnordered<GetPooledTxRequestFut<N::PooledTransaction>>,
    /// Hashes that are buffered until they can be packed into a request.
    pub hashes_pending_fetch: LruCache<TxHash>,
    /// Metadata of every tracked hash, i.e. hashes in flight as well as hashes pending fetch.
    pub hashes_fetch_inflight_and_pending_fetch: LruMap<TxHash, TxFetchMetadata, ByLength>,
    /// Limits and capacities this fetcher operates under.
    pub info: TransactionFetcherInfo,
    // Gauges and counters updated by the fetcher.
    #[doc(hidden)]
    metrics: TransactionFetcherMetrics,
}
90
91impl<N: NetworkPrimitives> TransactionFetcher<N> {
92 pub(crate) fn remove_peer(&mut self, peer_id: &PeerId) {
94 self.active_peers.remove(peer_id);
95 }
96
97 #[inline]
99 pub fn update_metrics(&self) {
100 let metrics = &self.metrics;
101
102 metrics.inflight_transaction_requests.set(self.inflight_requests.len() as f64);
103
104 let hashes_pending_fetch = self.hashes_pending_fetch.len() as f64;
105 let total_hashes = self.hashes_fetch_inflight_and_pending_fetch.len() as f64;
106
107 metrics.hashes_pending_fetch.set(hashes_pending_fetch);
108 metrics.hashes_inflight_transaction_requests.set(total_hashes - hashes_pending_fetch);
109 }
110
111 #[inline]
112 fn update_pending_fetch_cache_search_metrics(&self, durations: TxFetcherSearchDurations) {
113 let metrics = &self.metrics;
114
115 let TxFetcherSearchDurations { find_idle_peer, fill_request } = durations;
116 metrics
117 .duration_find_idle_fallback_peer_for_any_pending_hash
118 .set(find_idle_peer.as_secs_f64());
119 metrics.duration_fill_request_from_hashes_pending_fetch.set(fill_request.as_secs_f64());
120 }
121
122 pub fn with_transaction_fetcher_config(config: &TransactionFetcherConfig) -> Self {
124 let TransactionFetcherConfig {
125 max_inflight_requests,
126 max_capacity_cache_txns_pending_fetch,
127 ..
128 } = *config;
129
130 let info = config.clone().into();
131
132 let metrics = TransactionFetcherMetrics::default();
133 metrics.capacity_inflight_requests.increment(max_inflight_requests as u64);
134
135 Self {
136 active_peers: LruMap::new(max_inflight_requests),
137 hashes_pending_fetch: LruCache::new(max_capacity_cache_txns_pending_fetch),
138 hashes_fetch_inflight_and_pending_fetch: LruMap::new(
139 max_inflight_requests + max_capacity_cache_txns_pending_fetch,
140 ),
141 info,
142 metrics,
143 ..Default::default()
144 }
145 }
146
147 #[inline]
149 pub fn remove_hashes_from_transaction_fetcher<'a, I>(&mut self, hashes: I)
150 where
151 I: IntoIterator<Item = &'a TxHash>,
152 {
153 for hash in hashes {
154 self.hashes_fetch_inflight_and_pending_fetch.remove(hash);
155 self.hashes_pending_fetch.remove(hash);
156 }
157 }
158
159 fn decrement_inflight_request_count_for(&mut self, peer_id: &PeerId) {
161 let remove = || -> bool {
162 if let Some(inflight_count) = self.active_peers.get(peer_id) {
163 *inflight_count = inflight_count.saturating_sub(1);
164 if *inflight_count == 0 {
165 return true
166 }
167 }
168 false
169 }();
170
171 if remove {
172 self.active_peers.remove(peer_id);
173 }
174 }
175
176 #[inline]
178 pub fn is_idle(&self, peer_id: &PeerId) -> bool {
179 let Some(inflight_count) = self.active_peers.peek(peer_id) else { return true };
180 if *inflight_count < self.info.max_inflight_requests_per_peer {
181 return true
182 }
183 false
184 }
185
186 pub fn get_idle_peer_for(&self, hash: TxHash) -> Option<&PeerId> {
188 let TxFetchMetadata { fallback_peers, .. } =
189 self.hashes_fetch_inflight_and_pending_fetch.peek(&hash)?;
190
191 fallback_peers.iter().find(|peer_id| self.is_idle(peer_id))
192 }
193
194 pub fn find_any_idle_fallback_peer_for_any_pending_hash(
200 &mut self,
201 hashes_to_request: &mut RequestTxHashes,
202 mut budget: Option<usize>, ) -> Option<PeerId> {
204 let mut hashes_pending_fetch_iter = self.hashes_pending_fetch.iter();
205
206 let idle_peer = loop {
207 let &hash = hashes_pending_fetch_iter.next()?;
208
209 let idle_peer = self.get_idle_peer_for(hash);
210
211 if idle_peer.is_some() {
212 hashes_to_request.insert(hash);
213 break idle_peer.copied()
214 }
215
216 if let Some(ref mut bud) = budget {
217 *bud = bud.saturating_sub(1);
218 if *bud == 0 {
219 return None
220 }
221 }
222 };
223 let hash = hashes_to_request.iter().next()?;
224
225 drop(hashes_pending_fetch_iter);
227 _ = self.hashes_pending_fetch.remove(hash);
228
229 idle_peer
230 }
231
232 pub fn pack_request(
237 &self,
238 hashes_to_request: &mut RequestTxHashes,
239 hashes_from_announcement: ValidAnnouncementData,
240 ) -> RequestTxHashes {
241 if hashes_from_announcement.msg_version().is_eth68() {
242 return self.pack_request_eth68(hashes_to_request, hashes_from_announcement)
243 }
244 self.pack_request_eth66(hashes_to_request, hashes_from_announcement)
245 }
246
247 pub fn pack_request_eth68(
258 &self,
259 hashes_to_request: &mut RequestTxHashes,
260 hashes_from_announcement: impl HandleMempoolData
261 + IntoIterator<Item = (TxHash, Option<(u8, usize)>)>,
262 ) -> RequestTxHashes {
263 let mut acc_size_response = 0;
264
265 let mut hashes_from_announcement_iter = hashes_from_announcement.into_iter();
266
267 if let Some((hash, Some((_ty, size)))) = hashes_from_announcement_iter.next() {
268 hashes_to_request.insert(hash);
269
270 if size >= self.info.soft_limit_byte_size_pooled_transactions_response_on_pack_request {
272 return hashes_from_announcement_iter.collect()
273 }
274 acc_size_response = size;
275 }
276
277 let mut surplus_hashes = RequestTxHashes::default();
278
279 for (hash, metadata) in hashes_from_announcement_iter.by_ref() {
282 let Some((_ty, size)) = metadata else {
283 unreachable!("this method is called upon reception of an eth68 announcement")
284 };
285
286 let next_acc_size = acc_size_response + size;
287
288 if next_acc_size <=
289 self.info.soft_limit_byte_size_pooled_transactions_response_on_pack_request
290 {
291 acc_size_response = next_acc_size;
294 _ = hashes_to_request.insert(hash)
295 } else {
296 _ = surplus_hashes.insert(hash)
297 }
298
299 let free_space =
300 self.info.soft_limit_byte_size_pooled_transactions_response_on_pack_request -
301 acc_size_response;
302
303 if free_space < MEDIAN_BYTE_SIZE_SMALL_LEGACY_TX_ENCODED {
304 break
305 }
306 }
307
308 surplus_hashes.extend(hashes_from_announcement_iter.map(|(hash, _metadata)| hash));
309
310 surplus_hashes
311 }
312
313 pub fn pack_request_eth66(
320 &self,
321 hashes_to_request: &mut RequestTxHashes,
322 hashes_from_announcement: ValidAnnouncementData,
323 ) -> RequestTxHashes {
324 let (mut hashes, _version) = hashes_from_announcement.into_request_hashes();
325 if hashes.len() <= SOFT_LIMIT_COUNT_HASHES_IN_GET_POOLED_TRANSACTIONS_REQUEST {
326 *hashes_to_request = hashes;
327 hashes_to_request.shrink_to_fit();
328
329 RequestTxHashes::default()
330 } else {
331 let surplus_hashes =
332 hashes.retain_count(SOFT_LIMIT_COUNT_HASHES_IN_GET_POOLED_TRANSACTIONS_REQUEST);
333 *hashes_to_request = hashes;
334 hashes_to_request.shrink_to_fit();
335
336 surplus_hashes
337 }
338 }
339
340 pub fn try_buffer_hashes_for_retry(
342 &mut self,
343 mut hashes: RequestTxHashes,
344 peer_failed_to_serve: &PeerId,
345 ) {
346 hashes.retain(|hash| {
349 if let Some(entry) = self.hashes_fetch_inflight_and_pending_fetch.get(hash) {
350 entry.fallback_peers_mut().remove(peer_failed_to_serve);
351 return true
352 }
353 false
355 });
356
357 self.buffer_hashes(hashes, None)
358 }
359
360 pub fn num_pending_hashes(&self) -> usize {
362 self.hashes_pending_fetch.len()
363 }
364
365 pub fn num_all_hashes(&self) -> usize {
367 self.hashes_fetch_inflight_and_pending_fetch.len()
368 }
369
370 pub fn buffer_hashes(&mut self, hashes: RequestTxHashes, fallback_peer: Option<PeerId>) {
375 for hash in hashes {
376 if self.hashes_fetch_inflight_and_pending_fetch.peek(&hash).is_none() {
378 continue
379 }
380
381 let Some(TxFetchMetadata { retries, fallback_peers, .. }) =
382 self.hashes_fetch_inflight_and_pending_fetch.get(&hash)
383 else {
384 continue
385 };
386
387 if let Some(peer_id) = fallback_peer {
388 fallback_peers.insert(peer_id);
390 } else {
391 if *retries >= DEFAULT_MAX_RETRIES {
392 trace!(target: "net::tx",
393 %hash,
394 retries,
395 "retry limit for `GetPooledTransactions` requests reached for hash, dropping hash"
396 );
397
398 self.hashes_fetch_inflight_and_pending_fetch.remove(&hash);
399 self.hashes_pending_fetch.remove(&hash);
400 continue
401 }
402 *retries += 1;
403 }
404
405 if let (_, Some(evicted_hash)) = self.hashes_pending_fetch.insert_and_get_evicted(hash)
406 {
407 self.hashes_fetch_inflight_and_pending_fetch.remove(&evicted_hash);
408 }
409 }
410 }
411
    /// Tries to request hashes that are buffered pending fetch.
    ///
    /// Searches the pending-fetch cache for a hash with an idle fallback peer, fills the request
    /// with further pending hashes that the peer has seen, and sends it to the peer. Both search
    /// steps are timed and their durations are recorded in the metrics.
    ///
    /// `has_capacity_wrt_pending_pool_imports` is forwarded to the two budget functions, which
    /// decide whether the searches are limited in breadth.
    ///
    /// Returns `true` if a request was sent, `false` if no idle peer was found, the peer is not
    /// in `peers`, or sending the request failed (the hashes are buffered again in the latter
    /// two cases).
    pub fn on_fetch_pending_hashes(
        &mut self,
        peers: &HashMap<PeerId, PeerMetadata<N>>,
        has_capacity_wrt_pending_pool_imports: impl Fn(usize) -> bool,
    ) -> bool {
        let mut hashes_to_request = RequestTxHashes::with_capacity(
            DEFAULT_MARGINAL_COUNT_HASHES_GET_POOLED_TRANSACTIONS_REQUEST,
        );
        let mut search_durations = TxFetcherSearchDurations::default();

        // `None` means the search is not limited in breadth
        let budget_find_idle_fallback_peer = self
            .search_breadth_budget_find_idle_fallback_peer(&has_capacity_wrt_pending_pool_imports);

        let peer_id = duration_metered_exec!(
            {
                let Some(peer_id) = self.find_any_idle_fallback_peer_for_any_pending_hash(
                    &mut hashes_to_request,
                    budget_find_idle_fallback_peer,
                ) else {
                    // no pending hash has an idle fallback peer, nothing to request
                    return false
                };

                peer_id
            },
            search_durations.find_idle_peer
        );

        let Some(peer) = peers.get(&peer_id) else {
            // the peer is unknown to the caller; put the hash back. Buffering without a fallback
            // peer counts as a retry for the hash.
            self.buffer_hashes(hashes_to_request, None);
            return false
        };
        let conn_eth_version = peer.version;

        let budget_fill_request = self
            .search_breadth_budget_find_intersection_pending_hashes_and_hashes_seen_by_peer(
                &has_capacity_wrt_pending_pool_imports,
            );

        // add more pending hashes that this peer has seen to the request
        duration_metered_exec!(
            {
                self.fill_request_from_hashes_pending_fetch(
                    &mut hashes_to_request,
                    &peer.seen_transactions,
                    budget_fill_request,
                )
            },
            search_durations.fill_request
        );

        self.update_pending_fetch_cache_search_metrics(search_durations);

        trace!(target: "net::tx",
            peer_id=format!("{peer_id:#}"),
            hashes=?*hashes_to_request,
            %conn_eth_version,
            "requesting hashes that were stored pending fetch from peer"
        );

        // `Some` carries back the hashes for which no request could be sent
        if let Some(failed_to_request_hashes) =
            self.request_transactions_from_peer(hashes_to_request, peer)
        {
            trace!(target: "net::tx",
                peer_id=format!("{peer_id:#}"),
                ?failed_to_request_hashes,
                %conn_eth_version,
                "failed sending request to peer's session, buffering hashes"
            );

            // the peer was never asked, so it stays a fallback peer for these hashes
            self.buffer_hashes(failed_to_request_hashes, Some(peer_id));
            return false
        }

        true
    }
499
    /// Filters an announcement in place, retaining only the hashes that should be requested from
    /// the announcing peer.
    ///
    /// For each announced hash:
    ///
    /// - if the hash is already tracked, its recorded encoded length is updated with the
    ///   announced size (a mismatch with a previously seen size is logged). The hash is retained
    ///   only if it was buffered pending fetch, in which case it is taken out of that cache;
    /// - otherwise, the hash is dropped if `is_tx_bad_import` returns `true` for it;
    /// - otherwise, the hash starts being tracked with fresh metadata and is retained, unless
    ///   inserting it into the map of tracked hashes fails.
    ///
    /// `peer_id` and `client_version` are only used for logging.
    pub fn filter_unseen_and_pending_hashes(
        &mut self,
        new_announced_hashes: &mut ValidAnnouncementData,
        is_tx_bad_import: impl Fn(&TxHash) -> bool,
        peer_id: &PeerId,
        client_version: &str,
    ) {
        let mut previously_unseen_hashes_count = 0;

        let msg_version = new_announced_hashes.msg_version();

        new_announced_hashes.retain(|hash, metadata| {
            // case 1: the hash is already tracked, either in flight or pending fetch
            if let Some(TxFetchMetadata { tx_encoded_length: previously_seen_size, .. }) =
                self.hashes_fetch_inflight_and_pending_fetch.peek_mut(hash)
            {
                // the announcement carries a size, compare it against what was seen before
                if let Some((_ty, size)) = metadata {
                    if let Some(prev_size) = previously_seen_size {
                        if size != prev_size {
                            trace!(target: "net::tx",
                                peer_id=format!("{peer_id:#}"),
                                %hash,
                                size,
                                previously_seen_size,
                                %client_version,
                                "peer announced a different size for tx, this is especially worrying if one size is much bigger..."
                            );
                        }
                    }
                    // the most recently announced size wins
                    *previously_seen_size = Some(*size);
                }

                // the hash was waiting to be fetched: take it out of the buffer and keep it in
                // the announcement
                if self.hashes_pending_fetch.remove(hash) {
                    return true
                }

                // tracked but not pending fetch, drop it from the announcement
                return false
            }

            // case 2: the hash is not tracked yet
            if is_tx_bad_import(hash) {
                return false
            }

            // note: counted before the insertion below, so it includes hashes that fail to be
            // cached
            previously_unseen_hashes_count += 1;

            if self.hashes_fetch_inflight_and_pending_fetch.get_or_insert(*hash, ||
                TxFetchMetadata{retries: 0, fallback_peers: LruCache::new(DEFAULT_MAX_COUNT_FALLBACK_PEERS as u32), tx_encoded_length: None}
            ).is_none() {

                trace!(target: "net::tx",
                    peer_id=format!("{peer_id:#}"),
                    %hash,
                    ?msg_version,
                    %client_version,
                    "failed to cache new announced hash from peer in schnellru::LruMap, dropping hash"
                );

                return false
            }
            true
        });

        trace!(target: "net::tx",
            peer_id=format!("{peer_id:#}"),
            previously_unseen_hashes_count=previously_unseen_hashes_count,
            msg_version=?msg_version,
            client_version=%client_version,
            "received previously unseen hashes in announcement from peer"
        );
    }
578
579 pub fn request_transactions_from_peer(
587 &mut self,
588 new_announced_hashes: RequestTxHashes,
589 peer: &PeerMetadata<N>,
590 ) -> Option<RequestTxHashes> {
591 let peer_id: PeerId = peer.request_tx.peer_id;
592 let conn_eth_version = peer.version;
593
594 if self.active_peers.len() >= self.info.max_inflight_requests {
595 trace!(target: "net::tx",
596 peer_id=format!("{peer_id:#}"),
597 hashes=?*new_announced_hashes,
598 %conn_eth_version,
599 max_inflight_transaction_requests=self.info.max_inflight_requests,
600 "limit for concurrent `GetPooledTransactions` requests reached, dropping request for hashes to peer"
601 );
602 return Some(new_announced_hashes)
603 }
604
605 let Some(inflight_count) = self.active_peers.get_or_insert(peer_id, || 0) else {
606 trace!(target: "net::tx",
607 peer_id=format!("{peer_id:#}"),
608 hashes=?*new_announced_hashes,
609 conn_eth_version=%conn_eth_version,
610 "failed to cache active peer in schnellru::LruMap, dropping request to peer"
611 );
612 return Some(new_announced_hashes)
613 };
614
615 if *inflight_count >= self.info.max_inflight_requests_per_peer {
616 trace!(target: "net::tx",
617 peer_id=format!("{peer_id:#}"),
618 hashes=?*new_announced_hashes,
619 %conn_eth_version,
620 max_concurrent_tx_reqs_per_peer=self.info.max_inflight_requests_per_peer,
621 "limit for concurrent `GetPooledTransactions` requests per peer reached"
622 );
623 return Some(new_announced_hashes)
624 }
625
626 #[cfg(debug_assertions)]
627 {
628 for hash in &new_announced_hashes {
629 if self.hashes_pending_fetch.contains(hash) {
630 tracing::debug!(target: "net::tx", "`{}` should have been taken out of buffer before packing in a request, breaks invariant `@hashes_pending_fetch` and `@inflight_requests`, `@hashes_fetch_inflight_and_pending_fetch` for `{}`: {:?}",
631 format!("{:?}", new_announced_hashes), format!("{:?}", new_announced_hashes),
633 new_announced_hashes.iter().map(|hash| {
634 let metadata = self.hashes_fetch_inflight_and_pending_fetch.get(hash);
635 (*hash, metadata.map(|m| (m.retries, m.tx_encoded_length)))
637 }).collect::<Vec<(TxHash, Option<(u8, Option<usize>)>)>>())
638 }
639 }
640 }
641
642 let (response, rx) = oneshot::channel();
643 let req = PeerRequest::GetPooledTransactions {
644 request: GetPooledTransactions(new_announced_hashes.iter().copied().collect()),
645 response,
646 };
647
648 if let Err(err) = peer.request_tx.try_send(req) {
650 return match err {
652 TrySendError::Full(_) | TrySendError::Closed(_) => {
653 self.metrics.egress_peer_channel_full.increment(1);
654 Some(new_announced_hashes)
655 }
656 }
657 }
658
659 *inflight_count += 1;
660 self.inflight_requests.push(GetPooledTxRequestFut::new(peer_id, new_announced_hashes, rx));
662
663 None
664 }
665
666 pub fn fill_request_from_hashes_pending_fetch(
686 &mut self,
687 hashes_to_request: &mut RequestTxHashes,
688 seen_hashes: &LruCache<TxHash>,
689 mut budget_fill_request: Option<usize>, ) {
691 let Some(hash) = hashes_to_request.iter().next() else { return };
692
693 let mut acc_size_response = self
694 .hashes_fetch_inflight_and_pending_fetch
695 .get(hash)
696 .and_then(|entry| entry.tx_encoded_len())
697 .unwrap_or(AVERAGE_BYTE_SIZE_TX_ENCODED);
698
699 if acc_size_response >=
701 DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE_ON_FETCH_PENDING_HASHES
702 {
703 return
704 }
705
706 for hash in self.hashes_pending_fetch.iter() {
709 if !seen_hashes.contains(hash) {
711 continue
712 };
713
714 hashes_to_request.insert(*hash);
716
717 let size = self
719 .hashes_fetch_inflight_and_pending_fetch
720 .get(hash)
721 .and_then(|entry| entry.tx_encoded_len())
722 .unwrap_or(AVERAGE_BYTE_SIZE_TX_ENCODED);
723
724 acc_size_response += size;
725
726 if acc_size_response >=
730 DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE_ON_FETCH_PENDING_HASHES ||
731 hashes_to_request.len() >
732 DEFAULT_SOFT_LIMIT_COUNT_HASHES_IN_GET_POOLED_TRANSACTIONS_REQUEST_ON_FETCH_PENDING_HASHES
733 {
734 break
735 }
736
737 if let Some(ref mut bud) = budget_fill_request {
738 *bud -= 1;
739 if *bud == 0 {
740 break
741 }
742 }
743 }
744
745 for hash in hashes_to_request.iter() {
747 self.hashes_pending_fetch.remove(hash);
748 }
749 }
750
751 pub fn has_capacity_for_fetching_pending_hashes(&self) -> bool {
754 let info = &self.info;
755
756 self.has_capacity(info.max_inflight_requests)
757 }
758
759 fn has_capacity(&self, max_inflight_requests: usize) -> bool {
761 self.inflight_requests.len() <= max_inflight_requests
762 }
763
764 pub fn search_breadth_budget_find_idle_fallback_peer(
770 &self,
771 has_capacity_wrt_pending_pool_imports: impl Fn(usize) -> bool,
772 ) -> Option<usize> {
773 let info = &self.info;
774
775 let tx_fetcher_has_capacity = self.has_capacity(
776 info.max_inflight_requests /
777 DEFAULT_DIVISOR_MAX_COUNT_INFLIGHT_REQUESTS_ON_FIND_IDLE_PEER,
778 );
779 let tx_pool_has_capacity = has_capacity_wrt_pending_pool_imports(
780 DEFAULT_DIVISOR_MAX_COUNT_PENDING_POOL_IMPORTS_ON_FIND_IDLE_PEER,
781 );
782
783 if tx_fetcher_has_capacity && tx_pool_has_capacity {
784 None
786 } else {
787 let limit = DEFAULT_BUDGET_FIND_IDLE_FALLBACK_PEER;
789
790 trace!(target: "net::tx",
791 inflight_requests=self.inflight_requests.len(),
792 max_inflight_transaction_requests=info.max_inflight_requests,
793 hashes_pending_fetch=self.hashes_pending_fetch.len(),
794 limit,
795 "search breadth limited in search for idle fallback peer for some hash pending fetch"
796 );
797
798 Some(limit)
799 }
800 }
801
802 pub fn search_breadth_budget_find_intersection_pending_hashes_and_hashes_seen_by_peer(
809 &self,
810 has_capacity_wrt_pending_pool_imports: impl Fn(usize) -> bool,
811 ) -> Option<usize> {
812 let info = &self.info;
813
814 let tx_fetcher_has_capacity = self.has_capacity(
815 info.max_inflight_requests /
816 DEFAULT_DIVISOR_MAX_COUNT_INFLIGHT_REQUESTS_ON_FIND_INTERSECTION,
817 );
818 let tx_pool_has_capacity = has_capacity_wrt_pending_pool_imports(
819 DEFAULT_DIVISOR_MAX_COUNT_PENDING_POOL_IMPORTS_ON_FIND_INTERSECTION,
820 );
821
822 if tx_fetcher_has_capacity && tx_pool_has_capacity {
823 None
825 } else {
826 let limit = DEFAULT_BUDGET_FIND_INTERSECTION_ANNOUNCED_BY_PEER_AND_PENDING_FETCH;
828
829 trace!(target: "net::tx",
830 inflight_requests=self.inflight_requests.len(),
831 max_inflight_transaction_requests=self.info.max_inflight_requests,
832 hashes_pending_fetch=self.hashes_pending_fetch.len(),
833 limit=limit,
834 "search breadth limited in search for intersection of hashes announced by peer and hashes pending fetch"
835 );
836
837 Some(limit)
838 }
839 }
840
841 pub fn on_resolved_get_pooled_transactions_request_fut(
845 &mut self,
846 response: GetPooledTxResponse<N::PooledTransaction>,
847 ) -> FetchEvent<N::PooledTransaction> {
848 let GetPooledTxResponse { peer_id, mut requested_hashes, result } = response;
851
852 self.decrement_inflight_request_count_for(&peer_id);
853
854 match result {
855 Ok(Ok(transactions)) => {
856 if transactions.is_empty() {
861 trace!(target: "net::tx",
862 peer_id=format!("{peer_id:#}"),
863 requested_hashes_len=requested_hashes.len(),
864 "received empty `PooledTransactions` response from peer, peer failed to serve hashes it announced"
865 );
866
867 return FetchEvent::EmptyResponse { peer_id }
868 }
869
870 let payload = UnverifiedPooledTransactions::new(transactions);
874
875 let unverified_len = payload.len();
876 let (verification_outcome, verified_payload) =
877 payload.verify(&requested_hashes, &peer_id);
878
879 let unsolicited = unverified_len - verified_payload.len();
880 if unsolicited > 0 {
881 self.metrics.unsolicited_transactions.increment(unsolicited as u64);
882 }
883
884 let report_peer = if verification_outcome == VerificationOutcome::ReportPeer {
885 trace!(target: "net::tx",
886 peer_id=format!("{peer_id:#}"),
887 unverified_len,
888 verified_payload_len=verified_payload.len(),
889 "received `PooledTransactions` response from peer with entries that didn't verify against request, filtered out transactions"
890 );
891 true
892 } else {
893 false
894 };
895
896 if verified_payload.is_empty() {
898 return FetchEvent::FetchError { peer_id, error: RequestError::BadResponse }
899 }
900
901 let unvalidated_payload_len = verified_payload.len();
905
906 let valid_payload = verified_payload.dedup();
907
908 if valid_payload.len() != unvalidated_payload_len {
914 trace!(target: "net::tx",
915 peer_id=format!("{peer_id:#}"),
916 unvalidated_payload_len,
917 valid_payload_len=valid_payload.len(),
918 "received `PooledTransactions` response from peer with duplicate entries, filtered them out"
919 );
920 }
921 let requested_hashes_len = requested_hashes.len();
929 let mut fetched = Vec::with_capacity(valid_payload.len());
930 requested_hashes.retain(|requested_hash| {
931 if valid_payload.contains_key(requested_hash) {
932 fetched.push(*requested_hash);
934 return false
935 }
936 true
937 });
938 fetched.shrink_to_fit();
939 self.metrics.fetched_transactions.increment(fetched.len() as u64);
940
941 if fetched.len() < requested_hashes_len {
942 trace!(target: "net::tx",
943 peer_id=format!("{peer_id:#}"),
944 requested_hashes_len=requested_hashes_len,
945 fetched_len=fetched.len(),
946 "peer failed to serve hashes it announced"
947 );
948 }
949
950 self.try_buffer_hashes_for_retry(requested_hashes, &peer_id);
954
955 let transactions = valid_payload.into_data().into_values().collect();
956
957 FetchEvent::TransactionsFetched { peer_id, transactions, report_peer }
958 }
959 Ok(Err(req_err)) => {
960 self.try_buffer_hashes_for_retry(requested_hashes, &peer_id);
961 FetchEvent::FetchError { peer_id, error: req_err }
962 }
963 Err(_) => {
964 self.try_buffer_hashes_for_retry(requested_hashes, &peer_id);
965 FetchEvent::FetchError { peer_id, error: RequestError::ChannelClosed }
967 }
968 }
969 }
970}
971
972impl<N: NetworkPrimitives> Stream for TransactionFetcher<N> {
973 type Item = FetchEvent<N::PooledTransaction>;
974
975 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
977 if self.inflight_requests.is_empty() {
980 return Poll::Pending
981 }
982
983 if let Some(resp) = ready!(self.inflight_requests.poll_next_unpin(cx)) {
984 return Poll::Ready(Some(self.on_resolved_get_pooled_transactions_request_fut(resp)))
985 }
986
987 Poll::Pending
988 }
989}
990
991impl<T: NetworkPrimitives> Default for TransactionFetcher<T> {
992 fn default() -> Self {
993 Self {
994 active_peers: LruMap::new(DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS),
995 inflight_requests: Default::default(),
996 hashes_pending_fetch: LruCache::new(DEFAULT_MAX_CAPACITY_CACHE_PENDING_FETCH),
997 hashes_fetch_inflight_and_pending_fetch: LruMap::new(
998 DEFAULT_MAX_CAPACITY_CACHE_INFLIGHT_AND_PENDING_FETCH,
999 ),
1000 info: TransactionFetcherInfo::default(),
1001 metrics: Default::default(),
1002 }
1003 }
1004}
1005
/// Metadata the fetcher keeps for each tracked transaction hash.
#[derive(Debug, Constructor)]
pub struct TxFetchMetadata {
    /// Number of times the hash has been buffered for retry.
    retries: u8,
    /// Peers the hash can be requested from.
    fallback_peers: LruCache<PeerId>,
    /// Encoded length of the transaction as most recently announced by a peer, if any
    /// announcement carried a size.
    tx_encoded_length: Option<usize>,
}
1019
1020impl TxFetchMetadata {
1021 pub const fn fallback_peers_mut(&mut self) -> &mut LruCache<PeerId> {
1023 &mut self.fallback_peers
1024 }
1025
1026 pub const fn tx_encoded_len(&self) -> Option<usize> {
1031 self.tx_encoded_length
1032 }
1033}
1034
/// Outcome of a resolved `GetPooledTransactions` request, as yielded by the fetcher.
#[derive(Debug)]
pub enum FetchEvent<T = PooledTransaction> {
    /// The peer served at least one of the requested transactions.
    TransactionsFetched {
        /// The peer the transactions were fetched from.
        peer_id: PeerId,
        /// The transactions left after verifying the response against the request and
        /// removing duplicates.
        transactions: PooledTransactions<T>,
        /// Whether the response contained transactions that were not requested.
        report_peer: bool,
    },
    /// The request failed, or the response held none of the requested transactions.
    FetchError {
        /// The peer the request was sent to.
        peer_id: PeerId,
        /// The reason the fetch failed.
        error: RequestError,
    },
    /// The peer responded without any transactions.
    EmptyResponse {
        /// The peer that sent the empty response.
        peer_id: PeerId,
    },
}
1061
/// A `GetPooledTransactions` request that was sent to a peer and awaits its response.
#[derive(Debug)]
pub struct GetPooledTxRequest<T = PooledTransaction> {
    // peer the request was sent to
    peer_id: PeerId,
    // hashes that were requested
    requested_hashes: RequestTxHashes,
    // receiving half of the channel on which the response is delivered
    response: oneshot::Receiver<RequestResult<PooledTransactions<T>>>,
}
1070
/// The resolved form of a [`GetPooledTxRequest`]: the request's peer and hashes together with
/// the outcome of waiting for the response.
#[derive(Debug)]
pub struct GetPooledTxResponse<T = PooledTransaction> {
    // peer the request was sent to
    peer_id: PeerId,
    // hashes that were requested
    requested_hashes: RequestTxHashes,
    // outer `Err` if the response channel was closed, inner result is the outcome of the
    // request itself
    result: Result<RequestResult<PooledTransactions<T>>, RecvError>,
}
1081
/// Future that resolves to a [`GetPooledTxResponse`] once the response to the wrapped
/// [`GetPooledTxRequest`] arrives or its channel is closed.
#[must_use = "futures do nothing unless polled"]
#[pin_project::pin_project]
#[derive(Debug)]
pub struct GetPooledTxRequestFut<T = PooledTransaction> {
    // the pending request, `None` once the future has completed
    #[pin]
    inner: Option<GetPooledTxRequest<T>>,
}
1091
1092impl<T> GetPooledTxRequestFut<T> {
1093 #[inline]
1094 const fn new(
1095 peer_id: PeerId,
1096 requested_hashes: RequestTxHashes,
1097 response: oneshot::Receiver<RequestResult<PooledTransactions<T>>>,
1098 ) -> Self {
1099 Self { inner: Some(GetPooledTxRequest { peer_id, requested_hashes, response }) }
1100 }
1101}
1102
1103impl<T> Future for GetPooledTxRequestFut<T> {
1104 type Output = GetPooledTxResponse<T>;
1105
1106 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1107 let mut req = self.as_mut().project().inner.take().expect("polled after completion");
1108 match req.response.poll_unpin(cx) {
1109 Poll::Ready(result) => Poll::Ready(GetPooledTxResponse {
1110 peer_id: req.peer_id,
1111 requested_hashes: req.requested_hashes,
1112 result,
1113 }),
1114 Poll::Pending => {
1115 self.project().inner.set(Some(req));
1116 Poll::Pending
1117 }
1118 }
1119 }
1120}
1121
/// Transactions from a `PooledTransactions` response that have not yet been checked against the
/// hashes that were requested.
#[derive(Debug, Constructor, Deref)]
pub struct UnverifiedPooledTransactions<T> {
    // the transactions as received
    txns: PooledTransactions<T>,
}
1127
/// Transactions from a `PooledTransactions` response that were checked against the hashes that
/// were requested. Duplicate entries may still be present.
#[derive(Debug, Constructor, Deref)]
pub struct VerifiedPooledTransactions<T> {
    // the transactions that were requested
    txns: PooledTransactions<T>,
}
1133
1134impl<T: SignedTransaction> DedupPayload for VerifiedPooledTransactions<T> {
1135 type Value = T;
1136
1137 fn is_empty(&self) -> bool {
1138 self.txns.is_empty()
1139 }
1140
1141 fn len(&self) -> usize {
1142 self.txns.len()
1143 }
1144
1145 fn dedup(self) -> PartiallyValidData<Self::Value> {
1146 PartiallyValidData::from_raw_data(
1147 self.txns.into_iter().map(|tx| (*tx.tx_hash(), tx)).collect(),
1148 None,
1149 )
1150 }
1151}
1152
/// Verification of a `PooledTransactions` response against the request it answers.
trait VerifyPooledTransactionsResponse {
    /// The type of the transactions in the response.
    type Transaction: SignedTransaction;

    /// Checks the response against `requested_hashes`.
    ///
    /// Returns the outcome of the verification together with the verified transactions.
    /// `peer_id` identifies the peer that sent the response.
    fn verify(
        self,
        requested_hashes: &RequestTxHashes,
        peer_id: &PeerId,
    ) -> (VerificationOutcome, VerifiedPooledTransactions<Self::Transaction>);
}
1162
1163impl<T: SignedTransaction> VerifyPooledTransactionsResponse for UnverifiedPooledTransactions<T> {
1164 type Transaction = T;
1165
1166 fn verify(
1167 self,
1168 requested_hashes: &RequestTxHashes,
1169 _peer_id: &PeerId,
1170 ) -> (VerificationOutcome, VerifiedPooledTransactions<T>) {
1171 let mut verification_outcome = VerificationOutcome::Ok;
1172
1173 let Self { mut txns } = self;
1174
1175 #[cfg(debug_assertions)]
1176 let mut tx_hashes_not_requested: smallvec::SmallVec<[TxHash; 16]> = smallvec::smallvec!();
1177 #[cfg(not(debug_assertions))]
1178 let mut tx_hashes_not_requested_count = 0;
1179
1180 txns.0.retain(|tx| {
1181 if !requested_hashes.contains(tx.tx_hash()) {
1182 verification_outcome = VerificationOutcome::ReportPeer;
1183
1184 #[cfg(debug_assertions)]
1185 tx_hashes_not_requested.push(*tx.tx_hash());
1186 #[cfg(not(debug_assertions))]
1187 {
1188 tx_hashes_not_requested_count += 1;
1189 }
1190
1191 return false
1192 }
1193 true
1194 });
1195
1196 #[cfg(debug_assertions)]
1197 if !tx_hashes_not_requested.is_empty() {
1198 trace!(target: "net::tx",
1199 peer_id=format!("{_peer_id:#}"),
1200 ?tx_hashes_not_requested,
1201 "transactions in `PooledTransactions` response from peer were not requested"
1202 );
1203 }
1204 #[cfg(not(debug_assertions))]
1205 if tx_hashes_not_requested_count != 0 {
1206 trace!(target: "net::tx",
1207 peer_id=format!("{_peer_id:#}"),
1208 tx_hashes_not_requested_count,
1209 "transactions in `PooledTransactions` response from peer were not requested"
1210 );
1211 }
1212
1213 (verification_outcome, VerifiedPooledTransactions::new(txns))
1214 }
1215}
1216
/// Outcome of verifying a `PooledTransactions` response against its request.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum VerificationOutcome {
    /// All transactions in the response were requested.
    Ok,
    /// The response contained transactions that were not requested.
    ReportPeer,
}
1227
/// Limits and capacities the [`TransactionFetcher`] operates under.
///
/// Note that the derived constructor takes its arguments in the order of the fields below.
#[derive(Debug, Constructor)]
pub struct TransactionFetcherInfo {
    /// Maximum number of peers that may have requests in flight at the same time.
    pub max_inflight_requests: usize,
    /// Maximum number of requests that may be in flight to a single peer.
    pub max_inflight_requests_per_peer: u8,
    /// Soft limit for the expected byte size of the response when packing a request from an
    /// announcement.
    pub soft_limit_byte_size_pooled_transactions_response_on_pack_request: usize,
    /// Soft limit for the byte size of a `PooledTransactions` response.
    pub soft_limit_byte_size_pooled_transactions_response: usize,
    /// Capacity of the cache of hashes pending fetch.
    pub max_capacity_cache_txns_pending_fetch: u32,
}
1247
1248impl Default for TransactionFetcherInfo {
1249 fn default() -> Self {
1250 Self::new(
1251 DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS as usize,
1252 DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS_PER_PEER,
1253 DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESP_ON_PACK_GET_POOLED_TRANSACTIONS_REQ,
1254 SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE,
1255 DEFAULT_MAX_CAPACITY_CACHE_PENDING_FETCH,
1256 )
1257 }
1258}
1259
1260impl From<TransactionFetcherConfig> for TransactionFetcherInfo {
1261 fn from(config: TransactionFetcherConfig) -> Self {
1262 let TransactionFetcherConfig {
1263 max_inflight_requests,
1264 max_inflight_requests_per_peer,
1265 soft_limit_byte_size_pooled_transactions_response,
1266 soft_limit_byte_size_pooled_transactions_response_on_pack_request,
1267 max_capacity_cache_txns_pending_fetch,
1268 } = config;
1269
1270 Self::new(
1271 max_inflight_requests as usize,
1272 max_inflight_requests_per_peer,
1273 soft_limit_byte_size_pooled_transactions_response_on_pack_request,
1274 soft_limit_byte_size_pooled_transactions_response,
1275 max_capacity_cache_txns_pending_fetch,
1276 )
1277 }
1278}
1279
/// Durations of the two searches of the pending-fetch cache performed when fetching pending
/// hashes. Both default to zero.
#[derive(Debug, Default)]
struct TxFetcherSearchDurations {
    // time spent searching for an idle fallback peer for any hash pending fetch
    find_idle_peer: Duration,
    // time spent filling the request with further hashes pending fetch
    fill_request: Duration,
}
1285
#[cfg(test)]
mod test {
    use super::*;
    use crate::test_utils::transactions::{buffer_hash_to_tx_fetcher, new_mock_session};
    use alloy_primitives::{hex, B256};
    use alloy_rlp::Decodable;
    use derive_more::IntoIterator;
    use reth_eth_wire_types::EthVersion;
    use reth_ethereum_primitives::TransactionSigned;
    use std::{collections::HashSet, str::FromStr};

    /// Test double for announcement data: a list of `(hash, optional metadata)` entries.
    ///
    /// The metadata tuple's second element is the announced size used by the tests below; the
    /// first is presumably the transaction type (always `0` here).
    #[derive(IntoIterator)]
    struct TestValidAnnouncementData(Vec<(TxHash, Option<(u8, usize)>)>);

    impl HandleMempoolData for TestValidAnnouncementData {
        fn is_empty(&self) -> bool {
            let Self(entries) = self;
            entries.is_empty()
        }

        fn len(&self) -> usize {
            let Self(entries) = self;
            entries.len()
        }

        fn retain_by_hash(&mut self, mut f: impl FnMut(&TxHash) -> bool) {
            let Self(entries) = self;
            // Only the hash half of each entry takes part in the predicate.
            entries.retain(|entry| f(&entry.0));
        }
    }

    impl HandleVersionedMempoolData for TestValidAnnouncementData {
        /// The test data always reports itself as an eth68 message.
        fn msg_version(&self) -> EthVersion {
            EthVersion::Eth68
        }
    }

    /// Returns a hash whose 32 bytes are all equal to `byte`.
    fn filled_hash(byte: u8) -> B256 {
        B256::from_slice(&[byte; 32])
    }

    /// Decodes `raw` as a [`TransactionSigned`] and converts it into a [`PooledTransaction`],
    /// panicking if either step fails.
    fn decode_pooled_tx(raw: &[u8]) -> PooledTransaction {
        let mut buf = raw;
        TransactionSigned::decode(&mut buf).unwrap().try_into().unwrap()
    }

    /// Packs an eth68 request from five announced hashes and checks which hashes end up in the
    /// request and which are returned as surplus.
    #[test]
    fn pack_eth68_request() {
        reth_tracing::init_test_tracing();

        let fetcher = &mut TransactionFetcher::<EthNetworkPrimitives>::default();

        let hashes = [1u8, 2, 3, 4, 5].map(filled_hash);

        let soft_limit =
            DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESP_ON_PACK_GET_POOLED_TRANSACTIONS_REQ;
        // Announced sizes, index-aligned with `hashes`.
        let sizes = [
            soft_limit - MEDIAN_BYTE_SIZE_SMALL_LEGACY_TX_ENCODED - 1,
            soft_limit,
            2,
            9,
            0,
        ];

        // Expected outcome: hashes 0 and 2 are packed into the request, the rest is surplus.
        let want_requested = HashSet::from([hashes[0], hashes[2]]);
        let want_surplus = HashSet::from([hashes[1], hashes[3], hashes[4]]);

        let announcement = TestValidAnnouncementData(
            hashes
                .into_iter()
                .zip(sizes)
                .map(|(hash, size)| (hash, Some((0u8, size))))
                .collect(),
        );

        let mut requested = RequestTxHashes::with_capacity(3);
        let surplus = fetcher.pack_request_eth68(&mut requested, announcement);

        // Compare as sets; the order of hashes is not part of the assertion.
        let requested: HashSet<_> = requested.into_iter().collect();
        let surplus: HashSet<_> = surplus.into_iter().collect();

        assert_eq!(want_requested, requested);
        assert_eq!(want_surplus, surplus);
    }

    /// Buffers hashes as pending fetch and checks that `on_fetch_pending_hashes` sends peer 1 a
    /// `GetPooledTransactions` request containing exactly the seen hashes.
    #[tokio::test]
    async fn test_on_fetch_pending_hashes() {
        reth_tracing::init_test_tracing();

        let tx_fetcher = &mut TransactionFetcher::default();

        let seen_hashes = [1u8, 2, 3, 4].map(filled_hash);
        // Sizes are only provided for the first three hashes; the fourth is buffered without one.
        let seen_eth68_hashes_sizes = [120, 158, 116];

        let peer_1 = PeerId::new([1; 64]);
        let peer_2 = PeerId::new([2; 64]);

        // Keep peer 1's session receiver so the outgoing request can be observed; peer 2's
        // receiver is dropped right away.
        let (mut peer_1_data, mut peer_1_mock_session_rx) =
            new_mock_session(peer_1, EthVersion::Eth66);
        let (mut peer_2_data, _) = new_mock_session(peer_2, EthVersion::Eth66);

        // Both peers have seen every hash in `seen_hashes`.
        for hash in seen_hashes {
            peer_1_data.seen_transactions.insert(hash);
            peer_2_data.seen_transactions.insert(hash);
        }

        let peers = HashMap::from([(peer_1, peer_1_data), (peer_2, peer_2_data)]);

        // Buffer the sized hashes for peer 2 first (`zip` stops after the three sizes) ...
        for (hash, size) in seen_hashes.iter().zip(seen_eth68_hashes_sizes) {
            buffer_hash_to_tx_fetcher(tx_fetcher, *hash, peer_2, 0, Some(size));
        }
        // ... then the fourth hash, which has no size.
        buffer_hash_to_tx_fetcher(tx_fetcher, seen_hashes[3], peer_2, 0, None);

        // One extra hash that is only ever buffered for peer 2.
        let hash_other = filled_hash(5);
        buffer_hash_to_tx_fetcher(tx_fetcher, hash_other, peer_2, 0, None);

        // Finally buffer all seen hashes for peer 1 as well.
        for hash in seen_hashes {
            buffer_hash_to_tx_fetcher(tx_fetcher, hash, peer_1, 0, None);
        }

        // Four seen hashes plus `hash_other`.
        assert_eq!(tx_fetcher.num_pending_hashes(), 5);

        tx_fetcher.on_fetch_pending_hashes(&peers, |_| true);

        let req = peer_1_mock_session_rx
            .recv()
            .await
            .expect("peer session should receive request with buffered hashes");
        let PeerRequest::GetPooledTransactions {
            request: GetPooledTransactions(requested_hashes),
            ..
        } = req
        else {
            unreachable!()
        };

        let requested: HashSet<_> = requested_hashes.into_iter().collect();
        let expected: HashSet<_> = seen_hashes.into_iter().collect();
        assert_eq!(requested, expected);
    }

    /// With an empty peer map, `on_fetch_pending_hashes` must leave the pending hash count
    /// unchanged.
    #[test]
    fn on_fetch_pending_hashes_rebuffers_on_disconnected_peer() {
        let tx_fetcher = &mut TransactionFetcher::default();
        let hash_1 = filled_hash(1);

        // The same hash is buffered for two different peers, so it counts once.
        for peer in [PeerId::new([1; 64]), PeerId::new([2; 64])] {
            buffer_hash_to_tx_fetcher(tx_fetcher, hash_1, peer, 0, Some(128));
        }
        assert_eq!(tx_fetcher.num_pending_hashes(), 1);

        // No peer is present in the map handed to the fetcher.
        let peers = HashMap::new();
        tx_fetcher.on_fetch_pending_hashes(&peers, |_| true);

        assert_eq!(tx_fetcher.num_pending_hashes(), 1);
    }

    /// A response containing one requested and one unrequested transaction must be reported, and
    /// only the requested transaction may remain in the verified payload.
    #[test]
    fn verify_response_hashes() {
        let signed_tx_1 = decode_pooled_tx(&hex!(
            "02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598daa"
        ));
        let signed_tx_2 = decode_pooled_tx(&hex!(
            "02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76"
        ));

        // Only the hash of `signed_tx_1` is requested; the other three are filler hashes.
        let request_hashes = [
            B256::from_str("0x3b9aca00f0671c9a2a1b817a0a78d3fe0c0f776cccb2a8c3c1b412a4f4e67890")
                .unwrap(),
            *signed_tx_1.hash(),
            B256::from_str("0x3b9aca00f0671c9a2a1b817a0a78d3fe0c0f776cccb2a8c3c1b412a4f4e12345")
                .unwrap(),
            B256::from_str("0x3b9aca00f0671c9a2a1b817a0a78d3fe0c0f776cccb2a8c3c1b412a4f4edabe3")
                .unwrap(),
        ];

        // Sanity check: `signed_tx_2` really is absent from the request.
        request_hashes.iter().for_each(|hash| assert_ne!(hash, signed_tx_2.hash()));

        let request_hashes = RequestTxHashes::new(request_hashes.into_iter().collect());

        let response_txns = PooledTransactions(vec![signed_tx_1.clone(), signed_tx_2]);
        let payload = UnverifiedPooledTransactions::new(response_txns);

        let (outcome, verified_payload) = payload.verify(&request_hashes, &PeerId::ZERO);

        assert_eq!(VerificationOutcome::ReportPeer, outcome);
        assert_eq!(1, verified_payload.len());
        assert!(verified_payload.contains(&signed_tx_1));
    }
}