reth_network/
required_block_filter.rs

//! Required block peer filtering implementation.
//!
//! This module provides functionality to filter out peers that don't have
//! specific required blocks (primarily used for shadowfork testing).
5
6use alloy_primitives::B256;
7use futures::StreamExt;
8use reth_eth_wire_types::{GetBlockHeaders, HeadersDirection};
9use reth_network_api::{
10    NetworkEvent, NetworkEventListenerProvider, PeerRequest, Peers, ReputationChangeKind,
11};
12use tokio::sync::oneshot;
13use tracing::{debug, info, trace};
14
15/// Task that filters peers based on required block hashes.
16///
17/// This task listens for new peer sessions and checks if they have the required
18/// block hashes. Peers that don't have these blocks are banned.
19pub struct RequiredBlockFilter<N> {
20    /// Network handle for listening to events and managing peer reputation.
21    network: N,
22    /// List of block hashes that peers must have to be considered valid.
23    block_hashes: Vec<B256>,
24}
25
26impl<N> RequiredBlockFilter<N>
27where
28    N: NetworkEventListenerProvider + Peers + Clone + Send + Sync + 'static,
29{
30    /// Creates a new required block peer filter.
31    pub const fn new(network: N, block_hashes: Vec<B256>) -> Self {
32        Self { network, block_hashes }
33    }
34
35    /// Spawns the required block peer filter task.
36    ///
37    /// This task will run indefinitely, monitoring new peer sessions and filtering
38    /// out peers that don't have the required blocks.
39    pub fn spawn(self) {
40        if self.block_hashes.is_empty() {
41            debug!(target: "net::filter", "No required block hashes configured, skipping peer filtering");
42            return;
43        }
44
45        info!(target: "net::filter", "Starting required block peer filter with {} block hashes", self.block_hashes.len());
46
47        tokio::spawn(async move {
48            self.run().await;
49        });
50    }
51
52    /// Main loop for the required block peer filter.
53    async fn run(self) {
54        let mut event_stream = self.network.event_listener();
55
56        while let Some(event) = event_stream.next().await {
57            if let NetworkEvent::ActivePeerSession { info, messages } = event {
58                let peer_id = info.peer_id;
59                debug!(target: "net::filter", "New peer session established: {}", peer_id);
60
61                // Spawn a task to check this peer's blocks
62                let network = self.network.clone();
63                let block_hashes = self.block_hashes.clone();
64
65                tokio::spawn(async move {
66                    Self::check_peer_blocks(network, peer_id, messages, block_hashes).await;
67                });
68            }
69        }
70    }
71
72    /// Checks if a peer has the required blocks and bans them if not.
73    async fn check_peer_blocks(
74        network: N,
75        peer_id: reth_network_api::PeerId,
76        messages: reth_network_api::PeerRequestSender<PeerRequest<N::Primitives>>,
77        block_hashes: Vec<B256>,
78    ) {
79        for block_hash in block_hashes {
80            trace!(target: "net::filter", "Checking if peer {} has block {}", peer_id, block_hash);
81
82            // Create a request for block headers
83            let request = GetBlockHeaders {
84                start_block: block_hash.into(),
85                limit: 1,
86                skip: 0,
87                direction: HeadersDirection::Rising,
88            };
89
90            let (tx, rx) = oneshot::channel();
91            let peer_request = PeerRequest::GetBlockHeaders { request, response: tx };
92
93            // Send the request to the peer
94            if let Err(e) = messages.try_send(peer_request) {
95                debug!(target: "net::filter", "Failed to send block header request to peer {}: {:?}", peer_id, e);
96                continue;
97            }
98
99            // Wait for the response
100            let response = match rx.await {
101                Ok(response) => response,
102                Err(e) => {
103                    debug!(
104                        target: "net::filter",
105                        "Channel error getting block {} from peer {}: {:?}",
106                        block_hash, peer_id, e
107                    );
108                    continue;
109                }
110            };
111
112            let headers = match response {
113                Ok(headers) => headers,
114                Err(e) => {
115                    debug!(target: "net::filter", "Error getting block {} from peer {}: {:?}", block_hash, peer_id, e);
116                    // Ban the peer if they fail to respond properly
117                    network.reputation_change(peer_id, ReputationChangeKind::BadProtocol);
118                    return;
119                }
120            };
121
122            if headers.0.is_empty() {
123                info!(
124                    target: "net::filter",
125                    "Peer {} does not have required block {}, banning",
126                    peer_id, block_hash
127                );
128                network.reputation_change(peer_id, ReputationChangeKind::BadProtocol);
129                return; // No need to check more blocks if one is missing
130            }
131
132            trace!(target: "net::filter", "Peer {} has required block {}", peer_id, block_hash);
133        }
134
135        debug!(target: "net::filter", "Peer {} has all required blocks", peer_id);
136    }
137}
138
#[cfg(test)]
mod tests {
    use super::*;
    use alloy_primitives::{b256, B256};
    use reth_network_api::noop::NoopNetwork;

    /// The constructor must store the supplied hashes unchanged.
    #[test]
    fn test_required_block_filter_creation() {
        let expected_hashes = vec![
            b256!("0x1111111111111111111111111111111111111111111111111111111111111111"),
            b256!("0x2222222222222222222222222222222222222222222222222222222222222222"),
        ];
        let noop = NoopNetwork::default();

        let filter = RequiredBlockFilter::new(noop, expected_hashes.clone());

        assert_eq!(filter.block_hashes.len(), 2);
        assert_eq!(filter.block_hashes, expected_hashes);
    }

    /// With no hashes configured, `spawn` must return early instead of spawning a task, so it
    /// is safe to call here without a Tokio runtime.
    #[test]
    fn test_required_block_filter_empty_hashes_does_not_spawn() {
        let noop = NoopNetwork::default();
        let filter = RequiredBlockFilter::new(noop, Vec::new());

        // Must not panic.
        filter.spawn();
    }

    /// Placeholder for a full mock-peer test: exercising the peer check needs mock network
    /// components, so for now only construction with a single hash is verified.
    #[tokio::test]
    async fn test_required_block_filter_with_mock_peer() {
        let noop = NoopNetwork::default();
        let single_hash = vec![B256::default()];

        let filter = RequiredBlockFilter::new(noop, single_hash);

        assert_eq!(filter.block_hashes.len(), 1);
    }
}