reth_downloaders/test_utils/
bodies_client.rs1use alloy_primitives::B256;
2use reth_ethereum_primitives::BlockBody;
3use reth_network_p2p::{
4 bodies::client::{BodiesClient, BodiesFut},
5 download::DownloadClient,
6 priority::Priority,
7};
8use reth_network_peers::PeerId;
9use std::{
10 collections::HashMap,
11 fmt::Debug,
12 ops::RangeInclusive,
13 sync::{
14 atomic::{AtomicU64, Ordering},
15 Arc,
16 },
17 time::Duration,
18};
19use tokio::sync::Mutex;
20
21#[derive(Debug, Default)]
23pub struct TestBodiesClient {
24 bodies: Arc<Mutex<HashMap<B256, BlockBody>>>,
25 should_delay: bool,
26 max_batch_size: Option<usize>,
27 times_requested: AtomicU64,
28 empty_response_mod: Option<u64>,
29}
30
31impl TestBodiesClient {
32 pub(crate) fn with_bodies(mut self, bodies: HashMap<B256, BlockBody>) -> Self {
33 self.bodies = Arc::new(Mutex::new(bodies));
34 self
35 }
36
37 pub(crate) const fn with_should_delay(mut self, should_delay: bool) -> Self {
38 self.should_delay = should_delay;
39 self
40 }
41
42 pub(crate) const fn with_empty_responses(mut self, empty_mod: u64) -> Self {
45 self.empty_response_mod = Some(empty_mod);
46 self
47 }
48
49 pub(crate) const fn with_max_batch_size(mut self, max_batch_size: usize) -> Self {
50 self.max_batch_size = Some(max_batch_size);
51 self
52 }
53
54 pub(crate) fn times_requested(&self) -> u64 {
55 self.times_requested.load(Ordering::Relaxed)
56 }
57
58 pub(crate) fn should_respond_empty(&self) -> bool {
63 if let Some(empty_response_mod) = self.empty_response_mod {
64 self.times_requested.load(Ordering::Relaxed).is_multiple_of(empty_response_mod)
65 } else {
66 false
67 }
68 }
69}
70
71impl DownloadClient for TestBodiesClient {
72 fn report_bad_message(&self, _peer_id: PeerId) {
73 }
75
76 fn num_connected_peers(&self) -> usize {
77 0
78 }
79}
80
81impl BodiesClient for TestBodiesClient {
82 type Body = BlockBody;
83 type Output = BodiesFut;
84
85 fn get_block_bodies_with_priority_and_range_hint(
86 &self,
87 hashes: Vec<B256>,
88 _priority: Priority,
89 _range_hint: Option<RangeInclusive<u64>>,
90 ) -> Self::Output {
91 let should_delay = self.should_delay;
92 let bodies = self.bodies.clone();
93 let max_batch_size = self.max_batch_size;
94
95 self.times_requested.fetch_add(1, Ordering::Relaxed);
96 let should_respond_empty = self.should_respond_empty();
97
98 Box::pin(async move {
99 if should_respond_empty {
100 return Ok((PeerId::default(), vec![]).into())
101 }
102
103 if should_delay {
104 tokio::time::sleep(Duration::from_millis((hashes[0][0] % 100) as u64)).await;
105 }
106
107 let bodies = &mut *bodies.lock().await;
108 Ok((
109 PeerId::default(),
110 hashes
111 .into_iter()
112 .take(max_batch_size.unwrap_or(usize::MAX))
113 .map(|hash| {
114 bodies
115 .remove(&hash)
116 .expect("Downloader asked for a block it should not ask for")
117 })
118 .collect(),
119 )
120 .into())
121 })
122 }
123}