reth_downloaders/test_utils/
bodies_client.rs1use alloy_primitives::{map::B256Map, B256};
2use reth_ethereum_primitives::BlockBody;
3use reth_network_p2p::{
4 bodies::client::{BodiesClient, BodiesFut},
5 download::DownloadClient,
6 priority::Priority,
7};
8use reth_network_peers::PeerId;
9use std::{
10 fmt::Debug,
11 ops::RangeInclusive,
12 sync::{
13 atomic::{AtomicU64, Ordering},
14 Arc,
15 },
16 time::Duration,
17};
18use tokio::sync::Mutex;
19
20#[derive(Debug, Default)]
22pub struct TestBodiesClient {
23 bodies: Arc<Mutex<B256Map<BlockBody>>>,
24 should_delay: bool,
25 max_batch_size: Option<usize>,
26 times_requested: AtomicU64,
27 empty_response_mod: Option<u64>,
28}
29
30impl TestBodiesClient {
31 pub(crate) fn with_bodies(mut self, bodies: B256Map<BlockBody>) -> Self {
32 self.bodies = Arc::new(Mutex::new(bodies));
33 self
34 }
35
36 pub(crate) const fn with_should_delay(mut self, should_delay: bool) -> Self {
37 self.should_delay = should_delay;
38 self
39 }
40
41 pub(crate) const fn with_empty_responses(mut self, empty_mod: u64) -> Self {
44 self.empty_response_mod = Some(empty_mod);
45 self
46 }
47
48 pub(crate) const fn with_max_batch_size(mut self, max_batch_size: usize) -> Self {
49 self.max_batch_size = Some(max_batch_size);
50 self
51 }
52
53 pub(crate) fn times_requested(&self) -> u64 {
54 self.times_requested.load(Ordering::Relaxed)
55 }
56
57 pub(crate) fn should_respond_empty(&self) -> bool {
62 if let Some(empty_response_mod) = self.empty_response_mod {
63 self.times_requested.load(Ordering::Relaxed).is_multiple_of(empty_response_mod)
64 } else {
65 false
66 }
67 }
68}
69
70impl DownloadClient for TestBodiesClient {
71 fn report_bad_message(&self, _peer_id: PeerId) {
72 }
74
75 fn num_connected_peers(&self) -> usize {
76 0
77 }
78}
79
80impl BodiesClient for TestBodiesClient {
81 type Body = BlockBody;
82 type Output = BodiesFut;
83
84 fn get_block_bodies_with_priority_and_range_hint(
85 &self,
86 hashes: Vec<B256>,
87 _priority: Priority,
88 _range_hint: Option<RangeInclusive<u64>>,
89 ) -> Self::Output {
90 let should_delay = self.should_delay;
91 let bodies = self.bodies.clone();
92 let max_batch_size = self.max_batch_size;
93
94 self.times_requested.fetch_add(1, Ordering::Relaxed);
95 let should_respond_empty = self.should_respond_empty();
96
97 Box::pin(async move {
98 if should_respond_empty {
99 return Ok((PeerId::default(), vec![]).into())
100 }
101
102 if should_delay {
103 tokio::time::sleep(Duration::from_millis((hashes[0][0] % 100) as u64)).await;
104 }
105
106 let bodies = &mut *bodies.lock().await;
107 Ok((
108 PeerId::default(),
109 hashes
110 .into_iter()
111 .take(max_batch_size.unwrap_or(usize::MAX))
112 .map(|hash| {
113 bodies
114 .remove(&hash)
115 .expect("Downloader asked for a block it should not ask for")
116 })
117 .collect(),
118 )
119 .into())
120 })
121 }
122}