1use super::{
29 config::TransactionFetcherConfig,
30 constants::{tx_fetcher::*, SOFT_LIMIT_COUNT_HASHES_IN_GET_POOLED_TRANSACTIONS_REQUEST},
31 PeerMetadata, PooledTransactions, SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE,
32};
33use crate::{
34 cache::{LruCache, LruMap},
35 duration_metered_exec,
36 metrics::TransactionFetcherMetrics,
37};
38use alloy_consensus::transaction::PooledTransaction;
39use alloy_primitives::TxHash;
40use derive_more::{Constructor, Deref};
41use futures::{stream::FuturesUnordered, Future, FutureExt, Stream, StreamExt};
42use pin_project::pin_project;
43use reth_eth_wire::{
44 DedupPayload, GetPooledTransactions, HandleMempoolData, HandleVersionedMempoolData,
45 PartiallyValidData, RequestTxHashes, ValidAnnouncementData,
46};
47use reth_eth_wire_types::{EthNetworkPrimitives, NetworkPrimitives};
48use reth_network_api::PeerRequest;
49use reth_network_p2p::error::{RequestError, RequestResult};
50use reth_network_peers::PeerId;
51use reth_primitives_traits::SignedTransaction;
52use schnellru::ByLength;
53use std::{
54 collections::HashMap,
55 pin::Pin,
56 task::{ready, Context, Poll},
57 time::Duration,
58};
59use tokio::sync::{mpsc::error::TrySendError, oneshot, oneshot::error::RecvError};
60use tracing::trace;
61
/// Bookkeeping for fetching pooled transactions from peers: which hashes wait to be requested,
/// which requests are in flight, and how many requests each tracked peer is serving.
#[derive(Debug)]
#[pin_project]
pub struct TransactionFetcher<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// Peers tracked by the fetcher, mapped to their number of inflight requests.
    pub active_peers: LruMap<PeerId, u8, ByLength>,
    /// Futures of `GetPooledTransactions` requests that were sent and have not resolved yet.
    #[pin]
    pub inflight_requests: FuturesUnordered<GetPooledTxRequestFut<N::PooledTransaction>>,
    /// Hashes buffered until they can be packed into a request.
    pub hashes_pending_fetch: LruCache<TxHash>,
    /// Fetch metadata for every tracked hash, whether it is inflight or pending fetch.
    pub hashes_fetch_inflight_and_pending_fetch: LruMap<TxHash, TxFetchMetadata, ByLength>,
    /// Limits this fetcher operates under.
    pub info: TransactionFetcherInfo,
    #[doc(hidden)]
    metrics: TransactionFetcherMetrics,
}
90
91impl<N: NetworkPrimitives> TransactionFetcher<N> {
92 pub(crate) fn remove_peer(&mut self, peer_id: &PeerId) {
94 self.active_peers.remove(peer_id);
95 }
96
97 #[inline]
99 pub fn update_metrics(&self) {
100 let metrics = &self.metrics;
101
102 metrics.inflight_transaction_requests.set(self.inflight_requests.len() as f64);
103
104 let hashes_pending_fetch = self.hashes_pending_fetch.len() as f64;
105 let total_hashes = self.hashes_fetch_inflight_and_pending_fetch.len() as f64;
106
107 metrics.hashes_pending_fetch.set(hashes_pending_fetch);
108 metrics.hashes_inflight_transaction_requests.set(total_hashes - hashes_pending_fetch);
109 }
110
111 #[inline]
112 fn update_pending_fetch_cache_search_metrics(&self, durations: TxFetcherSearchDurations) {
113 let metrics = &self.metrics;
114
115 let TxFetcherSearchDurations { find_idle_peer, fill_request } = durations;
116 metrics
117 .duration_find_idle_fallback_peer_for_any_pending_hash
118 .set(find_idle_peer.as_secs_f64());
119 metrics.duration_fill_request_from_hashes_pending_fetch.set(fill_request.as_secs_f64());
120 }
121
122 pub fn with_transaction_fetcher_config(config: &TransactionFetcherConfig) -> Self {
124 let TransactionFetcherConfig {
125 max_inflight_requests,
126 max_capacity_cache_txns_pending_fetch,
127 ..
128 } = *config;
129
130 let info = config.clone().into();
131
132 let metrics = TransactionFetcherMetrics::default();
133 metrics.capacity_inflight_requests.increment(max_inflight_requests as u64);
134
135 Self {
136 active_peers: LruMap::new(max_inflight_requests),
137 hashes_pending_fetch: LruCache::new(max_capacity_cache_txns_pending_fetch),
138 hashes_fetch_inflight_and_pending_fetch: LruMap::new(
139 max_inflight_requests + max_capacity_cache_txns_pending_fetch,
140 ),
141 info,
142 metrics,
143 ..Default::default()
144 }
145 }
146
147 #[inline]
149 pub fn remove_hashes_from_transaction_fetcher<'a, I>(&mut self, hashes: I)
150 where
151 I: IntoIterator<Item = &'a TxHash>,
152 {
153 for hash in hashes {
154 self.hashes_fetch_inflight_and_pending_fetch.remove(hash);
155 self.hashes_pending_fetch.remove(hash);
156 }
157 }
158
159 fn decrement_inflight_request_count_for(&mut self, peer_id: &PeerId) {
161 let remove = || -> bool {
162 if let Some(inflight_count) = self.active_peers.get(peer_id) {
163 *inflight_count = inflight_count.saturating_sub(1);
164 if *inflight_count == 0 {
165 return true
166 }
167 }
168 false
169 }();
170
171 if remove {
172 self.active_peers.remove(peer_id);
173 }
174 }
175
176 #[inline]
178 pub fn is_idle(&self, peer_id: &PeerId) -> bool {
179 let Some(inflight_count) = self.active_peers.peek(peer_id) else { return true };
180 if *inflight_count < self.info.max_inflight_requests_per_peer {
181 return true
182 }
183 false
184 }
185
186 pub fn get_idle_peer_for(&self, hash: TxHash) -> Option<&PeerId> {
188 let TxFetchMetadata { fallback_peers, .. } =
189 self.hashes_fetch_inflight_and_pending_fetch.peek(&hash)?;
190
191 fallback_peers.iter().find(|peer_id| self.is_idle(peer_id))
192 }
193
194 pub fn find_any_idle_fallback_peer_for_any_pending_hash(
200 &mut self,
201 hashes_to_request: &mut RequestTxHashes,
202 mut budget: Option<usize>, ) -> Option<PeerId> {
204 let mut hashes_pending_fetch_iter = self.hashes_pending_fetch.iter();
205
206 let idle_peer = loop {
207 let &hash = hashes_pending_fetch_iter.next()?;
208
209 let idle_peer = self.get_idle_peer_for(hash);
210
211 if idle_peer.is_some() {
212 hashes_to_request.insert(hash);
213 break idle_peer.copied()
214 }
215
216 if let Some(ref mut bud) = budget {
217 *bud = bud.saturating_sub(1);
218 if *bud == 0 {
219 return None
220 }
221 }
222 };
223 let hash = hashes_to_request.iter().next()?;
224
225 drop(hashes_pending_fetch_iter);
227 _ = self.hashes_pending_fetch.remove(hash);
228
229 idle_peer
230 }
231
232 pub fn pack_request(
237 &self,
238 hashes_to_request: &mut RequestTxHashes,
239 hashes_from_announcement: ValidAnnouncementData,
240 ) -> RequestTxHashes {
241 if hashes_from_announcement.msg_version().is_eth68() {
242 return self.pack_request_eth68(hashes_to_request, hashes_from_announcement)
243 }
244 self.pack_request_eth66(hashes_to_request, hashes_from_announcement)
245 }
246
/// Packs a request from an eth68 announcement, using the announced sizes to keep the
/// expected response below the configured soft limit
/// (`soft_limit_byte_size_pooled_transactions_response_on_pack_request`).
///
/// Hashes chosen for the request are inserted into `hashes_to_request`; the returned set
/// holds the hashes that were left out.
///
/// The first announced hash is always requested if it carries size metadata. The remaining
/// hashes are added in announcement order as long as the accumulated size stays within the
/// limit; a hash that doesn't fit is moved to the surplus set and the scan continues, until
/// the free space drops below `MEDIAN_BYTE_SIZE_SMALL_LEGACY_TX_ENCODED`.
///
/// # Panics
///
/// Panics if an entry after the first one carries no size metadata.
pub fn pack_request_eth68(
    &self,
    hashes_to_request: &mut RequestTxHashes,
    hashes_from_announcement: impl HandleMempoolData
        + IntoIterator<Item = (TxHash, Option<(u8, usize)>)>,
) -> RequestTxHashes {
    let mut acc_size_response = 0;

    let mut hashes_from_announcement_iter = hashes_from_announcement.into_iter();

    // NOTE(review): if the first entry has no metadata, the pattern doesn't match and the
    // entry is consumed without ending up in the request or the surplus set
    if let Some((hash, Some((_ty, size)))) = hashes_from_announcement_iter.next() {
        hashes_to_request.insert(hash);

        // the first tx alone reaches the limit, request only it and return the rest
        if size >= self.info.soft_limit_byte_size_pooled_transactions_response_on_pack_request {
            return hashes_from_announcement_iter.collect()
        }
        acc_size_response = size;
    }

    let mut surplus_hashes = RequestTxHashes::default();

    // `by_ref` keeps the iterator usable after the loop, so the entries left after a `break`
    // can still be drained into the surplus set
    for (hash, metadata) in hashes_from_announcement_iter.by_ref() {
        let Some((_ty, size)) = metadata else {
            unreachable!("this method is called upon reception of an eth68 announcement")
        };

        // `None` if the sum overflows or exceeds the soft limit
        let next_acc_size = acc_size_response.checked_add(size).filter(|next_acc_size| {
            *next_acc_size <=
                self.info.soft_limit_byte_size_pooled_transactions_response_on_pack_request
        });

        if let Some(next_acc_size) = next_acc_size {
            // the accumulated size only grows when the tx is actually added to the request
            acc_size_response = next_acc_size;
            _ = hashes_to_request.insert(hash)
        } else {
            _ = surplus_hashes.insert(hash)
        }

        // can't underflow: the accumulated size is only ever set to values within the limit
        let free_space =
            self.info.soft_limit_byte_size_pooled_transactions_response_on_pack_request -
                acc_size_response;

        // stop scanning once less than the median size of a small legacy tx is left
        if free_space < MEDIAN_BYTE_SIZE_SMALL_LEGACY_TX_ENCODED {
            break
        }
    }

    // whatever hasn't been looked at is surplus
    surplus_hashes.extend(hashes_from_announcement_iter.map(|(hash, _metadata)| hash));

    surplus_hashes
}
313
314 pub fn pack_request_eth66(
321 &self,
322 hashes_to_request: &mut RequestTxHashes,
323 hashes_from_announcement: ValidAnnouncementData,
324 ) -> RequestTxHashes {
325 let (mut hashes, _version) = hashes_from_announcement.into_request_hashes();
326 if hashes.len() <= SOFT_LIMIT_COUNT_HASHES_IN_GET_POOLED_TRANSACTIONS_REQUEST {
327 *hashes_to_request = hashes;
328 hashes_to_request.shrink_to_fit();
329
330 RequestTxHashes::default()
331 } else {
332 let surplus_hashes =
333 hashes.retain_count(SOFT_LIMIT_COUNT_HASHES_IN_GET_POOLED_TRANSACTIONS_REQUEST);
334 *hashes_to_request = hashes;
335 hashes_to_request.shrink_to_fit();
336
337 surplus_hashes
338 }
339 }
340
341 pub fn try_buffer_hashes_for_retry(
343 &mut self,
344 mut hashes: RequestTxHashes,
345 peer_failed_to_serve: &PeerId,
346 ) {
347 hashes.retain(|hash| {
350 if let Some(entry) = self.hashes_fetch_inflight_and_pending_fetch.get(hash) {
351 entry.fallback_peers_mut().remove(peer_failed_to_serve);
352 return true
353 }
354 false
356 });
357
358 self.buffer_hashes(hashes, None)
359 }
360
361 pub fn num_pending_hashes(&self) -> usize {
363 self.hashes_pending_fetch.len()
364 }
365
366 pub fn num_all_hashes(&self) -> usize {
368 self.hashes_fetch_inflight_and_pending_fetch.len()
369 }
370
371 pub fn buffer_hashes(&mut self, hashes: RequestTxHashes, fallback_peer: Option<PeerId>) {
376 for hash in hashes {
377 if self.hashes_fetch_inflight_and_pending_fetch.peek(&hash).is_none() {
379 continue
380 }
381
382 let Some(TxFetchMetadata { retries, fallback_peers, .. }) =
383 self.hashes_fetch_inflight_and_pending_fetch.get(&hash)
384 else {
385 continue
386 };
387
388 if let Some(peer_id) = fallback_peer {
389 fallback_peers.insert(peer_id);
391 } else {
392 if *retries >= DEFAULT_MAX_RETRIES {
393 trace!(target: "net::tx",
394 %hash,
395 retries,
396 "retry limit for `GetPooledTransactions` requests reached for hash, dropping hash"
397 );
398
399 self.hashes_fetch_inflight_and_pending_fetch.remove(&hash);
400 self.hashes_pending_fetch.remove(&hash);
401 continue
402 }
403 *retries += 1;
404 }
405
406 if let (_, Some(evicted_hash)) = self.hashes_pending_fetch.insert_and_get_evicted(hash)
407 {
408 self.hashes_fetch_inflight_and_pending_fetch.remove(&evicted_hash);
409 }
410 }
411 }
412
/// Tries to send one request for hashes that are buffered pending fetch.
///
/// Picks a pending hash with an idle fallback peer, fills the request with further pending
/// hashes that the peer has seen, and sends it to the peer's session. Both searches are
/// timed and the durations are recorded in the metrics.
///
/// `has_capacity_wrt_pending_pool_imports` is passed on to the budget helpers, which use it
/// to decide whether the searches get bounded.
///
/// Returns `true` if a request was sent. Returns `false` if no idle peer was found, the
/// chosen peer is missing from `peers`, or the request couldn't be sent; in the latter two
/// cases the selected hashes are buffered again.
pub fn on_fetch_pending_hashes(
    &mut self,
    peers: &HashMap<PeerId, PeerMetadata<N>>,
    has_capacity_wrt_pending_pool_imports: impl Fn(usize) -> bool,
) -> bool {
    let mut hashes_to_request = RequestTxHashes::with_capacity(
        DEFAULT_MARGINAL_COUNT_HASHES_GET_POOLED_TRANSACTIONS_REQUEST,
    );
    let mut search_durations = TxFetcherSearchDurations::default();

    // budget for probing pending hashes, `None` means unbounded
    let budget_find_idle_fallback_peer = self
        .search_breadth_budget_find_idle_fallback_peer(&has_capacity_wrt_pending_pool_imports);

    let peer_id = duration_metered_exec!(
        {
            let Some(peer_id) = self.find_any_idle_fallback_peer_for_any_pending_hash(
                &mut hashes_to_request,
                budget_find_idle_fallback_peer,
            ) else {
                // no pending hash has an idle fallback peer
                return false
            };

            peer_id
        },
        search_durations.find_idle_peer
    );

    let Some(peer) = peers.get(&peer_id) else {
        // the hash was already taken out of the pending cache, put it back as a retry
        self.buffer_hashes(hashes_to_request, None);
        return false
    };
    let conn_eth_version = peer.version;

    // budget for filling up the request, `None` means unbounded
    let budget_fill_request = self
        .search_breadth_budget_find_intersection_pending_hashes_and_hashes_seen_by_peer(
            &has_capacity_wrt_pending_pool_imports,
        );

    duration_metered_exec!(
        {
            self.fill_request_from_hashes_pending_fetch(
                &mut hashes_to_request,
                &peer.seen_transactions,
                budget_fill_request,
            )
        },
        search_durations.fill_request
    );

    self.update_pending_fetch_cache_search_metrics(search_durations);

    trace!(target: "net::tx",
        peer_id=format!("{peer_id:#}"),
        hashes=?*hashes_to_request,
        %conn_eth_version,
        "requesting hashes that were stored pending fetch from peer"
    );

    // `Some` hands back the hashes of a request that wasn't sent
    if let Some(failed_to_request_hashes) =
        self.request_transactions_from_peer(hashes_to_request, peer)
    {
        trace!(target: "net::tx",
            peer_id=format!("{peer_id:#}"),
            ?failed_to_request_hashes,
            %conn_eth_version,
            "failed sending request to peer's session, buffering hashes"
        );

        // buffering with a fallback peer doesn't count as a retry
        self.buffer_hashes(failed_to_request_hashes, Some(peer_id));
        return false
    }

    true
}
500
/// Filters an announcement in place, keeping only the hashes that should be fetched now.
///
/// A hash is kept if it is
/// - already tracked and currently buffered pending fetch (it is taken out of the pending
///   cache), or
/// - not tracked yet, not rejected by `is_tx_bad_import`, and could be inserted into the
///   metadata map.
///
/// Tracked hashes that aren't pending fetch are dropped from the announcement. If the
/// announcement carries a size for a tracked hash, the stored size is overwritten with it.
///
/// `peer_id` and `client_version` are only used for logging.
pub fn filter_unseen_and_pending_hashes(
    &mut self,
    new_announced_hashes: &mut ValidAnnouncementData,
    is_tx_bad_import: impl Fn(&TxHash) -> bool,
    peer_id: &PeerId,
    client_version: &str,
) {
    let mut previously_unseen_hashes_count = 0;

    let msg_version = new_announced_hashes.msg_version();

    new_announced_hashes.retain(|hash, metadata| {

        // hash is already tracked
        if let Some(TxFetchMetadata{ tx_encoded_length: previously_seen_size, ..}) = self.hashes_fetch_inflight_and_pending_fetch.peek_mut(hash) {
            // update the stored size if the announcement carries one
            if let Some((_ty, size)) = metadata {
                if let Some(prev_size) = previously_seen_size {
                    // this peer announces a different size than the one stored
                    if size != prev_size {
                        trace!(target: "net::tx",
                            peer_id=format!("{peer_id:#}"),
                            %hash,
                            size,
                            previously_seen_size,
                            %client_version,
                            "peer announced a different size for tx, this is especially worrying if one size is much bigger..."
                        );
                    }
                }
                // the most recently announced size wins
                *previously_seen_size = Some(*size);
            }

            // tracked and pending fetch: take it out of the pending cache and keep it
            if self.hashes_pending_fetch.remove(hash) {
                return true
            }

            // tracked but not pending fetch: drop it from the announcement
            return false
        }

        // hash is not tracked yet

        if is_tx_bad_import(hash) {
            return false
        }

        previously_unseen_hashes_count += 1;

        // start tracking the hash; `None` means the map didn't accept the entry
        if self.hashes_fetch_inflight_and_pending_fetch.get_or_insert(*hash, ||
            TxFetchMetadata{retries: 0, fallback_peers: LruCache::new(DEFAULT_MAX_COUNT_FALLBACK_PEERS as u32), tx_encoded_length: None}
        ).is_none() {

            trace!(target: "net::tx",
                peer_id=format!("{peer_id:#}"),
                %hash,
                ?msg_version,
                %client_version,
                "failed to cache new announced hash from peer in schnellru::LruMap, dropping hash"
            );

            return false
        }
        true
    });

    trace!(target: "net::tx",
        peer_id=format!("{peer_id:#}"),
        previously_unseen_hashes_count=previously_unseen_hashes_count,
        msg_version=?msg_version,
        client_version=%client_version,
        "received previously unseen hashes in announcement from peer"
    );
}
579
/// Sends a `GetPooledTransactions` request for the given hashes to the peer's session.
///
/// Returns `None` if the request was sent; the response future is then stored in the set of
/// inflight requests and the peer's inflight counter is incremented.
///
/// Returns the hashes back as `Some` if the request was not sent, which happens when
/// - the number of active peers has reached `max_inflight_requests`,
/// - the peer couldn't be inserted into the active-peer map,
/// - the peer has reached `max_inflight_requests_per_peer`, or
/// - the peer's request channel is full or closed.
pub fn request_transactions_from_peer(
    &mut self,
    new_announced_hashes: RequestTxHashes,
    peer: &PeerMetadata<N>,
) -> Option<RequestTxHashes> {
    let peer_id: PeerId = peer.request_tx.peer_id;
    let conn_eth_version = peer.version;

    if self.active_peers.len() >= self.info.max_inflight_requests {
        trace!(target: "net::tx",
            peer_id=format!("{peer_id:#}"),
            hashes=?*new_announced_hashes,
            %conn_eth_version,
            max_inflight_transaction_requests=self.info.max_inflight_requests,
            "limit for concurrent `GetPooledTransactions` requests reached, dropping request for hashes to peer"
        );
        return Some(new_announced_hashes)
    }

    // a peer seen for the first time starts out with zero inflight requests
    let Some(inflight_count) = self.active_peers.get_or_insert(peer_id, || 0) else {
        trace!(target: "net::tx",
            peer_id=format!("{peer_id:#}"),
            hashes=?*new_announced_hashes,
            conn_eth_version=%conn_eth_version,
            "failed to cache active peer in schnellru::LruMap, dropping request to peer"
        );
        return Some(new_announced_hashes)
    };

    if *inflight_count >= self.info.max_inflight_requests_per_peer {
        trace!(target: "net::tx",
            peer_id=format!("{peer_id:#}"),
            hashes=?*new_announced_hashes,
            %conn_eth_version,
            max_concurrent_tx_reqs_per_peer=self.info.max_inflight_requests_per_peer,
            "limit for concurrent `GetPooledTransactions` requests per peer reached"
        );
        return Some(new_announced_hashes)
    }

    // debug builds only: log if a hash about to be requested is still buffered pending fetch
    #[cfg(debug_assertions)]
    {
        for hash in &new_announced_hashes {
            if self.hashes_pending_fetch.contains(hash) {
                tracing::debug!(target: "net::tx", "`{}` should have been taken out of buffer before packing in a request, breaks invariant `@hashes_pending_fetch` and `@inflight_requests`, `@hashes_fetch_inflight_and_pending_fetch` for `{}`: {:?}",
                    format!("{:?}", new_announced_hashes), format!("{:?}", new_announced_hashes),
                    new_announced_hashes.iter().map(|hash| {
                        let metadata = self.hashes_fetch_inflight_and_pending_fetch.get(hash);
                        (*hash, metadata.map(|m| (m.retries, m.tx_encoded_length)))
                    }).collect::<Vec<(TxHash, Option<(u8, Option<usize>)>)>>())
            }
        }
    }

    // the sender half travels with the request, the receiver half resolves the response
    let (response, rx) = oneshot::channel();
    let req = PeerRequest::GetPooledTransactions {
        request: GetPooledTransactions(new_announced_hashes.iter().copied().collect()),
        response,
    };

    if let Err(err) = peer.request_tx.try_send(req) {
        // NOTE(review): a closed channel is counted by the same metric as a full one, and a
        // peer entry inserted above stays in `active_peers` with its current count
        return match err {
            TrySendError::Full(_) | TrySendError::Closed(_) => {
                self.metrics.egress_peer_channel_full.increment(1);
                Some(new_announced_hashes)
            }
        }
    }

    *inflight_count += 1;
    // keep the response future so the stream can resolve it
    self.inflight_requests.push(GetPooledTxRequestFut::new(peer_id, new_announced_hashes, rx));

    None
}
666
667 pub fn fill_request_from_hashes_pending_fetch(
687 &mut self,
688 hashes_to_request: &mut RequestTxHashes,
689 seen_hashes: &LruCache<TxHash>,
690 mut budget_fill_request: Option<usize>, ) {
692 let Some(hash) = hashes_to_request.iter().next() else { return };
693
694 let mut acc_size_response = self
695 .hashes_fetch_inflight_and_pending_fetch
696 .get(hash)
697 .and_then(|entry| entry.tx_encoded_len())
698 .unwrap_or(AVERAGE_BYTE_SIZE_TX_ENCODED);
699
700 if acc_size_response >=
702 DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE_ON_FETCH_PENDING_HASHES
703 {
704 return
705 }
706
707 for hash in self.hashes_pending_fetch.iter() {
710 if !seen_hashes.contains(hash) {
712 continue
713 };
714
715 hashes_to_request.insert(*hash);
717
718 let size = self
720 .hashes_fetch_inflight_and_pending_fetch
721 .get(hash)
722 .and_then(|entry| entry.tx_encoded_len())
723 .unwrap_or(AVERAGE_BYTE_SIZE_TX_ENCODED);
724
725 acc_size_response = acc_size_response.saturating_add(size);
726
727 if acc_size_response >=
731 DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE_ON_FETCH_PENDING_HASHES ||
732 hashes_to_request.len() >
733 DEFAULT_SOFT_LIMIT_COUNT_HASHES_IN_GET_POOLED_TRANSACTIONS_REQUEST_ON_FETCH_PENDING_HASHES
734 {
735 break
736 }
737
738 if let Some(ref mut bud) = budget_fill_request {
739 *bud -= 1;
740 if *bud == 0 {
741 break
742 }
743 }
744 }
745
746 for hash in hashes_to_request.iter() {
748 self.hashes_pending_fetch.remove(hash);
749 }
750 }
751
752 pub fn has_capacity_for_fetching_pending_hashes(&self) -> bool {
755 let info = &self.info;
756
757 self.has_capacity(info.max_inflight_requests)
758 }
759
760 fn has_capacity(&self, max_inflight_requests: usize) -> bool {
762 self.inflight_requests.len() <= max_inflight_requests
763 }
764
765 pub fn search_breadth_budget_find_idle_fallback_peer(
771 &self,
772 has_capacity_wrt_pending_pool_imports: impl Fn(usize) -> bool,
773 ) -> Option<usize> {
774 let info = &self.info;
775
776 let tx_fetcher_has_capacity = self.has_capacity(
777 info.max_inflight_requests /
778 DEFAULT_DIVISOR_MAX_COUNT_INFLIGHT_REQUESTS_ON_FIND_IDLE_PEER,
779 );
780 let tx_pool_has_capacity = has_capacity_wrt_pending_pool_imports(
781 DEFAULT_DIVISOR_MAX_COUNT_PENDING_POOL_IMPORTS_ON_FIND_IDLE_PEER,
782 );
783
784 if tx_fetcher_has_capacity && tx_pool_has_capacity {
785 None
787 } else {
788 let limit = DEFAULT_BUDGET_FIND_IDLE_FALLBACK_PEER;
790
791 trace!(target: "net::tx",
792 inflight_requests=self.inflight_requests.len(),
793 max_inflight_transaction_requests=info.max_inflight_requests,
794 hashes_pending_fetch=self.hashes_pending_fetch.len(),
795 limit,
796 "search breadth limited in search for idle fallback peer for some hash pending fetch"
797 );
798
799 Some(limit)
800 }
801 }
802
803 pub fn search_breadth_budget_find_intersection_pending_hashes_and_hashes_seen_by_peer(
810 &self,
811 has_capacity_wrt_pending_pool_imports: impl Fn(usize) -> bool,
812 ) -> Option<usize> {
813 let info = &self.info;
814
815 let tx_fetcher_has_capacity = self.has_capacity(
816 info.max_inflight_requests /
817 DEFAULT_DIVISOR_MAX_COUNT_INFLIGHT_REQUESTS_ON_FIND_INTERSECTION,
818 );
819 let tx_pool_has_capacity = has_capacity_wrt_pending_pool_imports(
820 DEFAULT_DIVISOR_MAX_COUNT_PENDING_POOL_IMPORTS_ON_FIND_INTERSECTION,
821 );
822
823 if tx_fetcher_has_capacity && tx_pool_has_capacity {
824 None
826 } else {
827 let limit = DEFAULT_BUDGET_FIND_INTERSECTION_ANNOUNCED_BY_PEER_AND_PENDING_FETCH;
829
830 trace!(target: "net::tx",
831 inflight_requests=self.inflight_requests.len(),
832 max_inflight_transaction_requests=self.info.max_inflight_requests,
833 hashes_pending_fetch=self.hashes_pending_fetch.len(),
834 limit=limit,
835 "search breadth limited in search for intersection of hashes announced by peer and hashes pending fetch"
836 );
837
838 Some(limit)
839 }
840 }
841
/// Processes a resolved `GetPooledTransactions` request and turns it into a [`FetchEvent`].
///
/// The peer's inflight counter is decremented first. Then, depending on the result:
/// - an empty response yields [`FetchEvent::EmptyResponse`],
/// - a response holding only transactions that weren't requested yields
///   [`FetchEvent::FetchError`] with [`RequestError::BadResponse`],
/// - otherwise the transactions are filtered against the request, deduplicated and returned
///   in [`FetchEvent::TransactionsFetched`]; requested hashes missing from the response are
///   buffered for retry,
/// - a request error or a closed response channel yields [`FetchEvent::FetchError`] after
///   buffering all requested hashes for retry.
pub fn on_resolved_get_pooled_transactions_request_fut(
    &mut self,
    response: GetPooledTxResponse<N::PooledTransaction>,
) -> FetchEvent<N::PooledTransaction> {
    let GetPooledTxResponse { peer_id, mut requested_hashes, result } = response;

    // the request is no longer inflight, whatever its outcome
    self.decrement_inflight_request_count_for(&peer_id);

    match result {
        Ok(Ok(transactions)) => {
            // 1. peer didn't serve any of the requested hashes
            if transactions.is_empty() {
                trace!(target: "net::tx",
                    peer_id=format!("{peer_id:#}"),
                    requested_hashes_len=requested_hashes.len(),
                    "received empty `PooledTransactions` response from peer, peer failed to serve hashes it announced"
                );

                return FetchEvent::EmptyResponse { peer_id }
            }

            // 2. filter out transactions that weren't requested
            let payload = UnverifiedPooledTransactions::new(transactions);

            let unverified_len = payload.len();
            let (verification_outcome, verified_payload) =
                payload.verify(&requested_hashes, &peer_id);

            // `verify` only removes entries, so this can't underflow
            let unsolicited = unverified_len - verified_payload.len();
            if unsolicited > 0 {
                self.metrics.unsolicited_transactions.increment(unsolicited as u64);
            }

            let report_peer = if verification_outcome == VerificationOutcome::ReportPeer {
                trace!(target: "net::tx",
                    peer_id=format!("{peer_id:#}"),
                    unverified_len,
                    verified_payload_len=verified_payload.len(),
                    "received `PooledTransactions` response from peer with entries that didn't verify against request, filtered out transactions"
                );
                true
            } else {
                false
            };

            // peer sent nothing but transactions that weren't requested
            if verified_payload.is_empty() {
                return FetchEvent::FetchError { peer_id, error: RequestError::BadResponse }
            }

            // 3. drop duplicate entries
            let unvalidated_payload_len = verified_payload.len();

            let valid_payload = verified_payload.dedup();

            if valid_payload.len() != unvalidated_payload_len {
                trace!(target: "net::tx",
                    peer_id=format!("{peer_id:#}"),
                    unvalidated_payload_len,
                    valid_payload_len=valid_payload.len(),
                    "received `PooledTransactions` response from peer with duplicate entries, filtered them out"
                );
            }
            // 4. split the requested hashes into fetched ones and ones left to retry
            let requested_hashes_len = requested_hashes.len();
            let mut fetched = Vec::with_capacity(valid_payload.len());
            requested_hashes.retain(|requested_hash| {
                if valid_payload.contains_key(requested_hash) {
                    fetched.push(*requested_hash);
                    // fetched, no retry needed
                    return false
                }
                true
            });
            fetched.shrink_to_fit();
            self.metrics.fetched_transactions.increment(fetched.len() as u64);

            if fetched.len() < requested_hashes_len {
                trace!(target: "net::tx",
                    peer_id=format!("{peer_id:#}"),
                    requested_hashes_len=requested_hashes_len,
                    fetched_len=fetched.len(),
                    "peer failed to serve hashes it announced"
                );
            }

            // only the hashes that weren't served are left in `requested_hashes`
            self.try_buffer_hashes_for_retry(requested_hashes, &peer_id);

            let transactions = valid_payload.into_data().into_values().collect();

            FetchEvent::TransactionsFetched { peer_id, transactions, report_peer }
        }
        Ok(Err(req_err)) => {
            self.try_buffer_hashes_for_retry(requested_hashes, &peer_id);
            FetchEvent::FetchError { peer_id, error: req_err }
        }
        Err(_) => {
            // the sender half of the response channel was dropped
            self.try_buffer_hashes_for_retry(requested_hashes, &peer_id);
            FetchEvent::FetchError { peer_id, error: RequestError::ChannelClosed }
        }
    }
}
971}
972
973impl<N: NetworkPrimitives> Stream for TransactionFetcher<N> {
974 type Item = FetchEvent<N::PooledTransaction>;
975
976 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
978 if self.inflight_requests.is_empty() {
981 return Poll::Pending
982 }
983
984 if let Some(resp) = ready!(self.inflight_requests.poll_next_unpin(cx)) {
985 return Poll::Ready(Some(self.on_resolved_get_pooled_transactions_request_fut(resp)))
986 }
987
988 Poll::Pending
989 }
990}
991
992impl<T: NetworkPrimitives> Default for TransactionFetcher<T> {
993 fn default() -> Self {
994 Self {
995 active_peers: LruMap::new(DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS),
996 inflight_requests: Default::default(),
997 hashes_pending_fetch: LruCache::new(DEFAULT_MAX_CAPACITY_CACHE_PENDING_FETCH),
998 hashes_fetch_inflight_and_pending_fetch: LruMap::new(
999 DEFAULT_MAX_CAPACITY_CACHE_INFLIGHT_AND_PENDING_FETCH,
1000 ),
1001 info: TransactionFetcherInfo::default(),
1002 metrics: Default::default(),
1003 }
1004 }
1005}
1006
/// Fetch state kept for a single transaction hash.
#[derive(Debug, Constructor)]
pub struct TxFetchMetadata {
    /// Number of times the hash has been buffered for retry.
    retries: u8,
    /// Peers the hash can be requested from.
    fallback_peers: LruCache<PeerId>,
    /// Size of the transaction as last announced by a peer, if any peer announced one.
    tx_encoded_length: Option<usize>,
}
1020
impl TxFetchMetadata {
    /// Returns a mutable reference to the fallback peers of the hash.
    pub const fn fallback_peers_mut(&mut self) -> &mut LruCache<PeerId> {
        &mut self.fallback_peers
    }

