1use super::{
29 config::TransactionFetcherConfig,
30 constants::{tx_fetcher::*, SOFT_LIMIT_COUNT_HASHES_IN_GET_POOLED_TRANSACTIONS_REQUEST},
31 PeerMetadata, PooledTransactions, SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE,
32};
33use crate::{
34 cache::{LruCache, LruMap},
35 duration_metered_exec,
36 metrics::TransactionFetcherMetrics,
37};
38use alloy_consensus::transaction::PooledTransaction;
39use alloy_primitives::{map::FbBuildHasher, TxHash};
40use derive_more::{Constructor, Deref};
41use futures::{stream::FuturesUnordered, Future, FutureExt, Stream, StreamExt};
42use pin_project::pin_project;
43use reth_eth_wire::{
44 DedupPayload, GetPooledTransactions, HandleMempoolData, HandleVersionedMempoolData,
45 PartiallyValidData, RequestTxHashes, ValidAnnouncementData,
46};
47use reth_eth_wire_types::{EthNetworkPrimitives, NetworkPrimitives};
48use reth_network_api::PeerRequest;
49use reth_network_p2p::error::{RequestError, RequestResult};
50use reth_network_peers::PeerId;
51use reth_primitives_traits::SignedTransaction;
52use schnellru::ByLength;
53use std::{
54 collections::HashMap,
55 pin::Pin,
56 task::{ready, Context, Poll},
57 time::Duration,
58};
59use tokio::sync::{mpsc::error::TrySendError, oneshot, oneshot::error::RecvError};
60use tracing::trace;
61
62#[derive(Debug)]
67#[pin_project]
68pub struct TransactionFetcher<N: NetworkPrimitives = EthNetworkPrimitives> {
69 pub active_peers: LruMap<PeerId, u8, ByLength, FbBuildHasher<64>>,
71 #[pin]
77 pub inflight_requests: FuturesUnordered<GetPooledTxRequestFut<N::PooledTransaction>>,
78 pub hashes_pending_fetch: LruCache<TxHash, FbBuildHasher<32>>,
83 pub hashes_fetch_inflight_and_pending_fetch:
85 LruMap<TxHash, TxFetchMetadata, ByLength, FbBuildHasher<32>>,
86 pub info: TransactionFetcherInfo,
88 #[doc(hidden)]
89 metrics: TransactionFetcherMetrics,
90}
91
92impl<N: NetworkPrimitives> TransactionFetcher<N> {
93 pub(crate) fn remove_peer(&mut self, peer_id: &PeerId) {
95 self.active_peers.remove(peer_id);
96 }
97
98 #[inline]
100 pub fn update_metrics(&self) {
101 let metrics = &self.metrics;
102
103 metrics.inflight_transaction_requests.set(self.inflight_requests.len() as f64);
104
105 let hashes_pending_fetch = self.hashes_pending_fetch.len() as f64;
106 let total_hashes = self.hashes_fetch_inflight_and_pending_fetch.len() as f64;
107
108 metrics.hashes_pending_fetch.set(hashes_pending_fetch);
109 metrics.hashes_inflight_transaction_requests.set(total_hashes - hashes_pending_fetch);
110 }
111
112 #[inline]
113 fn update_pending_fetch_cache_search_metrics(&self, durations: TxFetcherSearchDurations) {
114 let metrics = &self.metrics;
115
116 let TxFetcherSearchDurations { find_idle_peer, fill_request } = durations;
117 metrics
118 .duration_find_idle_fallback_peer_for_any_pending_hash
119 .set(find_idle_peer.as_secs_f64());
120 metrics.duration_fill_request_from_hashes_pending_fetch.set(fill_request.as_secs_f64());
121 }
122
123 pub fn with_transaction_fetcher_config(config: &TransactionFetcherConfig) -> Self {
125 let TransactionFetcherConfig {
126 max_inflight_requests,
127 max_capacity_cache_txns_pending_fetch,
128 ..
129 } = *config;
130
131 let info = config.clone().into();
132
133 let metrics = TransactionFetcherMetrics::default();
134 metrics.capacity_inflight_requests.increment(max_inflight_requests as u64);
135
136 Self {
137 active_peers: LruMap::with_hasher(max_inflight_requests, Default::default()),
138 hashes_pending_fetch: LruCache::with_hasher(
139 max_capacity_cache_txns_pending_fetch,
140 Default::default(),
141 ),
142 hashes_fetch_inflight_and_pending_fetch: LruMap::with_hasher(
143 max_inflight_requests + max_capacity_cache_txns_pending_fetch,
144 Default::default(),
145 ),
146 info,
147 metrics,
148 ..Default::default()
149 }
150 }
151
152 #[inline]
154 pub fn remove_hashes_from_transaction_fetcher<'a, I>(&mut self, hashes: I)
155 where
156 I: IntoIterator<Item = &'a TxHash>,
157 {
158 for hash in hashes {
159 self.hashes_fetch_inflight_and_pending_fetch.remove(hash);
160 self.hashes_pending_fetch.remove(hash);
161 }
162 }
163
164 fn decrement_inflight_request_count_for(&mut self, peer_id: &PeerId) {
166 let remove = || -> bool {
167 if let Some(inflight_count) = self.active_peers.get(peer_id) {
168 *inflight_count = inflight_count.saturating_sub(1);
169 if *inflight_count == 0 {
170 return true
171 }
172 }
173 false
174 }();
175
176 if remove {
177 self.active_peers.remove(peer_id);
178 }
179 }
180
181 #[inline]
183 pub fn is_idle(&self, peer_id: &PeerId) -> bool {
184 let Some(inflight_count) = self.active_peers.peek(peer_id) else { return true };
185 if *inflight_count < self.info.max_inflight_requests_per_peer {
186 return true
187 }
188 false
189 }
190
191 pub fn get_idle_peer_for(&self, hash: TxHash) -> Option<&PeerId> {
193 let TxFetchMetadata { fallback_peers, .. } =
194 self.hashes_fetch_inflight_and_pending_fetch.peek(&hash)?;
195
196 fallback_peers.iter().find(|peer_id| self.is_idle(peer_id))
197 }
198
199 pub fn find_any_idle_fallback_peer_for_any_pending_hash(
205 &mut self,
206 hashes_to_request: &mut RequestTxHashes,
207 mut budget: Option<usize>, ) -> Option<PeerId> {
209 let mut hashes_pending_fetch_iter = self.hashes_pending_fetch.iter();
210
211 let idle_peer = loop {
212 let &hash = hashes_pending_fetch_iter.next()?;
213
214 let idle_peer = self.get_idle_peer_for(hash);
215
216 if idle_peer.is_some() {
217 hashes_to_request.insert(hash);
218 break idle_peer.copied()
219 }
220
221 if let Some(ref mut bud) = budget {
222 *bud = bud.saturating_sub(1);
223 if *bud == 0 {
224 return None
225 }
226 }
227 };
228 let hash = hashes_to_request.iter().next()?;
229
230 drop(hashes_pending_fetch_iter);
232 _ = self.hashes_pending_fetch.remove(hash);
233
234 idle_peer
235 }
236
237 pub fn pack_request(
242 &self,
243 hashes_to_request: &mut RequestTxHashes,
244 hashes_from_announcement: ValidAnnouncementData,
245 ) -> RequestTxHashes {
246 if hashes_from_announcement.msg_version().has_eth68_metadata() {
247 return self.pack_request_eth68(hashes_to_request, hashes_from_announcement)
248 }
249 self.pack_request_eth66(hashes_to_request, hashes_from_announcement)
250 }
251
252 pub fn pack_request_eth68(
263 &self,
264 hashes_to_request: &mut RequestTxHashes,
265 hashes_from_announcement: impl HandleMempoolData
266 + IntoIterator<Item = (TxHash, Option<(u8, usize)>)>,
267 ) -> RequestTxHashes {
268 let mut acc_size_response = 0;
269
270 let mut hashes_from_announcement_iter = hashes_from_announcement.into_iter();
271
272 if let Some((hash, Some((_ty, size)))) = hashes_from_announcement_iter.next() {
273 hashes_to_request.insert(hash);
274
275 if size >= self.info.soft_limit_byte_size_pooled_transactions_response_on_pack_request {
277 return hashes_from_announcement_iter.collect()
278 }
279 acc_size_response = size;
280 }
281
282 let mut surplus_hashes = RequestTxHashes::default();
283
284 for (hash, metadata) in hashes_from_announcement_iter.by_ref() {
287 let Some((_ty, size)) = metadata else {
288 unreachable!("this method is called upon reception of an eth68 announcement")
289 };
290
291 let next_acc_size = acc_size_response.checked_add(size).filter(|next_acc_size| {
292 *next_acc_size <=
293 self.info.soft_limit_byte_size_pooled_transactions_response_on_pack_request
294 });
295
296 if let Some(next_acc_size) = next_acc_size {
297 acc_size_response = next_acc_size;
300 _ = hashes_to_request.insert(hash)
301 } else {
302 _ = surplus_hashes.insert(hash)
303 }
304
305 let free_space =
306 self.info.soft_limit_byte_size_pooled_transactions_response_on_pack_request -
307 acc_size_response;
308
309 if free_space < MEDIAN_BYTE_SIZE_SMALL_LEGACY_TX_ENCODED {
310 break
311 }
312 }
313
314 surplus_hashes.extend(hashes_from_announcement_iter.map(|(hash, _metadata)| hash));
315
316 surplus_hashes
317 }
318
319 pub fn pack_request_eth66(
326 &self,
327 hashes_to_request: &mut RequestTxHashes,
328 hashes_from_announcement: ValidAnnouncementData,
329 ) -> RequestTxHashes {
330 let (mut hashes, _version) = hashes_from_announcement.into_request_hashes();
331 if hashes.len() <= SOFT_LIMIT_COUNT_HASHES_IN_GET_POOLED_TRANSACTIONS_REQUEST {
332 *hashes_to_request = hashes;
333 hashes_to_request.shrink_to_fit();
334
335 RequestTxHashes::default()
336 } else {
337 let surplus_hashes =
338 hashes.retain_count(SOFT_LIMIT_COUNT_HASHES_IN_GET_POOLED_TRANSACTIONS_REQUEST);
339 *hashes_to_request = hashes;
340 hashes_to_request.shrink_to_fit();
341
342 surplus_hashes
343 }
344 }
345
346 pub fn try_buffer_hashes_for_retry(
348 &mut self,
349 mut hashes: RequestTxHashes,
350 peer_failed_to_serve: &PeerId,
351 ) {
352 hashes.retain(|hash| {
355 if let Some(entry) = self.hashes_fetch_inflight_and_pending_fetch.get(hash) {
356 entry.fallback_peers_mut().remove(peer_failed_to_serve);
357 return true
358 }
359 false
361 });
362
363 self.buffer_hashes(hashes, None)
364 }
365
366 pub fn num_pending_hashes(&self) -> usize {
368 self.hashes_pending_fetch.len()
369 }
370
371 pub fn num_all_hashes(&self) -> usize {
373 self.hashes_fetch_inflight_and_pending_fetch.len()
374 }
375
376 pub fn buffer_hashes(&mut self, hashes: RequestTxHashes, fallback_peer: Option<PeerId>) {
381 for hash in hashes {
382 if self.hashes_fetch_inflight_and_pending_fetch.peek(&hash).is_none() {
384 continue
385 }
386
387 let Some(TxFetchMetadata { retries, fallback_peers, .. }) =
388 self.hashes_fetch_inflight_and_pending_fetch.get(&hash)
389 else {
390 continue
391 };
392
393 if let Some(peer_id) = fallback_peer {
394 fallback_peers.insert(peer_id);
396 } else {
397 if *retries >= DEFAULT_MAX_RETRIES {
398 trace!(target: "net::tx",
399 %hash,
400 retries,
401 "retry limit for `GetPooledTransactions` requests reached for hash, dropping hash"
402 );
403
404 self.hashes_fetch_inflight_and_pending_fetch.remove(&hash);
405 self.hashes_pending_fetch.remove(&hash);
406 continue
407 }
408 *retries += 1;
409 }
410
411 if let (_, Some(evicted_hash)) = self.hashes_pending_fetch.insert_and_get_evicted(hash)
412 {
413 self.hashes_fetch_inflight_and_pending_fetch.remove(&evicted_hash);
414 }
415 }
416 }
417
418 pub fn on_fetch_pending_hashes(
423 &mut self,
424 peers: &HashMap<PeerId, PeerMetadata<N>>,
425 has_capacity_wrt_pending_pool_imports: impl Fn(usize) -> bool,
426 ) -> bool {
427 let mut hashes_to_request = RequestTxHashes::with_capacity(
428 DEFAULT_MARGINAL_COUNT_HASHES_GET_POOLED_TRANSACTIONS_REQUEST,
429 );
430 let mut search_durations = TxFetcherSearchDurations::default();
431
432 let budget_find_idle_fallback_peer = self
434 .search_breadth_budget_find_idle_fallback_peer(&has_capacity_wrt_pending_pool_imports);
435
436 let peer_id = duration_metered_exec!(
437 {
438 let Some(peer_id) = self.find_any_idle_fallback_peer_for_any_pending_hash(
439 &mut hashes_to_request,
440 budget_find_idle_fallback_peer,
441 ) else {
442 return false
444 };
445
446 peer_id
447 },
448 search_durations.find_idle_peer
449 );
450
451 let Some(peer) = peers.get(&peer_id) else {
454 self.buffer_hashes(hashes_to_request, None);
455 return false
456 };
457 let conn_eth_version = peer.version;
458
459 let budget_fill_request = self
464 .search_breadth_budget_find_intersection_pending_hashes_and_hashes_seen_by_peer(
465 &has_capacity_wrt_pending_pool_imports,
466 );
467
468 duration_metered_exec!(
469 {
470 self.fill_request_from_hashes_pending_fetch(
471 &mut hashes_to_request,
472 &peer.seen_transactions,
473 budget_fill_request,
474 )
475 },
476 search_durations.fill_request
477 );
478
479 self.update_pending_fetch_cache_search_metrics(search_durations);
480
481 trace!(target: "net::tx",
482 peer_id=format!("{peer_id:#}"),
483 hashes=?*hashes_to_request,
484 %conn_eth_version,
485 "requesting hashes that were stored pending fetch from peer"
486 );
487
488 if let Some(failed_to_request_hashes) =
490 self.request_transactions_from_peer(hashes_to_request, peer)
491 {
492 trace!(target: "net::tx",
493 peer_id=format!("{peer_id:#}"),
494 ?failed_to_request_hashes,
495 %conn_eth_version,
496 "failed sending request to peer's session, buffering hashes"
497 );
498
499 self.buffer_hashes(failed_to_request_hashes, Some(peer_id));
500 return false
501 }
502
503 true
504 }
505
506 pub fn filter_unseen_and_pending_hashes(
509 &mut self,
510 new_announced_hashes: &mut ValidAnnouncementData,
511 is_tx_bad_import: impl Fn(&TxHash) -> bool,
512 peer_id: &PeerId,
513 client_version: &str,
514 ) {
515 let mut previously_unseen_hashes_count = 0;
516
517 let msg_version = new_announced_hashes.msg_version();
518
519 new_announced_hashes.retain(|hash, metadata| {
521
522 if let Some(TxFetchMetadata{ tx_encoded_length: previously_seen_size, ..}) = self.hashes_fetch_inflight_and_pending_fetch.peek_mut(hash) {
524 if let Some((_ty, size)) = metadata {
526 if let Some(prev_size) = previously_seen_size {
527 if size != prev_size {
529 trace!(target: "net::tx",
530 peer_id=format!("{peer_id:#}"),
531 %hash,
532 size,
533 previously_seen_size,
534 %client_version,
535 "peer announced a different size for tx, this is especially worrying if one size is much bigger..."
536 );
537 }
538 }
539 *previously_seen_size = Some(*size);
541 }
542
543 if self.hashes_pending_fetch.remove(hash) {
545 return true
546 }
547
548 return false
549 }
550
551 if is_tx_bad_import(hash) {
554 return false
555 }
556
557 previously_unseen_hashes_count += 1;
558
559 if self
560 .hashes_fetch_inflight_and_pending_fetch
561 .get_or_insert(*hash, || TxFetchMetadata {
562 retries: 0,
563 fallback_peers: LruCache::with_hasher(
564 DEFAULT_MAX_COUNT_FALLBACK_PEERS as u32,
565 Default::default(),
566 ),
567 tx_encoded_length: None,
568 })
569 .is_none()
570 {
571
572 trace!(target: "net::tx",
573 peer_id=format!("{peer_id:#}"),
574 %hash,
575 ?msg_version,
576 %client_version,
577 "failed to cache new announced hash from peer in schnellru::LruMap, dropping hash"
578 );
579
580 return false
581 }
582 true
583 });
584
585 trace!(target: "net::tx",
586 peer_id=format!("{peer_id:#}"),
587 previously_unseen_hashes_count=previously_unseen_hashes_count,
588 msg_version=?msg_version,
589 client_version=%client_version,
590 "received previously unseen hashes in announcement from peer"
591 );
592 }
593
594 pub fn request_transactions_from_peer(
602 &mut self,
603 new_announced_hashes: RequestTxHashes,
604 peer: &PeerMetadata<N>,
605 ) -> Option<RequestTxHashes> {
606 let peer_id: PeerId = peer.request_tx.peer_id;
607 let conn_eth_version = peer.version;
608
609 if self.active_peers.len() >= self.info.max_inflight_requests {
610 trace!(target: "net::tx",
611 peer_id=format!("{peer_id:#}"),
612 hashes=?*new_announced_hashes,
613 %conn_eth_version,
614 max_inflight_transaction_requests=self.info.max_inflight_requests,
615 "limit for concurrent `GetPooledTransactions` requests reached, dropping request for hashes to peer"
616 );
617 return Some(new_announced_hashes)
618 }
619
620 let Some(inflight_count) = self.active_peers.get_or_insert(peer_id, || 0) else {
621 trace!(target: "net::tx",
622 peer_id=format!("{peer_id:#}"),
623 hashes=?*new_announced_hashes,
624 conn_eth_version=%conn_eth_version,
625 "failed to cache active peer in schnellru::LruMap, dropping request to peer"
626 );
627 return Some(new_announced_hashes)
628 };
629
630 if *inflight_count >= self.info.max_inflight_requests_per_peer {
631 trace!(target: "net::tx",
632 peer_id=format!("{peer_id:#}"),
633 hashes=?*new_announced_hashes,
634 %conn_eth_version,
635 max_concurrent_tx_reqs_per_peer=self.info.max_inflight_requests_per_peer,
636 "limit for concurrent `GetPooledTransactions` requests per peer reached"
637 );
638 return Some(new_announced_hashes)
639 }
640
641 #[cfg(debug_assertions)]
642 {
643 for hash in &new_announced_hashes {
644 if self.hashes_pending_fetch.contains(hash) {
645 tracing::debug!(target: "net::tx", "`{}` should have been taken out of buffer before packing in a request, breaks invariant `@hashes_pending_fetch` and `@inflight_requests`, `@hashes_fetch_inflight_and_pending_fetch` for `{}`: {:?}",
646 format!("{:?}", new_announced_hashes), format!("{:?}", new_announced_hashes),
648 new_announced_hashes.iter().map(|hash| {
649 let metadata = self.hashes_fetch_inflight_and_pending_fetch.get(hash);
650 (*hash, metadata.map(|m| (m.retries, m.tx_encoded_length)))
652 }).collect::<Vec<(TxHash, Option<(u8, Option<usize>)>)>>())
653 }
654 }
655 }
656
657 let (response, rx) = oneshot::channel();
658 let req = PeerRequest::GetPooledTransactions {
659 request: GetPooledTransactions(new_announced_hashes.iter().copied().collect()),
660 response,
661 };
662
663 if let Err(err) = peer.request_tx.try_send(req) {
665 return match err {
667 TrySendError::Full(_) | TrySendError::Closed(_) => {
668 self.metrics.egress_peer_channel_full.increment(1);
669 Some(new_announced_hashes)
670 }
671 }
672 }
673
674 *inflight_count += 1;
675 self.inflight_requests.push(GetPooledTxRequestFut::new(peer_id, new_announced_hashes, rx));
677
678 None
679 }
680
681 pub fn fill_request_from_hashes_pending_fetch(
701 &mut self,
702 hashes_to_request: &mut RequestTxHashes,
703 seen_hashes: &LruCache<TxHash, FbBuildHasher<32>>,
704 mut budget_fill_request: Option<usize>, ) {
706 let Some(hash) = hashes_to_request.iter().next() else { return };
707
708 let mut acc_size_response = self
709 .hashes_fetch_inflight_and_pending_fetch
710 .get(hash)
711 .and_then(|entry| entry.tx_encoded_len())
712 .unwrap_or(AVERAGE_BYTE_SIZE_TX_ENCODED);
713
714 if acc_size_response >=
716 DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE_ON_FETCH_PENDING_HASHES
717 {
718 return
719 }
720
721 for hash in self.hashes_pending_fetch.iter() {
724 if !seen_hashes.contains(hash) {
726 continue
727 };
728
729 hashes_to_request.insert(*hash);
731
732 let size = self
734 .hashes_fetch_inflight_and_pending_fetch
735 .get(hash)
736 .and_then(|entry| entry.tx_encoded_len())
737 .unwrap_or(AVERAGE_BYTE_SIZE_TX_ENCODED);
738
739 acc_size_response = acc_size_response.saturating_add(size);
740
741 if acc_size_response >=
745 DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE_ON_FETCH_PENDING_HASHES ||
746 hashes_to_request.len() >
747 DEFAULT_SOFT_LIMIT_COUNT_HASHES_IN_GET_POOLED_TRANSACTIONS_REQUEST_ON_FETCH_PENDING_HASHES
748 {
749 break
750 }
751
752 if let Some(ref mut bud) = budget_fill_request {
753 *bud -= 1;
754 if *bud == 0 {
755 break
756 }
757 }
758 }
759
760 for hash in hashes_to_request.iter() {
762 self.hashes_pending_fetch.remove(hash);
763 }
764 }
765
766 pub fn has_capacity_for_fetching_pending_hashes(&self) -> bool {
769 let info = &self.info;
770
771 self.has_capacity(info.max_inflight_requests)
772 }
773
774 fn has_capacity(&self, max_inflight_requests: usize) -> bool {
776 self.inflight_requests.len() <= max_inflight_requests
777 }
778
779 pub fn search_breadth_budget_find_idle_fallback_peer(
785 &self,
786 has_capacity_wrt_pending_pool_imports: impl Fn(usize) -> bool,
787 ) -> Option<usize> {
788 let info = &self.info;
789
790 let tx_fetcher_has_capacity = self.has_capacity(
791 info.max_inflight_requests /
792 DEFAULT_DIVISOR_MAX_COUNT_INFLIGHT_REQUESTS_ON_FIND_IDLE_PEER,
793 );
794 let tx_pool_has_capacity = has_capacity_wrt_pending_pool_imports(
795 DEFAULT_DIVISOR_MAX_COUNT_PENDING_POOL_IMPORTS_ON_FIND_IDLE_PEER,
796 );
797
798 if tx_fetcher_has_capacity && tx_pool_has_capacity {
799 None
801 } else {
802 let limit = DEFAULT_BUDGET_FIND_IDLE_FALLBACK_PEER;
804
805 trace!(target: "net::tx",
806 inflight_requests=self.inflight_requests.len(),
807 max_inflight_transaction_requests=info.max_inflight_requests,
808 hashes_pending_fetch=self.hashes_pending_fetch.len(),
809 limit,
810 "search breadth limited in search for idle fallback peer for some hash pending fetch"
811 );
812
813 Some(limit)
814 }
815 }
816
817 pub fn search_breadth_budget_find_intersection_pending_hashes_and_hashes_seen_by_peer(
824 &self,
825 has_capacity_wrt_pending_pool_imports: impl Fn(usize) -> bool,
826 ) -> Option<usize> {
827 let info = &self.info;
828
829 let tx_fetcher_has_capacity = self.has_capacity(
830 info.max_inflight_requests /
831 DEFAULT_DIVISOR_MAX_COUNT_INFLIGHT_REQUESTS_ON_FIND_INTERSECTION,
832 );
833 let tx_pool_has_capacity = has_capacity_wrt_pending_pool_imports(
834 DEFAULT_DIVISOR_MAX_COUNT_PENDING_POOL_IMPORTS_ON_FIND_INTERSECTION,
835 );
836
837 if tx_fetcher_has_capacity && tx_pool_has_capacity {
838 None
840 } else {
841 let limit = DEFAULT_BUDGET_FIND_INTERSECTION_ANNOUNCED_BY_PEER_AND_PENDING_FETCH;
843
844 trace!(target: "net::tx",
845 inflight_requests=self.inflight_requests.len(),
846 max_inflight_transaction_requests=self.info.max_inflight_requests,
847 hashes_pending_fetch=self.hashes_pending_fetch.len(),
848 limit=limit,
849 "search breadth limited in search for intersection of hashes announced by peer and hashes pending fetch"
850 );
851
852 Some(limit)
853 }
854 }
855
856 pub fn on_resolved_get_pooled_transactions_request_fut(
860 &mut self,
861 response: GetPooledTxResponse<N::PooledTransaction>,
862 ) -> FetchEvent<N::PooledTransaction> {
863 let GetPooledTxResponse { peer_id, mut requested_hashes, result } = response;
866
867 self.decrement_inflight_request_count_for(&peer_id);
868
869 match result {
870 Ok(Ok(transactions)) => {
871 if transactions.is_empty() {
876 trace!(target: "net::tx",
877 peer_id=format!("{peer_id:#}"),
878 requested_hashes_len=requested_hashes.len(),
879 "received empty `PooledTransactions` response from peer, peer failed to serve hashes it announced"
880 );
881
882 return FetchEvent::EmptyResponse { peer_id }
883 }
884
885 let payload = UnverifiedPooledTransactions::new(transactions);
889
890 let unverified_len = payload.len();
891 let (verification_outcome, verified_payload) =
892 payload.verify(&requested_hashes, &peer_id);
893
894 let unsolicited = unverified_len - verified_payload.len();
895 if unsolicited > 0 {
896 self.metrics.unsolicited_transactions.increment(unsolicited as u64);
897 }
898
899 let report_peer = if verification_outcome == VerificationOutcome::ReportPeer {
900 trace!(target: "net::tx",
901 peer_id=format!("{peer_id:#}"),
902 unverified_len,
903 verified_payload_len=verified_payload.len(),
904 "received `PooledTransactions` response from peer with entries that didn't verify against request, filtered out transactions"
905 );
906 true
907 } else {
908 false
909 };
910
911 if verified_payload.is_empty() {
913 return FetchEvent::FetchError { peer_id, error: RequestError::BadResponse }
914 }
915
916 let unvalidated_payload_len = verified_payload.len();
920
921 let valid_payload = verified_payload.dedup();
922
923 if valid_payload.len() != unvalidated_payload_len {
929 trace!(target: "net::tx",
930 peer_id=format!("{peer_id:#}"),
931 unvalidated_payload_len,
932 valid_payload_len=valid_payload.len(),
933 "received `PooledTransactions` response from peer with duplicate entries, filtered them out"
934 );
935 }
936 let requested_hashes_len = requested_hashes.len();
944 let mut fetched = Vec::with_capacity(valid_payload.len());
945 requested_hashes.retain(|requested_hash| {
946 if valid_payload.contains_key(requested_hash) {
947 fetched.push(*requested_hash);
949 return false
950 }
951 true
952 });
953 fetched.shrink_to_fit();
954 self.metrics.fetched_transactions.increment(fetched.len() as u64);
955
956 if fetched.len() < requested_hashes_len {
957 trace!(target: "net::tx",
958 peer_id=format!("{peer_id:#}"),
959 requested_hashes_len=requested_hashes_len,
960 fetched_len=fetched.len(),
961 "peer failed to serve hashes it announced"
962 );
963 }
964
965 self.try_buffer_hashes_for_retry(requested_hashes, &peer_id);
969
970 let transactions = valid_payload.into_data().into_values().collect();
971
972 FetchEvent::TransactionsFetched { peer_id, transactions, report_peer }
973 }
974 Ok(Err(req_err)) => {
975 self.try_buffer_hashes_for_retry(requested_hashes, &peer_id);
976 FetchEvent::FetchError { peer_id, error: req_err }
977 }
978 Err(_) => {
979 self.try_buffer_hashes_for_retry(requested_hashes, &peer_id);
980 FetchEvent::FetchError { peer_id, error: RequestError::ChannelClosed }
982 }
983 }
984 }
985}
986
987impl<N: NetworkPrimitives> Stream for TransactionFetcher<N> {
988 type Item = FetchEvent<N::PooledTransaction>;
989
990 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
992 if self.inflight_requests.is_empty() {
995 return Poll::Pending
996 }
997
998 if let Some(resp) = ready!(self.inflight_requests.poll_next_unpin(cx)) {
999 return Poll::Ready(Some(self.on_resolved_get_pooled_transactions_request_fut(resp)))
1000 }
1001
1002 Poll::Pending
1003 }
1004}
1005
1006impl<T: NetworkPrimitives> Default for TransactionFetcher<T> {
1007 fn default() -> Self {
1008 Self {
1009 active_peers: LruMap::with_hasher(
1010 DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS,
1011 Default::default(),
1012 ),
1013 inflight_requests: Default::default(),
1014 hashes_pending_fetch: LruCache::with_hasher(
1015 DEFAULT_MAX_CAPACITY_CACHE_PENDING_FETCH,
1016 Default::default(),
1017 ),
1018 hashes_fetch_inflight_and_pending_fetch: LruMap::with_hasher(
1019 DEFAULT_MAX_CAPACITY_CACHE_INFLIGHT_AND_PENDING_FETCH,
1020 Default::default(),
1021 ),
1022 info: TransactionFetcherInfo::default(),
1023 metrics: Default::default(),
1024 }
1025 }
1026}
1027
1028#[derive(Debug, Constructor)]
1030pub struct TxFetchMetadata {
1031 retries: u8,
1033 fallback_peers: LruCache<PeerId, FbBuildHasher<64>>,
1035 tx_encoded_length: Option<usize>,
1040}
1041
1042impl TxFetchMetadata {
1043 pub const fn fallback_peers_mut(&mut self) -> &mut LruCache<PeerId, FbBuildHasher<64>> {
1045 &mut self.fallback_peers
1046 }
1047
1048 pub const fn tx_encoded_len(&self) -> Option<usize> {
1053 self.tx_encoded_length
1054 }
1055}
1056
1057#[derive(Debug)]
1059pub enum FetchEvent<T = PooledTransaction> {
1060 TransactionsFetched {
1062 peer_id: PeerId,
1064 transactions: PooledTransactions<T>,
1066 report_peer: bool,
1069 },
1070 FetchError {
1072 peer_id: PeerId,
1074 error: RequestError,
1076 },
1077 EmptyResponse {
1079 peer_id: PeerId,
1081 },
1082}
1083
1084#[derive(Debug)]
1086pub struct GetPooledTxRequest<T = PooledTransaction> {
1087 peer_id: PeerId,
1088 requested_hashes: RequestTxHashes,
1090 response: oneshot::Receiver<RequestResult<PooledTransactions<T>>>,
1091}
1092
1093#[derive(Debug)]
1096pub struct GetPooledTxResponse<T = PooledTransaction> {
1097 peer_id: PeerId,
1098 requested_hashes: RequestTxHashes,
1101 result: Result<RequestResult<PooledTransactions<T>>, RecvError>,
1102}
1103
1104#[must_use = "futures do nothing unless polled"]
1107#[pin_project::pin_project]
1108#[derive(Debug)]
1109pub struct GetPooledTxRequestFut<T = PooledTransaction> {
1110 #[pin]
1111 inner: Option<GetPooledTxRequest<T>>,
1112}
1113
1114impl<T> GetPooledTxRequestFut<T> {
1115 #[inline]
1116 const fn new(
1117 peer_id: PeerId,
1118 requested_hashes: RequestTxHashes,
1119 response: oneshot::Receiver<RequestResult<PooledTransactions<T>>>,
1120 ) -> Self {
1121 Self { inner: Some(GetPooledTxRequest { peer_id, requested_hashes, response }) }
1122 }
1123}
1124
1125impl<T> Future for GetPooledTxRequestFut<T> {
1126 type Output = GetPooledTxResponse<T>;
1127
1128 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1129 let mut req = self.as_mut().project().inner.take().expect("polled after completion");
1130 match req.response.poll_unpin(cx) {
1131 Poll::Ready(result) => Poll::Ready(GetPooledTxResponse {
1132 peer_id: req.peer_id,
1133 requested_hashes: req.requested_hashes,
1134 result,
1135 }),
1136 Poll::Pending => {
1137 self.project().inner.set(Some(req));
1138 Poll::Pending
1139 }
1140 }
1141 }
1142}
1143
1144#[derive(Debug, Constructor, Deref)]
1146pub struct UnverifiedPooledTransactions<T> {
1147 txns: PooledTransactions<T>,
1148}
1149
1150#[derive(Debug, Constructor, Deref)]
1152pub struct VerifiedPooledTransactions<T> {
1153 txns: PooledTransactions<T>,
1154}
1155
1156impl<T: SignedTransaction> DedupPayload for VerifiedPooledTransactions<T> {
1157 type Value = T;
1158
1159 fn is_empty(&self) -> bool {
1160 self.txns.is_empty()
1161 }
1162
1163 fn len(&self) -> usize {
1164 self.txns.len()
1165 }
1166
1167 fn dedup(self) -> PartiallyValidData<Self::Value> {
1168 PartiallyValidData::from_raw_data(
1169 self.txns.into_iter().map(|tx| (*tx.tx_hash(), tx)).collect(),
1170 None,
1171 )
1172 }
1173}
1174
1175trait VerifyPooledTransactionsResponse {
1176 type Transaction: SignedTransaction;
1177
1178 fn verify(
1179 self,
1180 requested_hashes: &RequestTxHashes,
1181 peer_id: &PeerId,
1182 ) -> (VerificationOutcome, VerifiedPooledTransactions<Self::Transaction>);
1183}
1184
1185impl<T: SignedTransaction> VerifyPooledTransactionsResponse for UnverifiedPooledTransactions<T> {
1186 type Transaction = T;
1187
1188 fn verify(
1189 self,
1190 requested_hashes: &RequestTxHashes,
1191 _peer_id: &PeerId,
1192 ) -> (VerificationOutcome, VerifiedPooledTransactions<T>) {
1193 let mut verification_outcome = VerificationOutcome::Ok;
1194
1195 let Self { mut txns } = self;
1196
1197 #[cfg(debug_assertions)]
1198 let mut tx_hashes_not_requested: smallvec::SmallVec<[TxHash; 16]> = smallvec::smallvec!();
1199 #[cfg(not(debug_assertions))]
1200 let mut tx_hashes_not_requested_count = 0;
1201
1202 txns.0.retain(|tx| {
1203 if !requested_hashes.contains(tx.tx_hash()) {
1204 verification_outcome = VerificationOutcome::ReportPeer;
1205
1206 #[cfg(debug_assertions)]
1207 tx_hashes_not_requested.push(*tx.tx_hash());
1208 #[cfg(not(debug_assertions))]
1209 {
1210 tx_hashes_not_requested_count += 1;
1211 }
1212
1213 return false
1214 }
1215 true
1216 });
1217
1218 #[cfg(debug_assertions)]
1219 if !tx_hashes_not_requested.is_empty() {
1220 trace!(target: "net::tx",
1221 peer_id=format!("{_peer_id:#}"),
1222 ?tx_hashes_not_requested,
1223 "transactions in `PooledTransactions` response from peer were not requested"
1224 );
1225 }
1226 #[cfg(not(debug_assertions))]
1227 if tx_hashes_not_requested_count != 0 {
1228 trace!(target: "net::tx",
1229 peer_id=format!("{_peer_id:#}"),
1230 tx_hashes_not_requested_count,
1231 "transactions in `PooledTransactions` response from peer were not requested"
1232 );
1233 }
1234
1235 (verification_outcome, VerifiedPooledTransactions::new(txns))
1236 }
1237}
1238
1239#[derive(Debug, Clone, Copy, PartialEq, Eq)]
1242pub enum VerificationOutcome {
1243 Ok,
1245 ReportPeer,
1248}
1249
1250#[derive(Debug, Constructor)]
1252pub struct TransactionFetcherInfo {
1253 pub max_inflight_requests: usize,
1255 pub max_inflight_requests_per_peer: u8,
1257 pub soft_limit_byte_size_pooled_transactions_response_on_pack_request: usize,
1261 pub soft_limit_byte_size_pooled_transactions_response: usize,
1264 pub max_capacity_cache_txns_pending_fetch: u32,
1268}
1269
1270impl Default for TransactionFetcherInfo {
1271 fn default() -> Self {
1272 Self::new(
1273 DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS as usize,
1274 DEFAULT_MAX_COUNT_CONCURRENT_REQUESTS_PER_PEER,
1275 DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESP_ON_PACK_GET_POOLED_TRANSACTIONS_REQ,
1276 SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE,
1277 DEFAULT_MAX_CAPACITY_CACHE_PENDING_FETCH,
1278 )
1279 }
1280}
1281
1282impl From<TransactionFetcherConfig> for TransactionFetcherInfo {
1283 fn from(config: TransactionFetcherConfig) -> Self {
1284 let TransactionFetcherConfig {
1285 max_inflight_requests,
1286 max_inflight_requests_per_peer,
1287 soft_limit_byte_size_pooled_transactions_response,
1288 soft_limit_byte_size_pooled_transactions_response_on_pack_request,
1289 max_capacity_cache_txns_pending_fetch,
1290 } = config;
1291
1292 Self::new(
1293 max_inflight_requests as usize,
1294 max_inflight_requests_per_peer,
1295 soft_limit_byte_size_pooled_transactions_response_on_pack_request,
1296 soft_limit_byte_size_pooled_transactions_response,
1297 max_capacity_cache_txns_pending_fetch,
1298 )
1299 }
1300}
1301
1302#[derive(Debug, Default)]
1303struct TxFetcherSearchDurations {
1304 find_idle_peer: Duration,
1305 fill_request: Duration,
1306}
1307
1308#[cfg(test)]
1309mod test {
1310 use super::*;
1311 use crate::test_utils::transactions::{buffer_hash_to_tx_fetcher, new_mock_session};
1312 use alloy_primitives::{
1313 hex,
1314 map::{B256Map, B256Set, HashMap},
1315 B256,
1316 };
1317 use alloy_rlp::Decodable;
1318 use derive_more::IntoIterator;
1319 use reth_eth_wire_types::EthVersion;
1320 use reth_ethereum_primitives::TransactionSigned;
1321 use std::str::FromStr;
1322
1323 #[derive(IntoIterator)]
1324 struct TestValidAnnouncementData(Vec<(TxHash, Option<(u8, usize)>)>);
1325
1326 impl HandleMempoolData for TestValidAnnouncementData {
1327 fn is_empty(&self) -> bool {
1328 self.0.is_empty()
1329 }
1330
1331 fn len(&self) -> usize {
1332 self.0.len()
1333 }
1334
1335 fn retain_by_hash(&mut self, mut f: impl FnMut(&TxHash) -> bool) {
1336 self.0.retain(|(hash, _)| f(hash))
1337 }
1338 }
1339
1340 impl HandleVersionedMempoolData for TestValidAnnouncementData {
1341 fn msg_version(&self) -> EthVersion {
1342 EthVersion::Eth68
1343 }
1344 }
1345
1346 #[test]
1347 fn pack_eth68_request() {
1348 reth_tracing::init_test_tracing();
1349
1350 let tx_fetcher = &mut TransactionFetcher::<EthNetworkPrimitives>::default();
1353
1354 let eth68_hashes = [
1355 B256::from_slice(&[1; 32]),
1356 B256::from_slice(&[2; 32]),
1357 B256::from_slice(&[3; 32]),
1358 B256::from_slice(&[4; 32]),
1359 B256::from_slice(&[5; 32]),
1360 ];
1361 let eth68_sizes = [
1362 DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESP_ON_PACK_GET_POOLED_TRANSACTIONS_REQ - MEDIAN_BYTE_SIZE_SMALL_LEGACY_TX_ENCODED - 1, DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESP_ON_PACK_GET_POOLED_TRANSACTIONS_REQ, 2, 9,
1366 0,
1367 ];
1368
1369 let expected_request_hashes =
1370 [eth68_hashes[0], eth68_hashes[2]].into_iter().collect::<B256Set>();
1371
1372 let expected_surplus_hashes =
1373 [eth68_hashes[1], eth68_hashes[3], eth68_hashes[4]].into_iter().collect::<B256Set>();
1374
1375 let mut eth68_hashes_to_request = RequestTxHashes::with_capacity(3);
1376
1377 let valid_announcement_data = TestValidAnnouncementData(
1378 eth68_hashes
1379 .into_iter()
1380 .zip(eth68_sizes)
1381 .map(|(hash, size)| (hash, Some((0u8, size))))
1382 .collect::<Vec<_>>(),
1383 );
1384
1385 let surplus_eth68_hashes =
1388 tx_fetcher.pack_request_eth68(&mut eth68_hashes_to_request, valid_announcement_data);
1389
1390 let eth68_hashes_to_request = eth68_hashes_to_request.into_iter().collect::<B256Set>();
1391 let surplus_eth68_hashes = surplus_eth68_hashes.into_iter().collect::<B256Set>();
1392
1393 assert_eq!(expected_request_hashes, eth68_hashes_to_request);
1394 assert_eq!(expected_surplus_hashes, surplus_eth68_hashes);
1395 }
1396
1397 #[test]
1398 fn pack_eth68_request_does_not_overflow_announced_size() {
1399 reth_tracing::init_test_tracing();
1400
1401 let tx_fetcher = &mut TransactionFetcher::<EthNetworkPrimitives>::default();
1402
1403 let eth68_hashes =
1404 [B256::from_slice(&[1; 32]), B256::from_slice(&[2; 32]), B256::from_slice(&[3; 32])];
1405 let eth68_sizes = [
1406 DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESP_ON_PACK_GET_POOLED_TRANSACTIONS_REQ - MEDIAN_BYTE_SIZE_SMALL_LEGACY_TX_ENCODED - 1,
1407 usize::MAX,
1408 2,
1409 ];
1410
1411 let expected_request_hashes =
1412 [eth68_hashes[0], eth68_hashes[2]].into_iter().collect::<B256Set>();
1413 let expected_surplus_hashes = std::iter::once(eth68_hashes[1]).collect::<B256Set>();
1414
1415 let mut eth68_hashes_to_request = RequestTxHashes::with_capacity(3);
1416 let valid_announcement_data = TestValidAnnouncementData(
1417 eth68_hashes
1418 .into_iter()
1419 .zip(eth68_sizes)
1420 .map(|(hash, size)| (hash, Some((0u8, size))))
1421 .collect::<Vec<_>>(),
1422 );
1423
1424 let surplus_eth68_hashes =
1425 tx_fetcher.pack_request_eth68(&mut eth68_hashes_to_request, valid_announcement_data);
1426
1427 let eth68_hashes_to_request = eth68_hashes_to_request.into_iter().collect::<B256Set>();
1428 let surplus_eth68_hashes = surplus_eth68_hashes.into_iter().collect::<B256Set>();
1429
1430 assert_eq!(expected_request_hashes, eth68_hashes_to_request);
1431 assert_eq!(expected_surplus_hashes, surplus_eth68_hashes);
1432 }
1433
1434 #[test]
1435 fn pack_eth72_request_uses_metadata_size_limit() {
1436 reth_tracing::init_test_tracing();
1437
1438 let tx_fetcher = &mut TransactionFetcher::<EthNetworkPrimitives>::default();
1439
1440 let hashes =
1441 [B256::from_slice(&[1; 32]), B256::from_slice(&[2; 32]), B256::from_slice(&[3; 32])];
1442 let announced_size =
1443 DEFAULT_SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESP_ON_PACK_GET_POOLED_TRANSACTIONS_REQ;
1444 let announcement_data = hashes
1445 .into_iter()
1446 .map(|hash| (hash, Some((0u8, announced_size))))
1447 .collect::<B256Map<_>>();
1448 let valid_announcement_data = ValidAnnouncementData::from_partially_valid_data(
1449 PartiallyValidData::from_raw_data_eth72(announcement_data),
1450 );
1451
1452 let mut hashes_to_request = RequestTxHashes::with_capacity(3);
1453 let surplus_hashes =
1454 tx_fetcher.pack_request(&mut hashes_to_request, valid_announcement_data);
1455
1456 assert_eq!(1, hashes_to_request.len());
1457 assert_eq!(2, surplus_hashes.len());
1458 }
1459
1460 #[tokio::test]
1461 async fn test_on_fetch_pending_hashes() {
1462 reth_tracing::init_test_tracing();
1463
1464 let tx_fetcher = &mut TransactionFetcher::default();
1465
1466 let seen_hashes = [
1470 B256::from_slice(&[1; 32]),
1471 B256::from_slice(&[2; 32]),
1472 B256::from_slice(&[3; 32]),
1473 B256::from_slice(&[4; 32]),
1474 ];
1475 let seen_eth68_hashes_sizes = [120, 158, 116];
1480
1481 let peer_1 = PeerId::new([1; 64]);
1483 let peer_2 = PeerId::new([2; 64]);
1485
1486 let (mut peer_1_data, mut peer_1_mock_session_rx) =
1490 new_mock_session(peer_1, EthVersion::Eth66);
1491 for hash in &seen_hashes {
1492 peer_1_data.seen_transactions.insert(*hash);
1493 }
1494 let (mut peer_2_data, _) = new_mock_session(peer_2, EthVersion::Eth66);
1495 for hash in &seen_hashes {
1496 peer_2_data.seen_transactions.insert(*hash);
1497 }
1498 let mut peers = HashMap::default();
1499 peers.insert(peer_1, peer_1_data);
1500 peers.insert(peer_2, peer_2_data);
1501
1502 for i in 0..3 {
1504 buffer_hash_to_tx_fetcher(
1506 tx_fetcher,
1507 seen_hashes[i],
1508 peer_2,
1509 0,
1510 Some(seen_eth68_hashes_sizes[i]),
1511 );
1512 }
1513 buffer_hash_to_tx_fetcher(tx_fetcher, seen_hashes[3], peer_2, 0, None);
1514
1515 let hash_other = B256::from_slice(&[5; 32]);
1517 buffer_hash_to_tx_fetcher(tx_fetcher, hash_other, peer_2, 0, None);
1518
1519 for hash in &seen_hashes {
1521 buffer_hash_to_tx_fetcher(tx_fetcher, *hash, peer_1, 0, None);
1522 }
1523
1524 assert_eq!(tx_fetcher.num_pending_hashes(), 5);
1526
1527 tx_fetcher.on_fetch_pending_hashes(&peers, |_| true);
1530
1531 let req = peer_1_mock_session_rx
1533 .recv()
1534 .await
1535 .expect("peer session should receive request with buffered hashes");
1536 let PeerRequest::GetPooledTransactions { request, .. } = req else { unreachable!() };
1537 let GetPooledTransactions(requested_hashes) = request;
1538
1539 assert_eq!(
1540 requested_hashes.into_iter().collect::<B256Set>(),
1541 seen_hashes.into_iter().collect::<B256Set>()
1542 )
1543 }
1544
1545 #[test]
1546 fn on_fetch_pending_hashes_rebuffers_on_disconnected_peer() {
1547 let tx_fetcher = &mut TransactionFetcher::default();
1548 let peer_1 = PeerId::new([1; 64]);
1549 let peer_2 = PeerId::new([2; 64]);
1550 let hash_1 = B256::from_slice(&[1; 32]);
1551
1552 buffer_hash_to_tx_fetcher(tx_fetcher, hash_1, peer_1, 0, Some(128));
1553 buffer_hash_to_tx_fetcher(tx_fetcher, hash_1, peer_2, 0, Some(128));
1554
1555 assert_eq!(tx_fetcher.num_pending_hashes(), 1);
1556
1557 let peers = HashMap::new();
1559 tx_fetcher.on_fetch_pending_hashes(&peers, |_| true);
1560
1561 assert_eq!(tx_fetcher.num_pending_hashes(), 1);
1563 }
1564
1565 #[test]
1566 fn verify_response_hashes() {
1567 let input = hex!(
1568 "02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598daa"
1569 );
1570 let signed_tx_1: PooledTransaction =
1571 TransactionSigned::decode(&mut &input[..]).unwrap().try_into().unwrap();
1572 let input = hex!(
1573 "02f871018302a90f808504890aef60826b6c94ddf4c5025d1a5742cf12f74eec246d4432c295e487e09c3bbcc12b2b80c080a0f21a4eacd0bf8fea9c5105c543be5a1d8c796516875710fafafdf16d16d8ee23a001280915021bb446d1973501a67f93d2b38894a514b976e7b46dc2fe54598d76"
1574 );
1575 let signed_tx_2: PooledTransaction =
1576 TransactionSigned::decode(&mut &input[..]).unwrap().try_into().unwrap();
1577
1578 let request_hashes = [
1580 B256::from_str("0x3b9aca00f0671c9a2a1b817a0a78d3fe0c0f776cccb2a8c3c1b412a4f4e67890")
1581 .unwrap(),
1582 *signed_tx_1.hash(),
1583 B256::from_str("0x3b9aca00f0671c9a2a1b817a0a78d3fe0c0f776cccb2a8c3c1b412a4f4e12345")
1584 .unwrap(),
1585 B256::from_str("0x3b9aca00f0671c9a2a1b817a0a78d3fe0c0f776cccb2a8c3c1b412a4f4edabe3")
1586 .unwrap(),
1587 ];
1588
1589 for hash in &request_hashes {
1590 assert_ne!(hash, signed_tx_2.hash())
1591 }
1592
1593 let request_hashes = RequestTxHashes::new(request_hashes.into_iter().collect());
1594
1595 let response_txns = PooledTransactions(vec![signed_tx_1.clone(), signed_tx_2]);
1597 let payload = UnverifiedPooledTransactions::new(response_txns);
1598
1599 let (outcome, verified_payload) = payload.verify(&request_hashes, &PeerId::ZERO);
1600
1601 assert_eq!(VerificationOutcome::ReportPeer, outcome);
1602 assert_eq!(1, verified_payload.len());
1603 assert!(verified_payload.contains(&signed_tx_1));
1604 }
1605}