reth_downloaders/test_utils/
bodies_client.rs

1use alloy_primitives::B256;
2use reth_ethereum_primitives::BlockBody;
3use reth_network_p2p::{
4    bodies::client::{BodiesClient, BodiesFut},
5    download::DownloadClient,
6    priority::Priority,
7};
8use reth_network_peers::PeerId;
9use std::{
10    collections::HashMap,
11    fmt::Debug,
12    ops::RangeInclusive,
13    sync::{
14        atomic::{AtomicU64, Ordering},
15        Arc,
16    },
17    time::Duration,
18};
19use tokio::sync::Mutex;
20
21/// A [`BodiesClient`] for testing.
22#[derive(Debug, Default)]
23pub struct TestBodiesClient {
24    bodies: Arc<Mutex<HashMap<B256, BlockBody>>>,
25    should_delay: bool,
26    max_batch_size: Option<usize>,
27    times_requested: AtomicU64,
28    empty_response_mod: Option<u64>,
29}
30
31impl TestBodiesClient {
32    pub(crate) fn with_bodies(mut self, bodies: HashMap<B256, BlockBody>) -> Self {
33        self.bodies = Arc::new(Mutex::new(bodies));
34        self
35    }
36
37    pub(crate) const fn with_should_delay(mut self, should_delay: bool) -> Self {
38        self.should_delay = should_delay;
39        self
40    }
41
42    /// Instructs the client to respond with empty responses some portion of the time. Every
43    /// `empty_mod` responses, the client will respond with an empty response.
44    pub(crate) const fn with_empty_responses(mut self, empty_mod: u64) -> Self {
45        self.empty_response_mod = Some(empty_mod);
46        self
47    }
48
49    pub(crate) const fn with_max_batch_size(mut self, max_batch_size: usize) -> Self {
50        self.max_batch_size = Some(max_batch_size);
51        self
52    }
53
54    pub(crate) fn times_requested(&self) -> u64 {
55        self.times_requested.load(Ordering::Relaxed)
56    }
57
58    /// Returns whether or not the client should respond with an empty response.
59    ///
60    /// This will only return true if `empty_response_mod` is `Some`, and `times_requested %
61    /// empty_response_mod == 0`.
62    pub(crate) fn should_respond_empty(&self) -> bool {
63        if let Some(empty_response_mod) = self.empty_response_mod {
64            self.times_requested.load(Ordering::Relaxed).is_multiple_of(empty_response_mod)
65        } else {
66            false
67        }
68    }
69}
70
71impl DownloadClient for TestBodiesClient {
72    fn report_bad_message(&self, _peer_id: PeerId) {
73        // noop
74    }
75
76    fn num_connected_peers(&self) -> usize {
77        0
78    }
79}
80
81impl BodiesClient for TestBodiesClient {
82    type Body = BlockBody;
83    type Output = BodiesFut;
84
85    fn get_block_bodies_with_priority_and_range_hint(
86        &self,
87        hashes: Vec<B256>,
88        _priority: Priority,
89        _range_hint: Option<RangeInclusive<u64>>,
90    ) -> Self::Output {
91        let should_delay = self.should_delay;
92        let bodies = self.bodies.clone();
93        let max_batch_size = self.max_batch_size;
94
95        self.times_requested.fetch_add(1, Ordering::Relaxed);
96        let should_respond_empty = self.should_respond_empty();
97
98        Box::pin(async move {
99            if should_respond_empty {
100                return Ok((PeerId::default(), vec![]).into())
101            }
102
103            if should_delay {
104                tokio::time::sleep(Duration::from_millis((hashes[0][0] % 100) as u64)).await;
105            }
106
107            let bodies = &mut *bodies.lock().await;
108            Ok((
109                PeerId::default(),
110                hashes
111                    .into_iter()
112                    .take(max_batch_size.unwrap_or(usize::MAX))
113                    .map(|hash| {
114                        bodies
115                            .remove(&hash)
116                            .expect("Downloader asked for a block it should not ask for")
117                    })
118                    .collect(),
119            )
120                .into())
121        })
122    }
123}