1use super::{
29 config::TransactionFetcherConfig,
30 constants::{tx_fetcher::*, SOFT_LIMIT_COUNT_HASHES_IN_GET_POOLED_TRANSACTIONS_REQUEST},
31 PeerMetadata, PooledTransactions, SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE,
32};
33use crate::{
34 cache::{LruCache, LruMap},
35 duration_metered_exec,
36 metrics::TransactionFetcherMetrics,
37};
38use alloy_consensus::transaction::PooledTransaction;
39use alloy_primitives::{map::FbBuildHasher, TxHash};
40use derive_more::{Constructor, Deref};
41use futures::{stream::FuturesUnordered, Future, FutureExt, Stream, StreamExt};
42use pin_project::pin_project;
43use reth_eth_wire::{
44 DedupPayload, GetPooledTransactions, HandleMempoolData, HandleVersionedMempoolData,
45 PartiallyValidData, RequestTxHashes, ValidAnnouncementData,
46};
47use reth_eth_wire_types::{EthNetworkPrimitives, NetworkPrimitives};
48use reth_network_api::PeerRequest;
49use reth_network_p2p::error::{RequestError, RequestResult};
50use reth_network_peers::PeerId;
51use reth_primitives_traits::SignedTransaction;
52use schnellru::ByLength;
53use std::{
54 collections::HashMap,
55 pin::Pin,
56 task::{ready, Context, Poll},
57 time::Duration,
58};
59use tokio::sync::{mpsc::error::TrySendError, oneshot, oneshot::error::RecvError};
60use tracing::trace;
61
62#[derive(Debug)]
67#[pin_project]
68pub struct TransactionFetcher<N: NetworkPrimitives = EthNetworkPrimitives> {
69 pub active_peers: LruMap<PeerId, u8, ByLength, FbBuildHasher<64>>,
71 #[pin]
77 pub inflight_requests: FuturesUnordered<GetPooledTxRequestFut<N::PooledTransaction>>,
78 pub hashes_pending_fetch: LruCache<TxHash, FbBuildHasher<32>>,
83 pub hashes_fetch_inflight_and_pending_fetch:
85 LruMap<TxHash, TxFetchMetadata, ByLength, FbBuildHasher<32>>,
86 pub info: TransactionFetcherInfo,
88 #[doc(hidden)]
89 metrics: TransactionFetcherMetrics,
90}
91
92impl<N: NetworkPrimitives> TransactionFetcher<N> {
93 pub(crate) fn remove_peer(&mut self, peer_id: &PeerId) {
95 self.active_peers.remove(peer_id);
96 }
97
98 #[inline]
100 pub fn update_metrics(&self) {
101 let metrics = &self.metrics;
102
103 metrics.inflight_transaction_requests.set(self.inflight_requests.len() as f64);
104
105 let hashes_pending_fetch = self.hashes_pending_fetch.len() as f64;
106 let total_hashes = self.hashes_fetch_inflight_and_pending_fetch.len() as f64;
107
108 metrics.hashes_pending_fetch.set(hashes_pending_fetch);
109 metrics.hashes_inflight_transaction_requests.set(total_hashes - hashes_pending_fetch);
110 }
111
112 #[inline]
113 fn update_pending_fetch_cache_search_metrics(&self, durations: TxFetcherSearchDurations) {
114 let metrics = &self.metrics;
115
116 let TxFetcherSearchDurations { find_idle_peer, fill_request } = durations;
117 metrics
118 .duration_find_idle_fallback_peer_for_any_pending_hash
119 .set(find_idle_peer.as_secs_f64());
120 metrics.duration_fill_request_from_hashes_pending_fetch.set(fill_request.as_secs_f64());
121 }
122
123 pub fn with_transaction_fetcher_config(config: &TransactionFetcherConfig) -> Self {
125 let TransactionFetcherConfig {
126 max_inflight_requests,
127 max_capacity_cache_txns_pending_fetch,
128 ..
129 } = *config;
130
131 let info = config.clone().into();
132
133 let metrics = TransactionFetcherMetrics::default();
134 metrics.capacity_inflight_requests.increment(max_inflight_requests as u64);
135
136 Self {
137 active_peers: LruMap::with_hasher(max_inflight_requests, Default::default()),
138 hashes_pending_fetch: LruCache::with_hasher(
139 max_capacity_cache_txns_pending_fetch,
140 Default::default(),
141 ),
142 hashes_fetch_inflight_and_pending_fetch: LruMap::with_hasher(
143 max_inflight_requests + max_capacity_cache_txns_pending_fetch,
144 Default::default(),
145 ),
146 info,
147 metrics,
148 ..Default::default()
149 }
150 }
151
152 #[inline]
154 pub fn remove_hashes_from_transaction_fetcher<'a, I>(&mut self, hashes: I)
155 where
156 I: IntoIterator<Item = &'a TxHash>,
157 {
158 for hash in hashes {
159 self.hashes_fetch_inflight_and_pending_fetch.remove(hash);
160 self.hashes_pending_fetch.remove(hash);
161 }
162 }
163
164 fn decrement_inflight_request_count_for(&mut self, peer_id: &PeerId) {
166 let remove = || -> bool {
167 if let Some(inflight_count) = self.active_peers.get(peer_id) {
168 *inflight_count = inflight_count.saturating_sub(1);
169 if *inflight_count == 0 {
170 return true
171 }
172 }
173 false
174 }();
175
176 if remove {
177 self.active_peers.remove(peer_id);
178 }
179 }
180
181 #[inline]
183 pub fn is_idle(&self, peer_id: &PeerId) -> bool {
184 let Some(inflight_count) = self.active_peers.peek(peer_id) else { return true };
185 if *inflight_count < self.info.max_inflight_requests_per_peer {
186 return true
187 }
188 false
189 }
190
191 pub fn get_idle_peer_for(&self, hash: TxHash) -> Option<&PeerId> {
193 let TxFetchMetadata { fallback_peers, .. } =
194 self.hashes_fetch_inflight_and_pending_fetch.peek(&hash)?;
195
196 fallback_peers.iter().find(|peer_id| self.is_idle(peer_id))
197 }
198
199 pub fn find_any_idle_fallback_peer_for_any_pending_hash(
205 &mut self,
206 hashes_to_request: &mut RequestTxHashes,
207 mut budget: Option<usize>, ) -> Option<PeerId> {
209 let mut hashes_pending_fetch_iter = self.hashes_pending_fetch.iter();
210
211 let idle_peer = loop {
212 let &hash = hashes_pending_fetch_iter.next()?;
213
214 let idle_peer = self.get_idle_peer_for(hash);
215
216 if idle_peer.is_some() {
217 hashes_to_request.insert(hash);
218 break idle_peer.copied()
219 }
220
221 if let Some(ref mut bud) = budget {
222 *bud = bud.saturating_sub(1);
223 if *bud == 0 {
224 return None
225 }
226 }
227 };
228 let hash = hashes_to_request.iter().next()?;
229
230 drop(hashes_pending_fetch_iter);
232 _ = self.hashes_pending_fetch.remove(hash);
233
234 idle_peer
235 }
236
237 pub fn pack_request(
242 &self,
243 hashes_to_request: &mut RequestTxHashes,
244 hashes_from_announcement: ValidAnnouncementData,
245 ) -> RequestTxHashes {
246 if hashes_from_announcement.msg_version().has_eth68_metadata() {
247 return self.pack_request_eth68(hashes_to_request, hashes_from_announcement)
248 }
249 self.pack_request_eth66(hashes_to_request, hashes_from_announcement)
250 }
251
252 pub fn pack_request_eth68(
263 &self,
264 hashes_to_request: &mut RequestTxHashes,
265 hashes_from_announcement: impl HandleMempoolData
266 + IntoIterator<Item = (TxHash, Option<(u8, usize)>)>,
267 ) -> RequestTxHashes {
268 let mut acc_size_response = 0;
269
270 let mut hashes_from_announcement_iter = hashes_from_announcement.into_iter();
271
272 if let Some((hash, Some((_ty, size)))) = hashes_from_announcement_iter.next() {
273 hashes_to_request.insert(hash);
274
275 if size >= self.info.soft_limit_byte_size_pooled_transactions_response_on_pack_request {
277 return hashes_from_announcement_iter.collect()
278 }
279 acc_size_response = size;
280 }
281
282 let mut surplus_hashes = RequestTxHashes::default();
283
284 for (hash, metadata) in hashes_from_announcement_iter.by_ref() {
287 let Some((_ty, size)) = metadata else {
288 unreachable!("this method is called upon reception of an eth68 announcement")
289 };
290
291 let next_acc_size = acc_size_response.checked_add(size).filter(|next_acc_size| {
292 *next_acc_size <=
293 self.info.soft_limit_byte_size_pooled_transactions_response_on_pack_request
294 });
295
296 if let Some(next_acc_size) = next_acc_size {
297 acc_size_response = next_acc_size;
300 _ = hashes_to_request.insert(hash)
301 } else {
302 _ = surplus_hashes.insert(hash)
303 }
304
305 let free_space =
306 self.info.soft_limit_byte_size_pooled_transactions_response_on_pack_request -
307 acc_size_response;
308
309 if free_space < MEDIAN_BYTE_SIZE_SMALL_LEGACY_TX_ENCODED {
310 break
311 }
312 }
313
314 surplus_hashes.extend(hashes_from_announcement_iter.map(|(hash, _metadata)| hash));
315
316 surplus_hashes
317 }
318
319 pub fn pack_request_eth66(
326 &self,
327 hashes_to_request: &mut RequestTxHashes,
328 hashes_from_announcement: ValidAnnouncementData,
329 ) -> RequestTxHashes {
330 let (mut hashes, _version) = hashes_from_announcement.into_request_hashes();
331 if hashes.len() <= SOFT_LIMIT_COUNT_HASHES_IN_GET_POOLED_TRANSACTIONS_REQUEST {
332 *hashes_to_request = hashes;
333 hashes_to_request.shrink_to_fit();
334
335 RequestTxHashes::default()
336 } else {
337 let surplus_hashes =
338 hashes.retain_count(SOFT_LIMIT_COUNT_HASHES_IN_GET_POOLED_TRANSACTIONS_REQUEST);
339 *hashes_to_request = hashes;
340 hashes_to_request.shrink_to_fit();
341
342 surplus_hashes
343 }
344 }
345
346 pub fn try_buffer_hashes_for_retry(
348 &mut self,
349 mut hashes: RequestTxHashes,
350 peer_failed_to_serve: &PeerId,
351 ) {
352 hashes.retain(|hash| {
355 if let Some(entry) = self.hashes_fetch_inflight_and_pending_fetch.get(hash) {
356 entry.fallback_peers_mut().remove(peer_failed_to_serve);
357 return true
358 }
359 false
361 });
362
363 self.buffer_hashes(hashes, None)
364 }
365
366 pub fn num_pending_hashes(&self) -> usize {
368 self.hashes_pending_fetch.len()
369 }
370
371 pub fn num_all_hashes(&self) -> usize {
373 self.hashes_fetch_inflight_and_pending_fetch.len()
374 }
375
376 pub fn buffer_hashes(&mut self, hashes: RequestTxHashes, fallback_peer: Option<PeerId>) {
381 for hash in hashes {
382 let Some(TxFetchMetadata { retries, fallback_peers, .. }) =
384 self.hashes_fetch_inflight_and_pending_fetch.get(&hash)
385 else {
386 continue
387 };
388
389 if let Some(peer_id) = fallback_peer {
390 fallback_peers.insert(peer_id);
392 } else {
393 if *retries >= DEFAULT_MAX_RETRIES {
394 trace!(target: "net::tx",
395 %hash,
396 retries,
397 "retry limit for `GetPooledTransactions` requests reached for hash, dropping hash"
398 );
399
400 self.hashes_fetch_inflight_and_pending_fetch.remove(&hash);
401 self.hashes_pending_fetch.remove(&hash);
402 continue
403 }
404 *retries += 1;
405 }
406
407 if let (_, Some(evicted_hash)) = self.hashes_pending_fetch.insert_and_get_evicted(hash)
408 {
409 self.hashes_fetch_inflight_and_pending_fetch.remove(&evicted_hash);
410 }
411 }
412 }
413
414 pub fn on_fetch_pending_hashes(
419 &mut self,
420 peers: &HashMap<PeerId, PeerMetadata<N>>,
421 has_capacity_wrt_pending_pool_imports: impl Fn(usize) -> bool,
422 ) -> bool {
423 let mut hashes_to_request = RequestTxHashes::with_capacity(
424 DEFAULT_MARGINAL_COUNT_HASHES_GET_POOLED_TRANSACTIONS_REQUEST,
425 );
426 let mut search_durations = TxFetcherSearchDurations::default();
427
428 let budget_find_idle_fallback_peer = self
430 .search_breadth_budget_find_idle_fallback_peer(&has_capacity_wrt_pending_pool_imports);
431
432 let peer_id = duration_metered_exec!(
433 {
434 let Some(peer_id) = self.find_any_idle_fallback_peer_for_any_pending_hash(
435 &mut hashes_to_request,
436 budget_find_idle_fallback_peer,
437 ) else {
438 return false
440 };
441
442 peer_id
443 },
444 search_durations.find_idle_peer
445 );
446
447 let Some(peer) = peers.get(&peer_id) else {
450 self.buffer_hashes(hashes_to_request, None);
451 return false
452 };
453 let conn_eth_version = peer.version;
454
455 let budget_fill_request = self
460 .search_breadth_budget_find_intersection_pending_hashes_and_hashes_seen_by_peer(
461 &has_capacity_wrt_pending_pool_imports,
462 );
463
464 duration_metered_exec!(
465 {
466 self.fill_request_from_hashes_pending_fetch(
467 &mut hashes_to_request,
468 &peer.seen_transactions,
469 budget_fill_request,
470 )
471 },
472 search_durations.fill_request
473 );
474
475 self.update_pending_fetch_cache_search_metrics(search_durations);
476
477 trace!(target: "net::tx",
478 peer_id=format!("{peer_id:#}"),
479 hashes=?*hashes_to_request,
480 %conn_eth_version,
481 "requesting hashes that were stored pending fetch from peer"
482 );
483
484 if let Some(failed_to_request_hashes) =
486 self.request_transactions_from_peer(hashes_to_request, peer)
487 {
488 trace!(target: "net::tx",
489 peer_id=format!("{peer_id:#}"),
490 ?failed_to_request_hashes,
491 %conn_eth_version,
492 "failed sending request to peer's session, buffering hashes"
493 );
494
495 self.buffer_hashes(failed_to_request_hashes, Some(peer_id));
496 return false
497 }
498
499 true
500 }
501
502 pub fn filter_unseen_and_pending_hashes(
505 &mut self,
506 new_announced_hashes: &mut ValidAnnouncementData,
507 is_tx_bad_import: impl Fn(&TxHash) -> bool,
508 peer_id: &PeerId,
509 client_version: &str,
510 ) {
511 let mut previously_unseen_hashes_count = 0;
512
513 let msg_version = new_announced_hashes.msg_version();
514
515 new_announced_hashes.retain(|hash, metadata| {
517
518 if let Some(TxFetchMetadata{ tx_encoded_length: previously_seen_size, ..}) = self.hashes_fetch_inflight_and_pending_fetch.peek_mut(hash) {
520 if let Some((_ty, size)) = metadata {
522 if let Some(prev_size) = previously_seen_size {
523 if size != prev_size {
525 trace!(target: "net::tx",
526 peer_id=format!("{peer_id:#}"),
527 %hash,
528 size,
529 previously_seen_size,
530 %client_version,
531 "peer announced a different size for tx, this is especially worrying if one size is much bigger..."
532 );
533 }
534 }
535 *previously_seen_size = Some(*size);
537 }
538
539 if self.hashes_pending_fetch.remove(hash) {
541 return true
542 }
543
544 return false
545 }
546
547 if is_tx_bad_import(hash) {
550 return false
551 }
552
553 previously_unseen_hashes_count += 1;
554
555 if self
556 .hashes_fetch_inflight_and_pending_fetch
557 .get_or_insert(*hash, || TxFetchMetadata {
558 retries: 0,
559 fallback_peers: LruCache::with_hasher(
560 DEFAULT_MAX_COUNT_FALLBACK_PEERS as u32,
561 Default::default(),
562 ),
563 tx_encoded_length: None,
564 })
565 .is_none()
566 {
567
568 trace!(target: "net::tx",
569 peer_id=format!("{peer_id:#}"),
570 %hash,
571 ?msg_version,
572 %client_version,
573 "failed to cache new announced hash from peer in schnellru::LruMap, dropping hash"
574 );
575
576 return false
577 }
578 true
579 });
580
581 trace!(target: "net::tx",
582 peer_id=format!("{peer_id:#}"),
583 previously_unseen_hashes_count=previously_unseen_hashes_count,
584 msg_version=?msg_version,
585 client_version=%client_version,
586 "received previously unseen hashes in announcement from peer"
587 );
588 }
589
590 pub fn request_transactions_from_peer(
598 &mut self,
599 new_announced_hashes: RequestTxHashes,
600 peer: &PeerMetadata<N>,
601 ) -> Option<RequestTxHashes> {
602 let peer_id: PeerId = peer.request_tx.peer_id;
603 let conn_eth_version = peer.version;
604
605 if self.active_peers.len() >= self.info.max_inflight_requests {
606 trace!(target: "net::tx",
607 peer_id=format!("{peer_id:#}"),
608 hashes=?*new_announced_hashes,
609 %conn_eth_version,
610 max_inflight_transaction_requests=self.info.max_inflight_requests,
611 "limit for concurrent `GetPooledTransactions` requests reached, dropping request for hashes to peer"
612 );
613 return Some(new_announced_hashes)
614 }
615
616 let Some(inflight_count) = self.active_peers.get_or_insert(peer_id, || 0) else {
617 trace!(target: "net::tx",
618 peer_id=format!("{peer_id:#}"),
619 hashes=?*new_announced_hashes,
620 conn_eth_version=%conn_eth_version,
621 "failed to cache active peer in schnellru::LruMap, dropping request to peer"
622 );
623 return Some(new_announced_hashes)
624 };
625
626 if *inflight_count >= self.info.max_inflight_requests_per_peer {
627 trace!(target: "net::tx",
628 peer_id=format!("{peer_id:#}"),
629 hashes=?*new_announced_hashes,
630 %conn_eth_version,
631 max_concurrent_tx_reqs_per_peer=self.info.max_inflight_requests_per_peer,
632 "limit for concurrent `GetPooledTransactions` requests per peer reached"
633 );
634 return Some(new_announced_hashes)
635 }
636
637 #[cfg(debug_assertions)]
638 {
639 for hash in &new_announced_hashes {
640 if self.hashes_pending_fetch.contains(hash) {
641 tracing::debug!(target: "net::tx", "`{}` should have been taken out of buffer before packing in a request, breaks invariant `@hashes_pending_fetch` and `@inflight_requests`, `@hashes_fetch_inflight_and_pending_fetch` for `{}`: {:?}",
642 format!("{:?}", new_announced_hashes), format!("{:?}", new_announced_hashes),
644 new_announced_hashes.iter().map(|hash| {
645 let metadata = self.hashes_fetch_inflight_and_pending_fetch.get(hash);
646 (*hash, metadata.map(|m| (m.retries, m.tx_encoded_length)))
648 }).collect::<Vec<(TxHash, Option<(u8, Option<usize>)>)>>())
649 }
650 }
651 }
652
653 let (response, rx) = oneshot::channel();
654 let req = PeerRequest::GetPooledTransactions {
655 request: GetPooledTransactions(new_announced_hashes.iter().copied().collect()),
656 response,
657 };
658
659 if let Err(err) = peer.request_tx.try_send(req) {
661 return match err {
663 TrySendError::Full(_) | TrySendError::Closed(_) => {
664 self.metrics.egress_peer_channel_full.increment(1);
665 Some(new_announced_hashes)
666 }
667 }
668 }
669
670 *inflight_count += 1;
671 self.inflight_requests.push(GetPooledTxRequestFut::new(peer_id, new_announced_hashes, rx));
673
674 None
675 }
676
677 pub fn fill_request_from_hashes_pending_fetch(
697 &mut self,
698 hashes_to_request: &mut RequestTxHashes,
699 seen_hashes: &LruCache<TxHash, FbBuildHasher<32>>,
700 mut budget_fill_request: Option<usize>, ) {
702 let Some(hash) = hashes_to_request.iter().next() else { return };
703
704 let mut acc_size_response = self
705 .hashes_fetch_inflight_and_pending_fetch
706 .get(hash)
707 .and_then(|entry| entry.tx_encoded_len())
708 .unwrap_or(AVERAGE_BYTE_SIZE_TX_ENCODED);
709
710 if acc_size_response >=
712 DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE_ON_FETCH_PENDING_HASHES
713 {
714 return
715 }
716
717 for hash in self.hashes_pending_fetch.iter() {
720 if !seen_hashes.contains(hash) {
722 continue
723 };
724
725 hashes_to_request.insert(*hash);
727
728 let size = self
730 .hashes_fetch_inflight_and_pending_fetch
731 .get(hash)
732 .and_then(|entry| entry.tx_encoded_len())
733 .unwrap_or(AVERAGE_BYTE_SIZE_TX_ENCODED);
734
735 acc_size_response = acc_size_response.saturating_add(size);
736
737 if acc_size_response >=
741 DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE_ON_FETCH_PENDING_HASHES ||
742 hashes_to_request.len() >
743 DEFAULT_SOFT_LIMIT_COUNT_HASHES_IN_GET_POOLED_TRANSACTIONS_REQUEST_ON_FETCH_PENDING_HASHES
744 {
745 break
746 }
747
748 if let Some(ref mut bud) = budget_fill_request {
749 *bud -= 1;
750 if *bud == 0 {
751 break
752 }
753 }
754 }
755
756 for hash in hashes_to_request.iter() {
758 self.hashes_pending_fetch.remove(hash);
759 }
760 }
761
762 pub fn has_capacity_for_fetching_pending_hashes(&self) -> bool {
765 let info = &self.info;
766
767 self.has_capacity(info.max_inflight_requests)
768 }
769
770 fn has_capacity(&self, max_inflight_requests: usize) -> bool {
772 self.inflight_requests.len() <= max_inflight_requests
773 }
774
775 pub fn search_breadth_budget_find_idle_fallback_peer(
781 &self,
782 has_capacity_wrt_pending_pool_imports: impl Fn(usize) -> bool,
783 ) -> Option<usize> {
784 let info = &self.info;
785
786 let tx_fetcher_has_capacity = self.has_capacity(
787 info.max_inflight_requests /
788 DEFAULT_DIVISOR_MAX_COUNT_INFLIGHT_REQUESTS_ON_FIND_IDLE_PEER,
789 );
790 let tx_pool_has_capacity = has_capacity_wrt_pending_pool_imports(
791 DEFAULT_DIVISOR_MAX_COUNT_PENDING_POOL_IMPORTS_ON_FIND_IDLE_PEER,
792 );
793
794 if tx_fetcher_has_capacity && tx_pool_has_capacity {
795 None
797 } else {
798 let limit = DEFAULT_BUDGET_FIND_IDLE_FALLBACK_PEER;
800
801 trace!(target: "net::tx",
802 inflight_requests=self.inflight_requests.len(),
803 max_inflight_transaction_requests=info.max_inflight_requests,
804 hashes_pending_fetch=self.hashes_pending_fetch.len(),
805 limit,
806 "search breadth limited in search for idle fallback peer for some hash pending fetch"
807 );
808
809 Some(limit)
810 }
811 }
812
813 pub fn search_breadth_budget_find_intersection_pending_hashes_and_hashes_seen_by_peer(
820 &self,
821 has_capacity_wrt_pending_pool_imports: impl Fn(usize) -> bool,
822 ) -> Option<usize> {
823 let info = &self.info;
824
825 let tx_fetcher_has_capacity = self.has_capacity(
826 info.max_inflight_requests /
827 DEFAULT_DIVISOR_MAX_COUNT_INFLIGHT_REQUESTS_ON_FIND_INTERSECTION,
828 );
829 let tx_pool_has_capacity = has_capacity_wrt_pending_pool_imports(
830 DEFAULT_DIVISOR_MAX_COUNT_PENDING_POOL_IMPORTS_ON_FIND_INTERSECTION,
831 );
832
833 if tx_fetcher_has_capacity && tx_pool_has_capacity {
834 None
836 } else {
837 let limit = DEFAULT_BUDGET_FIND_INTERSECTION_ANNOUNCED_BY_PEER_AND_PENDING_FETCH;
839
840 trace!(target: "net::tx",
841 inflight_requests=self.inflight_requests.len(),
842 max_inflight_transaction_requests=self.info.max_inflight_requests,
843 hashes_pending_fetch=self.hashes_pending_fetch.len(),
844 limit=limit,
845 "search breadth limited in search for intersection of hashes announced by peer and hashes pending fetch"
846 );
847
848 Some(limit)
849 }
850 }
851
852 pub fn on_resolved_get_pooled_transactions_request_fut(
856 &mut self,
857 response: GetPooledTxResponse<N::PooledTransaction>,
858 ) -> FetchEvent<N::PooledTransaction> {
859 let GetPooledTxResponse { peer_id, mut requested_hashes, result } = response;
862
863 self.decrement_inflight_request_count_for(&peer_id);
864
865 match result {
866 Ok(Ok(transactions)) => {
867 if transactions.is_empty() {
872 trace!(target: "net::tx",
873 peer_id=format!("{peer_id:#}"),
874 requested_hashes_len=requested_hashes.len(),
875 "received empty `PooledTransactions` response from peer, peer failed to serve hashes it announced"
876 );
877
878 return FetchEvent::EmptyResponse { peer_id }
879 }
880
881 let payload = UnverifiedPooledTransactions::new(transactions);
885
886 let unverified_len = payload.len();
887 let (verification_outcome, verified_payload) =
888 payload.verify(&requested_hashes, &peer_id);
889
890 let unsolicited = unverified_len - verified_payload.len();
891 if unsolicited > 0 {
892 self.metrics.unsolicited_transactions.increment(unsolicited as u64);
893 }
894
895 let report_peer = if verification_outcome == VerificationOutcome::ReportPeer {
896 trace!(target: "net::tx",
897 peer_id=format!("{peer_id:#}"),
898 unverified_len,
899 verified_payload_len=verified_payload.len(),
900 "received `PooledTransactions` response from peer with entries that didn't verify against request, filtered out transactions"
901 );
902 true
903 } else {
904 false
905 };
906
907 if verified_payload.is_empty() {
909 return FetchEvent::FetchError { peer_id, error: RequestError::BadResponse }
910 }
911
912 let unvalidated_payload_len = verified_payload.len();
916
917 let valid_payload = verified_payload.dedup();
918
919 if valid_payload.len() != unvalidated_payload_len {
925 trace!(target: "net::tx",
926 peer_id=format!("{peer_id:#}"),
927 unvalidated_payload_len,
928 valid_payload_len=valid_payload.len(),
929 "received `PooledTransactions` response from peer with duplicate entries, filtered them out"
930 );
931 }
932 let requested_hashes_len = requested_hashes.len();
940 let mut fetched = Vec::with_capacity(valid_payload.len());
941 requested_hashes.retain(|requested_hash| {
942 if valid_payload.contains_key(requested_hash) {
943 fetched.push(*requested_hash);
945 return false
946 }
947 true
948 });
949 fetched.shrink_to_fit();
950 self.metrics.fetched_transactions.increment(fetched.len() as u64);
951
952 if fetched.len() < requested_hashes_len {
953 trace!(target: "net::tx",
954 peer_id=format!("{peer_id:#}"),
955 requested_hashes_len=requested_hashes_len,
956 fetched_len=fetched.len(),
957 "peer failed to serve hashes it announced"
958 );
959 }
960
961 self.try_buffer_hashes_for_retry(requested_hashes, &peer_id);
965
966 let transactions = valid_payload.into_data().into_values().collect();
967
968 FetchEvent::TransactionsFetched { peer_id, transactions, report_peer }
969 }
970 Ok(Err(req_err)) => {
971 self.try_buffer_hashes_for_retry(requested_hashes, &peer_id);
972 FetchEvent::FetchError { peer_id, error: req_err }
973 }
974 Err(_) => {
975 self.try_buffer_hashes_for_retry(requested_hashes, &peer_id);
976 FetchEvent::FetchError { peer_id, error: RequestError::ChannelClosed }
978 }
979 }
980 }
981}
982
983impl<N: NetworkPrimitives> Stream for TransactionFetcher<N> {
984 type Item = FetchEvent<N::PooledTransaction>;
985
986 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
988 if self.inflight_requests.is_empty() {
991 return Poll::Pending
992 }
993
994 if let Some(resp) = ready!(self.inflight_requests.poll_next_unpin(cx)) {
995 return Poll::Ready(Some(self.on_resolved_get_pooled_transactions_request_fut(resp)))
996 }
997
998 Poll::Pending
999 }
1000}
1001
1002impl<T: NetworkPrimitives> Default for TransactionFetcher<T> {
1003 fn default() -> Self {
1004 Self {
1005 active_peers: LruMap::with_hasher(
1006 DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS,
1007 Default::default(),
1008 ),
1009 inflight_requests: Default::default(),
1010 hashes_pending_fetch: LruCache::with_hasher(
1011 DEFAULT_MAX_CAPACITY_CACHE_PENDING_FETCH,
1012 Default::default(),
1013 ),
1014 hashes_fetch_inflight_and_pending_fetch: LruMap::with_hasher(
1015 DEFAULT_MAX_CAPACITY_CACHE_INFLIGHT_AND_PENDING_FETCH,
1016 Default::default(),
1017 ),
1018 info: TransactionFetcherInfo::default(),
1019 metrics: Default::default(),
1020 }
1021 }
1022}
1023
1024#[derive(Debug, Constructor)]
1026pub struct TxFetchMetadata {
1027 retries: u8,
1029 fallback_peers: LruCache<PeerId, FbBuildHasher<64>>,
1031 tx_encoded_length: Option<usize>,
1036}
1037
1038impl TxFetchMetadata {
1039 pub const fn fallback_peers_mut(&mut self) -> &mut LruCache<PeerId, FbBuildHasher<64>> {
1041 &mut self.fallback_peers
1042 }
1043
1044 pub const fn tx_encoded_len(&self) -> Option<usize> {
1049 self.tx_encoded_length
1050 }
1051}
1052
1053#[derive(Debug)]
1055pub enum FetchEvent<T = PooledTransaction> {
1056 TransactionsFetched {
1058 peer_id: PeerId,
1060 transactions: PooledTransactions<T>,
1062 report_peer: bool,
1065 },
1066 FetchError {
1068 peer_id: PeerId,
1070 error: RequestError,
1072 },
1073 EmptyResponse {
1075 peer_id: PeerId,
1077 },
1078}
1079
1080#[derive(Debug)]
1082pub struct GetPooledTxRequest<T = PooledTransaction> {
1083 peer_id: PeerId,
1084 requested_hashes: RequestTxHashes,
1086 response: oneshot::Receiver<RequestResult<PooledTransactions<T>>>,
1087}
1088
1089#[derive(Debug)]
1092pub struct GetPooledTxResponse<T = PooledTransaction> {
1093 peer_id: PeerId,
1094 requested_hashes: RequestTxHashes,
1097 result: Result<RequestResult<PooledTransactions<T>>, RecvError>,
1098}
1099
1100#[must_use = "futures do nothing unless polled"]
1103#[pin_project::pin_project]
1104#[derive(Debug)]
1105pub struct GetPooledTxRequestFut<T = PooledTransaction> {
1106 #[pin]
1107 inner: Option<GetPooledTxRequest<T>>,
1108}
1109
1110impl<T> GetPooledTxRequestFut<T> {
1111 #[inline]
1112 const fn new(
1113 peer_id: PeerId,
1114 requested_hashes: RequestTxHashes,
1115 response: oneshot::Receiver<RequestResult<PooledTransactions<T>>>,
1116 ) -> Self {
1117 Self { inner: Some(GetPooledTxRequest { peer_id, requested_hashes, response }) }
1118 }
1119}
1120
1121impl<T> Future for GetPooledTxRequestFut<T> {
1122 type Output = GetPooledTxResponse<T>;
1123
1124 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1125 let mut req = self.as_mut().project().inner.take().expect("polled after completion");
1126 match req.response.poll_unpin(cx) {
1127 Poll::Ready(result) => Poll::Ready(GetPooledTxResponse {
1128 peer_id: req.peer_id,
1129 requested_hashes: req.requested_hashes,
1130 result,
1131 }),
1132 Poll::Pending => {
1133 self.project().inner.set(Some(req));
1134 Poll::Pending
1135 }
1136 }
1137 }
1138}
1139
1140#[derive(Debug, Constructor, Deref)]
1142pub struct UnverifiedPooledTransactions<T> {
1143 txns: PooledTransactions<T>,
1144}
1145
1146#[derive(Debug, Constructor, Deref)]
1148pub struct VerifiedPooledTransactions<T> {
1149 txns: PooledTransactions<T>,
1150}
1151
1152impl<T: SignedTransaction> DedupPayload for VerifiedPooledTransactions<T> {
1153 type Value = T;
1154
1155 fn is_empty(&self) -> bool {
1156 self.txns.is_empty()
1157 }
1158
1159 fn len(&self) -> usize {
1160 self.txns.len()
1161 }
1162
1163 fn dedup(self) -> PartiallyValidData<Self::Value> {
1164 PartiallyValidData::from_raw_data(
1165 self.txns.into_iter().map(|tx| (*tx.tx_hash(), tx)).collect(),
1166 None,
1167 )
1168 }
1169}
1170
1171trait VerifyPooledTransactionsResponse {
1172 type Transaction: SignedTransaction;
1173
1174 fn verify(
1175 self,
1176 requested_hashes: &RequestTxHashes,
1177 peer_id: &PeerId,
1178 ) -> (VerificationOutcome, VerifiedPooledTransactions<Self::Transaction>);
1179}
1180
1181impl<T: SignedTransaction> VerifyPooledTransactionsResponse for UnverifiedPooledTransactions<T> {
1182 type Transaction = T;
1183
1184 fn verify(
1185 self,
1186 requested_hashes: &RequestTxHashes,
1187 _peer_id: &PeerId,
1188 ) -> (VerificationOutcome, VerifiedPooledTransactions<T>) {
1189 let mut verification_outcome = VerificationOutcome::Ok;
1190
1191 let Self { mut txns } = self;
1192
1193 #[cfg(debug_assertions)]
1194 let mut tx_hashes_not_requested: smallvec::SmallVec<[TxHash; 16]> = smallvec::smallvec!();
1195 #[cfg(not(debug_assertions))]
1196 let mut tx_hashes_not_requested_count = 0;
1197
1198 txns.0.retain(|tx| {
1199 if !requested_hashes.contains(tx.tx_hash()) {
1200 verification_outcome = VerificationOutcome::ReportPeer;
1201
1202 #[cfg(debug_assertions)]
1203 tx_hashes_not_requested.push(*tx.tx_hash());
1204 #[cfg(not(debug_assertions))]
1205 {
1206 tx_hashes_not_requested_count += 1;
1207 }
1208
1209 return false
1210 }
1211 true
1212 });
1213
1214 #[cfg(debug_assertions)]
1215 if !tx_hashes_not_requested.is_empty() {
1216 trace!(target: "net::tx",
1217 peer_id=format!("{_peer_id:#}"),
1218 ?tx_hashes_not_requested,
1219 "transactions in `PooledTransactions` response from peer were not requested"
1220 );
1221 }
1222 #[cfg(not(debug_assertions))]
1223 if tx_hashes_not_requested_count != 0 {
1224 trace!(target: "net::tx",
1225 peer_id=format!("{_peer_id:#}"),
1226 tx_hashes_not_requested_count,
1227 "transactions in `PooledTransactions` response from peer were not requested"
1228 );
1229 }
1230
1231 (verification_outcome, VerifiedPooledTransactions::new(txns))
1232 }
1233}
1234
1235#[derive(Debug, Clone, Copy, PartialEq, Eq)]
1238pub enum VerificationOutcome {
1239 Ok,
1241 ReportPeer,
1244}
1245
1246#[derive(Debug, Constructor)]
1248pub struct TransactionFetcherInfo {
1249 pub max_inflight_requests: usize,
1251 pub max_inflight_requests_per_peer: u8,
1253 pub soft_limit_byte_size_pooled_transactions_response_on_pack_request: usize,
1257 pub soft_limit_byte_size_pooled_transactions_response: usize,
1260 pub max_capacity_cache_txns_pending_fetch: u32,
1264}
1265
1266impl Default for TransactionFetcherInfo {
1267 fn default() -> Self {
1268 Self::new(
1269 DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS as usize,
1270 DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS_PER_PEER,
1271 DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESP_ON_PACK_GET_POOLED_TRANSACTIONS_REQ,
1272 SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE,
1273 DEFAULT_MAX_CAPACITY_CACHE_PENDING_FETCH,
1274 )
1275 }
1276}
1277
1278impl From<TransactionFetcherConfig> for TransactionFetcherInfo {
1279 fn from(config: TransactionFetcherConfig) -> Self {
1280 let TransactionFetcherConfig {
1281 max_inflight_requests,
1282 max_inflight_requests_per_peer,
1283 soft_limit_byte_size_pooled_transactions_response,
1284 soft_limit_byte_size_pooled_transactions_response_on_pack_request,
1285 max_capacity_cache_txns_pending_fetch,
1286 } = config;
1287
1288 Self::new(
1289 max_inflight_requests as usize,
1290 max_inflight_requests_per_peer,
1291 soft_limit_byte_size_pooled_transactions_response_on_pack_request,
1292 soft_limit_byte_size_pooled_transactions_response,
1293 max_capacity_cache_txns_pending_fetch,
1294 )
1295 }
1296}
1297
1298#[derive(Debug, Default)]
1299struct TxFetcherSearchDurations {
1300 find_idle_peer: Duration,
1301 fill_request: Duration,
1302}
1303
1304#[cfg(test)]
1305mod test {
1306 use super::*;
1307 use crate::test_utils::transactions::{buffer_hash_to_tx_fetcher, new_mock_session};
1308 use alloy_primitives::{
1309 hex,
1310 map::{B256Map, B256Set, HashMap},
1311 B256,
1312 };
1313 use alloy_rlp::Decodable;
1314 use derive_more::IntoIterator;
1315 use reth_eth_wire_types::EthVersion;
1316 use reth_ethereum_primitives::TransactionSigned;
1317 use std::str::FromStr;
1318
1319 #[derive(IntoIterator)]
1320 struct TestValidAnnouncementData(Vec<(TxHash, Option<(u8, usize)>)>);
1321
1322 impl HandleMempoolData for TestValidAnnouncementData {
1323 fn is_empty(&self) -> bool {
1324 self.0.is_empty()
1325 }
1326
1327 fn len(&self) -> usize {
1328 self.0.len()
1329 }
1330
1331 fn retain_by_hash(&mut self, mut f: impl FnMut(&TxHash) -> bool) {
1332 self.0.retain(|(hash, _)| f(hash))
1333 }
1334 }
1335
1336 impl HandleVersionedMempoolData for TestValidAnnouncementData {
1337 fn msg_version(&self) -> EthVersion {
1338 EthVersion::Eth68
1339 }
1340 }
1341
1342 #[test]
1343 fn pack_eth68_request() {
1344 reth_tracing::init_test_tracing();
1345
1346 let tx_fetcher = &mut TransactionFetcher::<EthNetworkPrimitives>::default();
1349
1350 let eth68_hashes = [
1351 B256::from_slice(&[1; 32]),
1352 B256::from_slice(&[2; 32]),
1353 B256::from_slice(&[3; 32]),
1354 B256::from_slice(&[4; 32]),
1355 B256::from_slice(&[5; 32]),
1356 ];
1357 let eth68_sizes = [
1358 DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESP_ON_PACK_GET_POOLED_TRANSACTIONS_REQ - MEDIAN_BYTE_SIZE_SMALL_LEGACY_TX_ENCODED - 1, DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESP_ON_PACK_GET_POOLED_TRANSACTIONS_REQ, 2, 9,
1362 0,
1363 ];
1364
1365 let expected_request_hashes =
1366 [eth68_hashes[0], eth68_hashes[2]].into_iter().collect::<B256Set>();
1367
1368 let expected_surplus_hashes =
1369 [eth68_hashes[1], eth68_hashes[3], eth68_hashes[4]].into_iter().collect::<B256Set>();
1370
1371 let mut eth68_hashes_to_request = RequestTxHashes::with_capacity(3);
1372
1373 let valid_announcement_data = TestValidAnnouncementData(
1374 eth68_hashes
1375 .into_iter()
1376 .zip(eth68_sizes)
1377 .map(|(hash, size)| (hash, Some((0u8, size))))
1378 .collect::<Vec<_>>(),
1379 );
1380
1381 let surplus_eth68_hashes =
1384 tx_fetcher.pack_request_eth68(&mut eth68_hashes_to_request, valid_announcement_data);
1385
1386 let eth68_hashes_to_request = eth68_hashes_to_request.into_iter().collect::<B256Set>();
1387 let surplus_eth68_hashes = surplus_eth68_hashes.into_iter().collect::<B256Set>();
1388
1389 assert_eq!(expected_request_hashes, eth68_hashes_to_request);
1390 assert_eq!(expected_surplus_hashes, surplus_eth68_hashes);
1391 }
1392
1393 #[test]
1394 fn pack_eth68_request_does_not_overflow_announced_size() {
1395 reth_tracing::init_test_tracing();
1396
1397 let tx_fetcher = &mut TransactionFetcher::<EthNetworkPrimitives>::default();
1398
1399 let eth68_hashes =
1400 [B256::from_slice(&[1; 32]), B256::from_slice(&[2; 32]), B256::from_slice(&[3; 32])];
1401 let eth68_sizes = [
1402 DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESP_ON_PACK_GET_POOLED_TRANSACTIONS_REQ - MEDIAN_BYTE_SIZE_SMALL_LEGACY_TX_ENCODED - 1,
1403 usize::MAX,
1404 2,
1405 ];
1406
1407 let expected_request_hashes =
1408 [eth68_hashes[0], eth68_hashes[2]].into_iter().collect::<B256Set>();
1409 let expected_surplus_hashes = std::iter::once(eth68_hashes[1]).collect::<B256Set>();
1410
1411 let mut eth68_hashes_to_request = RequestTxHashes::with_capacity(3);
1412 let valid_announcement_data = TestValidAnnouncementData(
1413 eth68_hashes
1414 .into_iter()
1415 .zip(eth68_sizes)
1416 .map(|(hash, size)| (hash, Some((0u8, size))))
1417 .collect::<Vec<_>>(),
1418 );
1419
1420 let surplus_eth68_hashes =
1421 tx_fetcher.pack_request_eth68(&mut eth68_hashes_to_request, valid_announcement_data);
1422
1423 let eth68_hashes_to_request = eth68_hashes_to_request.into_iter().collect::<B256Set>();
1424 let surplus_eth68_hashes = surplus_eth68_hashes.into_iter().collect::<B256Set>();
1425
1426 assert_eq!(expected_request_hashes, eth68_hashes_to_request);
1427 assert_eq!(expected_surplus_hashes, surplus_eth68_hashes);
1428 }
1429
1430 #[test]
1431 fn pack_eth72_request_uses_metadata_size_limit() {
1432 reth_tracing::init_test_tracing();
1433
1434 let tx_fetcher = &mut TransactionFetcher::<EthNetworkPrimitives>::default();
1435
1436 let hashes =
1437 [B256::from_slice(&[1; 32]), B256::from_slice(&[2; 32]), B256::from_slice(&[3; 32])];
1438 let announced_size =
1439 DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESP_ON_PACK_GET_POOLED_TRANSACTIONS_REQ;
1440 let announcement_data = hashes
1441 .into_iter()
1442 .map(|hash| (hash, Some((0u8, announced_size))))
1443 .collect::<B256Map<_>>();
1444 let valid_announcement_data = ValidAnnouncementData::from_partially_valid_data(
1445 PartiallyValidData::from_raw_data_eth72(announcement_data),
1446 );
1447
1448 let mut hashes_to_request = RequestTxHashes::with_capacity(3);
1449 let surplus_hashes =
1450 tx_fetcher.pack_request(&mut hashes_to_request, valid_announcement_data);
1451
1452 assert_eq!(1, hashes_to_request.len());
1453 assert_eq!(2, surplus_hashes.len());
1454 }
1455
1456 #[tokio::test]
1457 async fn test_on_fetch_pending_hashes() {
1458 reth_tracing::init_test_tracing();
1459
1460 let tx_fetcher = &mut TransactionFetcher::default();
1461
1462 let seen_hashes = [
1466 B256::from_slice(&[1; 32]),
1467 B256::from_slice(&[2; 32]),
1468 B256::from_slice(&[3; 32]),
1469 B256::from_slice(&[4; 32]),
1470 ];
1471 let seen_eth68_hashes_sizes = [120, 158, 116];
1476
1477 let peer_1 = PeerId::new([1; 64]);
1479 let peer_2 = PeerId::new([2; 64]);
1481
1482 let (mut peer_1_data, mut peer_1_mock_session_rx) =
1486 new_mock_session(peer_1, EthVersion::Eth66);
1487 for hash in &seen_hashes {
1488 peer_1_data.seen_transactions.insert(*hash);
1489 }
1490 let (mut peer_2_data, _) = new_mock_session(peer_2, EthVersion::Eth66);
1491 for hash in &seen_hashes {
1492 peer_2_data.seen_transactions.insert(*hash);
1493 }
1494 let mut peers = HashMap::default();
1495 peers.insert(peer_1, peer_1_data);
1496 peers.insert(peer_2, peer_2_data);
1497
1498 for i in 0..3 {
1500 buffer_hash_to_tx_fetcher(
1502 tx_fetcher,
1503 seen_hashes[i],
1504 peer_2,
1505 0,
1506 Some(seen_eth68_hashes_sizes[i]),
1507 );
1508 }
1509 buffer_hash_to_tx_fetcher(tx_fetcher, seen_hashes[3], peer_2, 0, None);
1510
1511 let hash_other = B256::from_slice(&[5; 32]);
1513 buffer_hash_to_tx_fetcher(tx_fetcher, hash_other, peer_2, 0, None);
1514
1515 for hash in &seen_hashes {
1517 buffer_hash_to_tx_fetcher(tx_fetcher, *hash, peer_1, 0, None);
1518 }
1519
1520 assert_eq!(tx_fetcher.num_pending_hashes(), 5);
1522
1523 tx_fetcher.on_fetch_pending_hashes(&peers, |_| true);
1526
1527 let req = peer_1_mock_session_rx
1529 .recv()
1530 .await
1531 .expect("peer session should receive request with buffered hashes");
1532 let PeerRequest::GetPooledTransactions { request, .. } = req else { unreachable!() };
1533 let GetPooledTransactions(requested_hashes) = request;
1534
1535 assert_eq!(
1536 requested_hashes.into_iter().collect::<B256Set>(),
1537 seen_hashes.into_iter().collect::<B256Set>()
1538 )
1539 }
1540
1541 #[test]
1542 fn on_fetch_pending_hashes_rebuffers_on_disconnected_peer() {
1543 let tx_fetcher = &mut TransactionFetcher::default();
1544 let peer_1 = PeerId::new([1; 64]);
1545 let peer_2 = PeerId::new([2; 64]);
1546 let hash_1 = B256::from_slice(&[1; 32]);
1547
1548 buffer_hash_to_tx_fetcher(tx_fetcher, hash_1, peer_1, 0, Some(128));
1549 buffer_hash_to_tx_fetcher(tx_fetcher, hash_1, peer_2, 0, Some(128));
1550
1551 assert_eq!(tx_fetcher.num_pending_hashes(), 1);
1552
1553 let peers = HashMap::new();
1555 tx_fetcher.on_fetch_pending_hashes(&peers, |_| true);
1556
1557 assert_eq!(tx_fetcher.num_pending_hashes(), 1);
1559 }
1560
1561 #[test]
1562 fn verify_response_hashes() {
1563 let input = hex!(
1564 "02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598daa"
1565 );
1566 let signed_tx_1: PooledTransaction =
1567 TransactionSigned::decode(&mut &input[..]).unwrap().try_into().unwrap();
1568 let input = hex!(
1569 "02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76"
1570 );
1571 let signed_tx_2: PooledTransaction =
1572 TransactionSigned::decode(&mut &input[..]).unwrap().try_into().unwrap();
1573
1574 let request_hashes = [
1576 B256::from_str("0x3b9aca00f0671c9a2a1b817a0a78d3fe0c0f776cccb2a8c3c1b412a4f4e67890")
1577 .unwrap(),
1578 *signed_tx_1.hash(),
1579 B256::from_str("0x3b9aca00f0671c9a2a1b817a0a78d3fe0c0f776cccb2a8c3c1b412a4f4e12345")
1580 .unwrap(),
1581 B256::from_str("0x3b9aca00f0671c9a2a1b817a0a78d3fe0c0f776cccb2a8c3c1b412a4f4edabe3")
1582 .unwrap(),
1583 ];
1584
1585 for hash in &request_hashes {
1586 assert_ne!(hash, signed_tx_2.hash())
1587 }
1588
1589 let request_hashes = RequestTxHashes::new(request_hashes.into_iter().collect());
1590
1591 let response_txns = PooledTransactions(vec![signed_tx_1.clone(), signed_tx_2]);
1593 let payload = UnverifiedPooledTransactions::new(response_txns);
1594
1595 let (outcome, verified_payload) = payload.verify(&request_hashes, &PeerId::ZERO);
1596
1597 assert_eq!(VerificationOutcome::ReportPeer, outcome);
1598 assert_eq!(1, verified_payload.len());
1599 assert!(verified_payload.contains(&signed_tx_1));
1600 }
1601}