reth_downloaders/test_utils/
bodies_client.rs

1use alloy_primitives::B256;
2use reth_ethereum_primitives::BlockBody;
3use reth_network_p2p::{
4    bodies::client::{BodiesClient, BodiesFut},
5    download::DownloadClient,
6    priority::Priority,
7};
8use reth_network_peers::PeerId;
9use std::{
10    collections::HashMap,
11    fmt::Debug,
12    sync::{
13        atomic::{AtomicU64, Ordering},
14        Arc,
15    },
16    time::Duration,
17};
18use tokio::sync::Mutex;
19
20/// A [`BodiesClient`] for testing.
21#[derive(Debug, Default)]
22pub struct TestBodiesClient {
23    bodies: Arc<Mutex<HashMap<B256, BlockBody>>>,
24    should_delay: bool,
25    max_batch_size: Option<usize>,
26    times_requested: AtomicU64,
27    empty_response_mod: Option<u64>,
28}
29
30impl TestBodiesClient {
31    pub(crate) fn with_bodies(mut self, bodies: HashMap<B256, BlockBody>) -> Self {
32        self.bodies = Arc::new(Mutex::new(bodies));
33        self
34    }
35
36    pub(crate) const fn with_should_delay(mut self, should_delay: bool) -> Self {
37        self.should_delay = should_delay;
38        self
39    }
40
41    /// Instructs the client to respond with empty responses some portion of the time. Every
42    /// `empty_mod` responses, the client will respond with an empty response.
43    pub(crate) const fn with_empty_responses(mut self, empty_mod: u64) -> Self {
44        self.empty_response_mod = Some(empty_mod);
45        self
46    }
47
48    pub(crate) const fn with_max_batch_size(mut self, max_batch_size: usize) -> Self {
49        self.max_batch_size = Some(max_batch_size);
50        self
51    }
52
53    pub(crate) fn times_requested(&self) -> u64 {
54        self.times_requested.load(Ordering::Relaxed)
55    }
56
57    /// Returns whether or not the client should respond with an empty response.
58    ///
59    /// This will only return true if `empty_response_mod` is `Some`, and `times_requested %
60    /// empty_response_mod == 0`.
61    pub(crate) fn should_respond_empty(&self) -> bool {
62        if let Some(empty_response_mod) = self.empty_response_mod {
63            self.times_requested.load(Ordering::Relaxed) % empty_response_mod == 0
64        } else {
65            false
66        }
67    }
68}
69
70impl DownloadClient for TestBodiesClient {
71    fn report_bad_message(&self, _peer_id: PeerId) {
72        // noop
73    }
74
75    fn num_connected_peers(&self) -> usize {
76        0
77    }
78}
79
80impl BodiesClient for TestBodiesClient {
81    type Body = BlockBody;
82    type Output = BodiesFut;
83
84    fn get_block_bodies_with_priority(
85        &self,
86        hashes: Vec<B256>,
87        _priority: Priority,
88    ) -> Self::Output {
89        let should_delay = self.should_delay;
90        let bodies = self.bodies.clone();
91        let max_batch_size = self.max_batch_size;
92
93        self.times_requested.fetch_add(1, Ordering::Relaxed);
94        let should_respond_empty = self.should_respond_empty();
95
96        Box::pin(async move {
97            if should_respond_empty {
98                return Ok((PeerId::default(), vec![]).into())
99            }
100
101            if should_delay {
102                tokio::time::sleep(Duration::from_millis((hashes[0][0] % 100) as u64)).await;
103            }
104
105            let bodies = &mut *bodies.lock().await;
106            Ok((
107                PeerId::default(),
108                hashes
109                    .into_iter()
110                    .take(max_batch_size.unwrap_or(usize::MAX))
111                    .map(|hash| {
112                        bodies
113                            .remove(&hash)
114                            .expect("Downloader asked for a block it should not ask for")
115                    })
116                    .collect(),
117            )
118                .into())
119        })
120    }
121}