    /// Returns the stored size of the transaction, or `None` if no size has been recorded.
    pub const fn tx_encoded_len(&self) -> Option<usize> {
        self.tx_encoded_length
    }
}
1035
/// Outcome of a resolved `GetPooledTransactions` request.
#[derive(Debug)]
pub enum FetchEvent<T = PooledTransaction> {
    /// The peer served at least one of the requested transactions.
    TransactionsFetched {
        /// The peer that sent the response.
        peer_id: PeerId,
        /// The transactions that remain after filtering and deduplicating the response.
        transactions: PooledTransactions<T>,
        /// Whether the response contained transactions that weren't requested.
        report_peer: bool,
    },
    /// The request failed, or the response held nothing that was requested.
    FetchError {
        /// The peer the request was sent to.
        peer_id: PeerId,
        /// The reason for the failure.
        error: RequestError,
    },
    /// The peer answered with an empty response.
    EmptyResponse {
        /// The peer that sent the response.
        peer_id: PeerId,
    },
}
1062
/// An inflight request for pooled transactions to a peer.
#[derive(Debug)]
pub struct GetPooledTxRequest<T = PooledTransaction> {
    /// The peer the request was sent to.
    peer_id: PeerId,
    /// The hashes that were requested.
    requested_hashes: RequestTxHashes,
    /// Receiving half of the channel that delivers the response.
    response: oneshot::Receiver<RequestResult<PooledTransactions<T>>>,
}
1071
/// A resolved request for pooled transactions, pairing the request data with its result.
#[derive(Debug)]
pub struct GetPooledTxResponse<T = PooledTransaction> {
    /// The peer the request was sent to.
    peer_id: PeerId,
    /// The hashes that were requested.
    requested_hashes: RequestTxHashes,
    /// What the response channel resolved to: the request result, or a receive error if the
    /// sending half was dropped.
    result: Result<RequestResult<PooledTransactions<T>>, RecvError>,
}
1082
/// Future that resolves a [`GetPooledTxRequest`] into a [`GetPooledTxResponse`].
#[must_use = "futures do nothing unless polled"]
#[pin_project::pin_project]
#[derive(Debug)]
pub struct GetPooledTxRequestFut<T = PooledTransaction> {
    /// The pending request; taken out while polling and gone once the future has completed.
    #[pin]
    inner: Option<GetPooledTxRequest<T>>,
}
1092
impl<T> GetPooledTxRequestFut<T> {
    /// Wraps the peer, the requested hashes and the receiving half of the response channel
    /// into a future.
    #[inline]
    const fn new(
        peer_id: PeerId,
        requested_hashes: RequestTxHashes,
        response: oneshot::Receiver<RequestResult<PooledTransactions<T>>>,
    ) -> Self {
        Self { inner: Some(GetPooledTxRequest { peer_id, requested_hashes, response }) }
    }
}
1103
1104impl<T> Future for GetPooledTxRequestFut<T> {
1105 type Output = GetPooledTxResponse<T>;
1106
1107 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1108 let mut req = self.as_mut().project().inner.take().expect("polled after completion");
1109 match req.response.poll_unpin(cx) {
1110 Poll::Ready(result) => Poll::Ready(GetPooledTxResponse {
1111 peer_id: req.peer_id,
1112 requested_hashes: req.requested_hashes,
1113 result,
1114 }),
1115 Poll::Pending => {
1116 self.project().inner.set(Some(req));
1117 Poll::Pending
1118 }
1119 }
1120 }
1121}
1122
/// Transactions from a `PooledTransactions` response that haven't been checked against the
/// request yet.
#[derive(Debug, Constructor, Deref)]
pub struct UnverifiedPooledTransactions<T> {
    /// The transactions as received.
    txns: PooledTransactions<T>,
}
1128
/// Transactions from a `PooledTransactions` response after verification against the
/// request.
#[derive(Debug, Constructor, Deref)]
pub struct VerifiedPooledTransactions<T> {
    /// The transactions that passed verification.
    txns: PooledTransactions<T>,
}
1134
1135impl<T: SignedTransaction> DedupPayload for VerifiedPooledTransactions<T> {
1136 type Value = T;
1137
1138 fn is_empty(&self) -> bool {
1139 self.txns.is_empty()
1140 }
1141
1142 fn len(&self) -> usize {
1143 self.txns.len()
1144 }
1145
1146 fn dedup(self) -> PartiallyValidData<Self::Value> {
1147 PartiallyValidData::from_raw_data(
1148 self.txns.into_iter().map(|tx| (*tx.tx_hash(), tx)).collect(),
1149 None,
1150 )
1151 }
1152}
1153
/// Verification of a `PooledTransactions` response against the request it answers.
trait VerifyPooledTransactionsResponse {
    /// The transaction type carried by the response.
    type Transaction: SignedTransaction;

