reth_downloaders/test_utils/
bodies_client.rs
1use alloy_primitives::B256;
2use reth_ethereum_primitives::BlockBody;
3use reth_network_p2p::{
4 bodies::client::{BodiesClient, BodiesFut},
5 download::DownloadClient,
6 priority::Priority,
7};
8use reth_network_peers::PeerId;
9use std::{
10 collections::HashMap,
11 fmt::Debug,
12 sync::{
13 atomic::{AtomicU64, Ordering},
14 Arc,
15 },
16 time::Duration,
17};
18use tokio::sync::Mutex;
19
20#[derive(Debug, Default)]
22pub struct TestBodiesClient {
23 bodies: Arc<Mutex<HashMap<B256, BlockBody>>>,
24 should_delay: bool,
25 max_batch_size: Option<usize>,
26 times_requested: AtomicU64,
27 empty_response_mod: Option<u64>,
28}
29
30impl TestBodiesClient {
31 pub(crate) fn with_bodies(mut self, bodies: HashMap<B256, BlockBody>) -> Self {
32 self.bodies = Arc::new(Mutex::new(bodies));
33 self
34 }
35
36 pub(crate) const fn with_should_delay(mut self, should_delay: bool) -> Self {
37 self.should_delay = should_delay;
38 self
39 }
40
41 pub(crate) const fn with_empty_responses(mut self, empty_mod: u64) -> Self {
44 self.empty_response_mod = Some(empty_mod);
45 self
46 }
47
48 pub(crate) const fn with_max_batch_size(mut self, max_batch_size: usize) -> Self {
49 self.max_batch_size = Some(max_batch_size);
50 self
51 }
52
53 pub(crate) fn times_requested(&self) -> u64 {
54 self.times_requested.load(Ordering::Relaxed)
55 }
56
57 pub(crate) fn should_respond_empty(&self) -> bool {
62 if let Some(empty_response_mod) = self.empty_response_mod {
63 self.times_requested.load(Ordering::Relaxed) % empty_response_mod == 0
64 } else {
65 false
66 }
67 }
68}
69
70impl DownloadClient for TestBodiesClient {
71 fn report_bad_message(&self, _peer_id: PeerId) {
72 }
74
75 fn num_connected_peers(&self) -> usize {
76 0
77 }
78}
79
80impl BodiesClient for TestBodiesClient {
81 type Body = BlockBody;
82 type Output = BodiesFut;
83
84 fn get_block_bodies_with_priority(
85 &self,
86 hashes: Vec<B256>,
87 _priority: Priority,
88 ) -> Self::Output {
89 let should_delay = self.should_delay;
90 let bodies = self.bodies.clone();
91 let max_batch_size = self.max_batch_size;
92
93 self.times_requested.fetch_add(1, Ordering::Relaxed);
94 let should_respond_empty = self.should_respond_empty();
95
96 Box::pin(async move {
97 if should_respond_empty {
98 return Ok((PeerId::default(), vec![]).into())
99 }
100
101 if should_delay {
102 tokio::time::sleep(Duration::from_millis((hashes[0][0] % 100) as u64)).await;
103 }
104
105 let bodies = &mut *bodies.lock().await;
106 Ok((
107 PeerId::default(),
108 hashes
109 .into_iter()
110 .take(max_batch_size.unwrap_or(usize::MAX))
111 .map(|hash| {
112 bodies
113 .remove(&hash)
114 .expect("Downloader asked for a block it should not ask for")
115 })
116 .collect(),
117 )
118 .into())
119 })
120 }
121}