Skip to main content

reth_downloaders/test_utils/
bodies_client.rs

1use alloy_primitives::{map::B256Map, B256};
2use reth_ethereum_primitives::BlockBody;
3use reth_network_p2p::{
4    bodies::client::{BodiesClient, BodiesFut},
5    download::DownloadClient,
6    priority::Priority,
7};
8use reth_network_peers::PeerId;
9use std::{
10    fmt::Debug,
11    ops::RangeInclusive,
12    sync::{
13        atomic::{AtomicU64, Ordering},
14        Arc,
15    },
16    time::Duration,
17};
18use tokio::sync::Mutex;
19
/// A [`BodiesClient`] for testing.
///
/// Serves block bodies out of an in-memory map keyed by block hash and counts how many body
/// requests it has received. Optional settings add a delay before responding, cap the number of
/// bodies returned per request, or make some responses empty.
#[derive(Debug, Default)]
pub struct TestBodiesClient {
    /// Bodies available to serve, keyed by block hash. An entry is removed once it is served.
    bodies: Arc<Mutex<B256Map<BlockBody>>>,
    /// When `true`, the response future sleeps before answering.
    should_delay: bool,
    /// Upper bound on the number of bodies returned per request; `None` means no cap.
    max_batch_size: Option<usize>,
    /// Number of body requests received so far.
    times_requested: AtomicU64,
    /// When `Some(n)`, requests whose running count is a multiple of `n` get an empty response.
    empty_response_mod: Option<u64>,
}
29
30impl TestBodiesClient {
31    pub(crate) fn with_bodies(mut self, bodies: B256Map<BlockBody>) -> Self {
32        self.bodies = Arc::new(Mutex::new(bodies));
33        self
34    }
35
36    pub(crate) const fn with_should_delay(mut self, should_delay: bool) -> Self {
37        self.should_delay = should_delay;
38        self
39    }
40
41    /// Instructs the client to respond with empty responses some portion of the time. Every
42    /// `empty_mod` responses, the client will respond with an empty response.
43    pub(crate) const fn with_empty_responses(mut self, empty_mod: u64) -> Self {
44        self.empty_response_mod = Some(empty_mod);
45        self
46    }
47
48    pub(crate) const fn with_max_batch_size(mut self, max_batch_size: usize) -> Self {
49        self.max_batch_size = Some(max_batch_size);
50        self
51    }
52
53    pub(crate) fn times_requested(&self) -> u64 {
54        self.times_requested.load(Ordering::Relaxed)
55    }
56
57    /// Returns whether or not the client should respond with an empty response.
58    ///
59    /// This will only return true if `empty_response_mod` is `Some`, and `times_requested %
60    /// empty_response_mod == 0`.
61    pub(crate) fn should_respond_empty(&self) -> bool {
62        if let Some(empty_response_mod) = self.empty_response_mod {
63            self.times_requested.load(Ordering::Relaxed).is_multiple_of(empty_response_mod)
64        } else {
65            false
66        }
67    }
68}
69
70impl DownloadClient for TestBodiesClient {
71    fn report_bad_message(&self, _peer_id: PeerId) {
72        // noop
73    }
74
75    fn num_connected_peers(&self) -> usize {
76        0
77    }
78}
79
80impl BodiesClient for TestBodiesClient {
81    type Body = BlockBody;
82    type Output = BodiesFut;
83
84    fn get_block_bodies_with_priority_and_range_hint(
85        &self,
86        hashes: Vec<B256>,
87        _priority: Priority,
88        _range_hint: Option<RangeInclusive<u64>>,
89    ) -> Self::Output {
90        let should_delay = self.should_delay;
91        let bodies = self.bodies.clone();
92        let max_batch_size = self.max_batch_size;
93
94        self.times_requested.fetch_add(1, Ordering::Relaxed);
95        let should_respond_empty = self.should_respond_empty();
96
97        Box::pin(async move {
98            if should_respond_empty {
99                return Ok((PeerId::default(), vec![]).into())
100            }
101
102            if should_delay {
103                tokio::time::sleep(Duration::from_millis((hashes[0][0] % 100) as u64)).await;
104            }
105
106            let bodies = &mut *bodies.lock().await;
107            Ok((
108                PeerId::default(),
109                hashes
110                    .into_iter()
111                    .take(max_batch_size.unwrap_or(usize::MAX))
112                    .map(|hash| {
113                        bodies
114                            .remove(&hash)
115                            .expect("Downloader asked for a block it should not ask for")
116                    })
117                    .collect(),
118            )
119                .into())
120        })
121    }
122}