    /// Checks the response against `requested_hashes`, returning the outcome together with
    /// the verified transactions. `peer_id` identifies the peer that sent the response.
    fn verify(
        self,
        requested_hashes: &RequestTxHashes,
        peer_id: &PeerId,
    ) -> (VerificationOutcome, VerifiedPooledTransactions<Self::Transaction>);
}
1163
impl<T: SignedTransaction> VerifyPooledTransactionsResponse for UnverifiedPooledTransactions<T> {
    type Transaction = T;

    /// Removes every transaction whose hash isn't in `requested_hashes`.
    ///
    /// The outcome is [`VerificationOutcome::ReportPeer`] if at least one transaction was
    /// removed, [`VerificationOutcome::Ok`] otherwise. `_peer_id` is only used for logging.
    fn verify(
        self,
        requested_hashes: &RequestTxHashes,
        _peer_id: &PeerId,
    ) -> (VerificationOutcome, VerifiedPooledTransactions<T>) {
        let mut verification_outcome = VerificationOutcome::Ok;

        let Self { mut txns } = self;

        // debug builds collect the offending hashes for logging, other builds only count them
        #[cfg(debug_assertions)]
        let mut tx_hashes_not_requested: smallvec::SmallVec<[TxHash; 16]> = smallvec::smallvec!();
        #[cfg(not(debug_assertions))]
        let mut tx_hashes_not_requested_count = 0;

        txns.0.retain(|tx| {
            if !requested_hashes.contains(tx.tx_hash()) {
                verification_outcome = VerificationOutcome::ReportPeer;

                #[cfg(debug_assertions)]
                tx_hashes_not_requested.push(*tx.tx_hash());
                #[cfg(not(debug_assertions))]
                {
                    tx_hashes_not_requested_count += 1;
                }

                // not requested, filter it out
                return false
            }
            true
        });

        #[cfg(debug_assertions)]
        if !tx_hashes_not_requested.is_empty() {
            trace!(target: "net::tx",
                peer_id=format!("{_peer_id:#}"),
                ?tx_hashes_not_requested,
                "transactions in `PooledTransactions` response from peer were not requested"
            );
        }
        #[cfg(not(debug_assertions))]
        if tx_hashes_not_requested_count != 0 {
            trace!(target: "net::tx",
                peer_id=format!("{_peer_id:#}"),
                tx_hashes_not_requested_count,
                "transactions in `PooledTransactions` response from peer were not requested"
            );
        }

        (verification_outcome, VerifiedPooledTransactions::new(txns))
    }
}
1217
/// Outcome of verifying a `PooledTransactions` response against its request.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum VerificationOutcome {
    /// Every transaction in the response was requested.
    Ok,
    /// The response contained transactions that weren't requested.
    ReportPeer,
}
1228
/// Limits a [`TransactionFetcher`] operates under.
#[derive(Debug, Constructor)]
pub struct TransactionFetcherInfo {
    /// Limit for the number of requests that may be inflight at the same time.
    pub max_inflight_requests: usize,
    /// Limit for the number of inflight requests to a single peer.
    pub max_inflight_requests_per_peer: u8,
    /// Soft limit for the expected byte size of a response, applied when packing a request
    /// from an announcement that carries sizes.
    pub soft_limit_byte_size_pooled_transactions_response_on_pack_request: usize,
    /// Soft limit for the byte size of a `PooledTransactions` response.
    // NOTE(review): not read anywhere in this part of the file; confirm where it is applied
    pub soft_limit_byte_size_pooled_transactions_response: usize,
    /// Capacity of the cache of hashes pending fetch.
    pub max_capacity_cache_txns_pending_fetch: u32,
}
1248
1249impl Default for TransactionFetcherInfo {
1250 fn default() -> Self {
1251 Self::new(
1252 DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS as usize,
1253 DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS_PER_PEER,
1254 DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESP_ON_PACK_GET_POOLED_TRANSACTIONS_REQ,
1255 SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE,
1256 DEFAULT_MAX_CAPACITY_CACHE_PENDING_FETCH,
1257 )
1258 }
1259}
1260
1261impl From<TransactionFetcherConfig> for TransactionFetcherInfo {
1262 fn from(config: TransactionFetcherConfig) -> Self {
1263 let TransactionFetcherConfig {
1264 max_inflight_requests,
1265 max_inflight_requests_per_peer,
1266 soft_limit_byte_size_pooled_transactions_response,
1267 soft_limit_byte_size_pooled_transactions_response_on_pack_request,
1268 max_capacity_cache_txns_pending_fetch,
1269 } = config;
1270
1271 Self::new(
1272 max_inflight_requests as usize,
1273 max_inflight_requests_per_peer,
1274 soft_limit_byte_size_pooled_transactions_response_on_pack_request,
1275 soft_limit_byte_size_pooled_transactions_response,
1276 max_capacity_cache_txns_pending_fetch,
1277 )
1278 }
1279}
1280
/// Time spent in the two searches that are run when fetching hashes pending fetch.
#[derive(Debug, Default)]
struct TxFetcherSearchDurations {
    /// Time spent looking for an idle fallback peer for any hash pending fetch.
    find_idle_peer: Duration,
    /// Time spent filling the request with further hashes pending fetch.
    fill_request: Duration,
}
1286
1287#[cfg(test)]
1288mod test {
1289 use super::*;
1290 use crate::test_utils::transactions::{buffer_hash_to_tx_fetcher, new_mock_session};
1291 use alloy_primitives::{hex, B256};
1292 use alloy_rlp::Decodable;
1293 use derive_more::IntoIterator;
1294 use reth_eth_wire_types::EthVersion;
1295 use reth_ethereum_primitives::TransactionSigned;
1296 use std::{collections::HashSet, str::FromStr};
1297
/// Test stand-in for announcement data: announced hashes paired with optional
/// `(type, size)` metadata.
#[derive(IntoIterator)]
struct TestValidAnnouncementData(Vec<(TxHash, Option<(u8, usize)>)>);
1300
1301 impl HandleMempoolData for TestValidAnnouncementData {
1302 fn is_empty(&self) -> bool {
1303 self.0.is_empty()
1304 }
1305
1306 fn len(&self) -> usize {
1307 self.0.len()
1308 }
1309
1310 fn retain_by_hash(&mut self, mut f: impl FnMut(&TxHash) -> bool) {
1311 self.0.retain(|(hash, _)| f(hash))
1312 }
1313 }
1314
1315 impl HandleVersionedMempoolData for TestValidAnnouncementData {
1316 fn msg_version(&self) -> EthVersion {
1317 EthVersion::Eth68
1318 }
1319 }
1320
/// Packs five eth68 announcements into a request and checks which hashes end up in the request
/// and which are returned as surplus.
#[test]
fn pack_eth68_request() {
    reth_tracing::init_test_tracing();

    let tx_fetcher = &mut TransactionFetcher::<EthNetworkPrimitives>::default();

    let hash_of = |byte: u8| B256::from_slice(&[byte; 32]);
    let pack_limit =
        DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESP_ON_PACK_GET_POOLED_TRANSACTIONS_REQ;

    // Announced (hash, size) pairs, in announcement order.
    let announced = [
        // just under the limit minus one median small legacy tx
        (hash_of(1), pack_limit - MEDIAN_BYTE_SIZE_SMALL_LEGACY_TX_ENCODED - 1),
        // as large as the whole soft limit on its own
        (hash_of(2), pack_limit),
        (hash_of(3), 2),
        (hash_of(4), 9),
        (hash_of(5), 0),
    ];

    // The first and third announcements are expected in the request, the rest as surplus.
    let expected_request_hashes = HashSet::from([hash_of(1), hash_of(3)]);
    let expected_surplus_hashes = HashSet::from([hash_of(2), hash_of(4), hash_of(5)]);

    let mut eth68_hashes_to_request = RequestTxHashes::with_capacity(3);

    let valid_announcement_data = TestValidAnnouncementData(
        announced.iter().map(|&(hash, size)| (hash, Some((0u8, size)))).collect(),
    );

    let surplus_eth68_hashes =
        tx_fetcher.pack_request_eth68(&mut eth68_hashes_to_request, valid_announcement_data);

    // Compare as sets so the test does not depend on ordering.
    let packed: HashSet<_> = eth68_hashes_to_request.into_iter().collect();
    let surplus: HashSet<_> = surplus_eth68_hashes.into_iter().collect();

    assert_eq!(expected_request_hashes, packed);
    assert_eq!(expected_surplus_hashes, surplus);
}
1371
/// Packs three eth68 announcements where the second one advertises `usize::MAX` bytes, and
/// checks that it is returned as surplus while the first and third are still requested.
#[test]
fn pack_eth68_request_does_not_overflow_announced_size() {
    reth_tracing::init_test_tracing();

    let tx_fetcher = &mut TransactionFetcher::<EthNetworkPrimitives>::default();

    let hash_of = |byte: u8| B256::from_slice(&[byte; 32]);
    let pack_limit =
        DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESP_ON_PACK_GET_POOLED_TRANSACTIONS_REQ;

    // Announced (hash, size) pairs, in announcement order.
    let announced = [
        (hash_of(1), pack_limit - MEDIAN_BYTE_SIZE_SMALL_LEGACY_TX_ENCODED - 1),
        // largest representable announced size
        (hash_of(2), usize::MAX),
        (hash_of(3), 2),
    ];

    let expected_request_hashes = HashSet::from([hash_of(1), hash_of(3)]);
    let expected_surplus_hashes = HashSet::from([hash_of(2)]);

    let mut eth68_hashes_to_request = RequestTxHashes::with_capacity(3);
    let valid_announcement_data = TestValidAnnouncementData(
        announced.iter().map(|&(hash, size)| (hash, Some((0u8, size)))).collect(),
    );

    let surplus_eth68_hashes =
        tx_fetcher.pack_request_eth68(&mut eth68_hashes_to_request, valid_announcement_data);

    // Compare as sets so the test does not depend on ordering.
    let packed: HashSet<_> = eth68_hashes_to_request.into_iter().collect();
    let surplus: HashSet<_> = surplus_eth68_hashes.into_iter().collect();

    assert_eq!(expected_request_hashes, packed);
    assert_eq!(expected_surplus_hashes, surplus);
}
1408
1409 #[tokio::test]
1410 async fn test_on_fetch_pending_hashes() {
1411 reth_tracing::init_test_tracing();
1412
1413 let tx_fetcher = &mut TransactionFetcher::default();
1414
1415 let seen_hashes = [
1419 B256::from_slice(&[1; 32]),
1420 B256::from_slice(&[2; 32]),
1421 B256::from_slice(&[3; 32]),
1422 B256::from_slice(&[4; 32]),
1423 ];
1424 let seen_eth68_hashes_sizes = [120, 158, 116];
1429
1430 let peer_1 = PeerId::new([1; 64]);
1432 let peer_2 = PeerId::new([2; 64]);
1434
1435 let (mut peer_1_data, mut peer_1_mock_session_rx) =
1439 new_mock_session(peer_1, EthVersion::Eth66);
1440 for hash in &seen_hashes {
1441 peer_1_data.seen_transactions.insert(*hash);
1442 }
1443 let (mut peer_2_data, _) = new_mock_session(peer_2, EthVersion::Eth66);
1444 for hash in &seen_hashes {
1445 peer_2_data.seen_transactions.insert(*hash);
1446 }
1447 let mut peers = HashMap::default();
1448 peers.insert(peer_1, peer_1_data);
1449 peers.insert(peer_2, peer_2_data);
1450
1451 for i in 0..3 {
1453 buffer_hash_to_tx_fetcher(
1455 tx_fetcher,
1456 seen_hashes[i],
1457 peer_2,
1458 0,
1459 Some(seen_eth68_hashes_sizes[i]),
1460 );
1461 }
1462 buffer_hash_to_tx_fetcher(tx_fetcher, seen_hashes[3], peer_2, 0, None);
1463
1464 let hash_other = B256::from_slice(&[5; 32]);
1466 buffer_hash_to_tx_fetcher(tx_fetcher, hash_other, peer_2, 0, None);
1467
1468 for hash in &seen_hashes {
1470 buffer_hash_to_tx_fetcher(tx_fetcher, *hash, peer_1, 0, None);
1471 }
1472
1473 assert_eq!(tx_fetcher.num_pending_hashes(), 5);
1475
1476 tx_fetcher.on_fetch_pending_hashes(&peers, |_| true);
1479
1480 let req = peer_1_mock_session_rx
1482 .recv()
1483 .await
1484 .expect("peer session should receive request with buffered hashes");
1485 let PeerRequest::GetPooledTransactions { request, .. } = req else { unreachable!() };
1486 let GetPooledTransactions(requested_hashes) = request;
1487
1488 assert_eq!(
1489 requested_hashes.into_iter().collect::<HashSet<_>>(),
1490 seen_hashes.into_iter().collect::<HashSet<_>>()
1491 )
1492 }
1493
/// Buffers one hash for two peers, then runs `on_fetch_pending_hashes` with an empty peer map
/// and checks that the hash is still counted as pending afterwards.
#[test]
fn on_fetch_pending_hashes_rebuffers_on_disconnected_peer() {
    let tx_fetcher = &mut TransactionFetcher::default();
    let hash_1 = B256::from_slice(&[1; 32]);

    // Buffer the same hash for peer 1 and then peer 2.
    for peer in [PeerId::new([1; 64]), PeerId::new([2; 64])] {
        buffer_hash_to_tx_fetcher(tx_fetcher, hash_1, peer, 0, Some(128));
    }

    // Buffering one hash for two peers yields a single pending hash.
    assert_eq!(tx_fetcher.num_pending_hashes(), 1);

    // Neither peer is present in the map handed to the fetcher.
    let no_connected_peers = HashMap::new();
    tx_fetcher.on_fetch_pending_hashes(&no_connected_peers, |_| true);

    assert_eq!(tx_fetcher.num_pending_hashes(), 1);
}
1513
/// Verifies a response holding one requested and one unrequested transaction: the outcome must
/// be `ReportPeer` and only the requested transaction may remain in the verified payload.
#[test]
fn verify_response_hashes() {
    /// Decodes raw bytes as a signed transaction and converts it to its pooled form.
    fn decode_pooled(raw: &[u8]) -> PooledTransaction {
        let mut buf = raw;
        TransactionSigned::decode(&mut buf).unwrap().try_into().unwrap()
    }

    let signed_tx_1 = decode_pooled(&hex!(
        "02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598daa"
    ));
    // Same bytes as the first transaction except for the final byte.
    let signed_tx_2 = decode_pooled(&hex!(
        "02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76"
    ));

    let parse_hash = |s: &str| B256::from_str(s).unwrap();

    // Only the first transaction's hash is part of the request.
    let request_hashes = [
        parse_hash("0x3b9aca00f0671c9a2a1b817a0a78d3fe0c0f776cccb2a8c3c1b412a4f4e67890"),
        *signed_tx_1.hash(),
        parse_hash("0x3b9aca00f0671c9a2a1b817a0a78d3fe0c0f776cccb2a8c3c1b412a4f4e12345"),
        parse_hash("0x3b9aca00f0671c9a2a1b817a0a78d3fe0c0f776cccb2a8c3c1b412a4f4edabe3"),
    ];

    // Sanity check: the second transaction really was not requested.
    request_hashes.iter().for_each(|hash| assert_ne!(hash, signed_tx_2.hash()));

    let request_hashes = RequestTxHashes::new(request_hashes.into_iter().collect());

    let payload = UnverifiedPooledTransactions::new(PooledTransactions(vec![
        signed_tx_1.clone(),
        signed_tx_2,
    ]));

    let (outcome, verified_payload) = payload.verify(&request_hashes, &PeerId::ZERO);

    assert_eq!(VerificationOutcome::ReportPeer, outcome);
    assert_eq!(1, verified_payload.len());
    assert!(verified_payload.contains(&signed_tx_1));
}
1554}