reth_network/
required_block_filter.rs

1//! Required block peer filtering implementation.
2//!
3//! This module provides functionality to filter out peers that don't have
4//! specific required blocks (primarily used for shadowfork testing).
5
6use alloy_eips::BlockNumHash;
7use futures::StreamExt;
8use reth_eth_wire_types::{GetBlockHeaders, HeadersDirection};
9use reth_network_api::{
10    NetworkEvent, NetworkEventListenerProvider, PeerRequest, Peers, ReputationChangeKind,
11};
12use tokio::sync::oneshot;
13use tracing::{debug, info, trace};
14
15/// Task that filters peers based on required block hashes.
16///
17/// This task listens for new peer sessions and checks if they have the required
18/// block hashes. Peers that don't have these blocks are banned.
19///
20/// This type is mainly used to connect peers on shadow forks (e.g. mainnet shadowfork=
21pub struct RequiredBlockFilter<N> {
22    /// Network handle for listening to events and managing peer reputation.
23    network: N,
24    /// List of block number-hash pairs that peers must have to be considered valid.
25    block_num_hashes: Vec<BlockNumHash>,
26}
27
28impl<N> RequiredBlockFilter<N>
29where
30    N: NetworkEventListenerProvider + Peers + Clone + Send + Sync + 'static,
31{
32    /// Creates a new required block peer filter.
33    pub const fn new(network: N, block_num_hashes: Vec<BlockNumHash>) -> Self {
34        Self { network, block_num_hashes }
35    }
36
37    /// Spawns the required block peer filter task.
38    ///
39    /// This task will run indefinitely, monitoring new peer sessions and filtering
40    /// out peers that don't have the required blocks.
41    pub fn spawn(self) {
42        if self.block_num_hashes.is_empty() {
43            debug!(target: "net::filter", "No required block hashes configured, skipping peer filtering");
44            return;
45        }
46
47        info!(target: "net::filter", "Starting required block peer filter with {} block hashes", self.block_num_hashes.len());
48
49        tokio::spawn(async move {
50            self.run().await;
51        });
52    }
53
54    /// Main loop for the required block peer filter.
55    async fn run(self) {
56        let mut event_stream = self.network.event_listener();
57
58        while let Some(event) = event_stream.next().await {
59            if let NetworkEvent::ActivePeerSession { info, messages } = event {
60                let peer_id = info.peer_id;
61                debug!(target: "net::filter", "New peer session established: {}", peer_id);
62
63                // Spawn a task to check this peer's blocks
64                let network = self.network.clone();
65                let block_num_hashes = self.block_num_hashes.clone();
66                let peer_block_number = info.status.latest_block.unwrap_or(0);
67
68                tokio::spawn(async move {
69                    Self::check_peer_blocks(
70                        network,
71                        peer_id,
72                        messages,
73                        block_num_hashes,
74                        peer_block_number,
75                    )
76                    .await;
77                });
78            }
79        }
80    }
81
82    /// Checks if a peer has the required blocks and bans them if not.
83    async fn check_peer_blocks(
84        network: N,
85        peer_id: reth_network_api::PeerId,
86        messages: reth_network_api::PeerRequestSender<PeerRequest<N::Primitives>>,
87        block_num_hashes: Vec<BlockNumHash>,
88        latest_peer_block: u64,
89    ) {
90        for block_num_hash in block_num_hashes {
91            // Skip if peer's block number is lower than required, peer might also be syncing and
92            // still on the same chain.
93            if block_num_hash.number > 0 && latest_peer_block <= block_num_hash.number {
94                debug!(target: "net::filter", "Skipping check for block {} - peer {} only at block {}", 
95                       block_num_hash.number, peer_id, latest_peer_block);
96                continue;
97            }
98
99            let block_hash = block_num_hash.hash;
100            trace!(target: "net::filter", "Checking if peer {} has block {}", peer_id, block_hash);
101
102            // Create a request for block headers
103            let request = GetBlockHeaders {
104                start_block: block_hash.into(),
105                limit: 1,
106                skip: 0,
107                direction: HeadersDirection::Rising,
108            };
109
110            let (tx, rx) = oneshot::channel();
111            let peer_request = PeerRequest::GetBlockHeaders { request, response: tx };
112
113            // Send the request to the peer
114            if let Err(e) = messages.try_send(peer_request) {
115                debug!(target: "net::filter", "Failed to send block header request to peer {}: {:?}", peer_id, e);
116                continue;
117            }
118
119            // Wait for the response
120            let response = match rx.await {
121                Ok(response) => response,
122                Err(e) => {
123                    debug!(
124                        target: "net::filter",
125                        "Channel error getting block {} from peer {}: {:?}",
126                        block_hash, peer_id, e
127                    );
128                    continue;
129                }
130            };
131
132            let headers = match response {
133                Ok(headers) => headers,
134                Err(e) => {
135                    debug!(target: "net::filter", "Error getting block {} from peer {}: {:?}", block_hash, peer_id, e);
136                    // Ban the peer if they fail to respond properly
137                    network.reputation_change(peer_id, ReputationChangeKind::BadProtocol);
138                    return;
139                }
140            };
141
142            if headers.0.is_empty() {
143                info!(
144                    target: "net::filter",
145                    "Peer {} does not have required block {}, banning",
146                    peer_id, block_hash
147                );
148                network.reputation_change(peer_id, ReputationChangeKind::BadProtocol);
149                return; // No need to check more blocks if one is missing
150            }
151
152            trace!(target: "net::filter", "Peer {} has required block {}", peer_id, block_hash);
153        }
154
155        debug!(target: "net::filter", "Peer {} has all required blocks", peer_id);
156    }
157}
158
159#[cfg(test)]
160mod tests {
161    use super::*;
162    use alloy_eips::BlockNumHash;
163    use alloy_primitives::{b256, B256};
164    use reth_network_api::noop::NoopNetwork;
165
166    #[test]
167    fn test_required_block_filter_creation() {
168        let network = NoopNetwork::default();
169        let block_num_hashes = vec![
170            BlockNumHash::new(
171                0,
172                b256!("0x1111111111111111111111111111111111111111111111111111111111111111"),
173            ),
174            BlockNumHash::new(
175                23115201,
176                b256!("0x2222222222222222222222222222222222222222222222222222222222222222"),
177            ),
178        ];
179
180        let filter = RequiredBlockFilter::new(network, block_num_hashes.clone());
181        assert_eq!(filter.block_num_hashes.len(), 2);
182        assert_eq!(filter.block_num_hashes, block_num_hashes);
183    }
184
185    #[test]
186    fn test_required_block_filter_empty_hashes_does_not_spawn() {
187        let network = NoopNetwork::default();
188        let block_num_hashes = vec![];
189
190        let filter = RequiredBlockFilter::new(network, block_num_hashes);
191        // This should not panic and should exit early when spawn is called
192        filter.spawn();
193    }
194
195    #[tokio::test]
196    async fn test_required_block_filter_with_mock_peer() {
197        // This test would require a more complex setup with mock network components
198        // For now, we ensure the basic structure is correct
199        let network = NoopNetwork::default();
200        let block_num_hashes = vec![BlockNumHash::new(0, B256::default())];
201
202        let filter = RequiredBlockFilter::new(network, block_num_hashes);
203        // Verify the filter can be created and basic properties are set
204        assert_eq!(filter.block_num_hashes.len(), 1);
205    }
206}