reth_network/
required_block_filter.rs1use alloy_primitives::B256;
7use futures::StreamExt;
8use reth_eth_wire_types::{GetBlockHeaders, HeadersDirection};
9use reth_network_api::{
10 NetworkEvent, NetworkEventListenerProvider, PeerRequest, Peers, ReputationChangeKind,
11};
12use tokio::sync::oneshot;
13use tracing::{debug, info, trace};
14
15pub struct RequiredBlockFilter<N> {
20 network: N,
22 block_hashes: Vec<B256>,
24}
25
26impl<N> RequiredBlockFilter<N>
27where
28 N: NetworkEventListenerProvider + Peers + Clone + Send + Sync + 'static,
29{
30 pub const fn new(network: N, block_hashes: Vec<B256>) -> Self {
32 Self { network, block_hashes }
33 }
34
35 pub fn spawn(self) {
40 if self.block_hashes.is_empty() {
41 debug!(target: "net::filter", "No required block hashes configured, skipping peer filtering");
42 return;
43 }
44
45 info!(target: "net::filter", "Starting required block peer filter with {} block hashes", self.block_hashes.len());
46
47 tokio::spawn(async move {
48 self.run().await;
49 });
50 }
51
52 async fn run(self) {
54 let mut event_stream = self.network.event_listener();
55
56 while let Some(event) = event_stream.next().await {
57 if let NetworkEvent::ActivePeerSession { info, messages } = event {
58 let peer_id = info.peer_id;
59 debug!(target: "net::filter", "New peer session established: {}", peer_id);
60
61 let network = self.network.clone();
63 let block_hashes = self.block_hashes.clone();
64
65 tokio::spawn(async move {
66 Self::check_peer_blocks(network, peer_id, messages, block_hashes).await;
67 });
68 }
69 }
70 }
71
72 async fn check_peer_blocks(
74 network: N,
75 peer_id: reth_network_api::PeerId,
76 messages: reth_network_api::PeerRequestSender<PeerRequest<N::Primitives>>,
77 block_hashes: Vec<B256>,
78 ) {
79 for block_hash in block_hashes {
80 trace!(target: "net::filter", "Checking if peer {} has block {}", peer_id, block_hash);
81
82 let request = GetBlockHeaders {
84 start_block: block_hash.into(),
85 limit: 1,
86 skip: 0,
87 direction: HeadersDirection::Rising,
88 };
89
90 let (tx, rx) = oneshot::channel();
91 let peer_request = PeerRequest::GetBlockHeaders { request, response: tx };
92
93 if let Err(e) = messages.try_send(peer_request) {
95 debug!(target: "net::filter", "Failed to send block header request to peer {}: {:?}", peer_id, e);
96 continue;
97 }
98
99 let response = match rx.await {
101 Ok(response) => response,
102 Err(e) => {
103 debug!(
104 target: "net::filter",
105 "Channel error getting block {} from peer {}: {:?}",
106 block_hash, peer_id, e
107 );
108 continue;
109 }
110 };
111
112 let headers = match response {
113 Ok(headers) => headers,
114 Err(e) => {
115 debug!(target: "net::filter", "Error getting block {} from peer {}: {:?}", block_hash, peer_id, e);
116 network.reputation_change(peer_id, ReputationChangeKind::BadProtocol);
118 return;
119 }
120 };
121
122 if headers.0.is_empty() {
123 info!(
124 target: "net::filter",
125 "Peer {} does not have required block {}, banning",
126 peer_id, block_hash
127 );
128 network.reputation_change(peer_id, ReputationChangeKind::BadProtocol);
129 return; }
131
132 trace!(target: "net::filter", "Peer {} has required block {}", peer_id, block_hash);
133 }
134
135 debug!(target: "net::filter", "Peer {} has all required blocks", peer_id);
136 }
137}
138
139#[cfg(test)]
140mod tests {
141 use super::*;
142 use alloy_primitives::{b256, B256};
143 use reth_network_api::noop::NoopNetwork;
144
145 #[test]
146 fn test_required_block_filter_creation() {
147 let network = NoopNetwork::default();
148 let block_hashes = vec![
149 b256!("0x1111111111111111111111111111111111111111111111111111111111111111"),
150 b256!("0x2222222222222222222222222222222222222222222222222222222222222222"),
151 ];
152
153 let filter = RequiredBlockFilter::new(network, block_hashes.clone());
154 assert_eq!(filter.block_hashes.len(), 2);
155 assert_eq!(filter.block_hashes, block_hashes);
156 }
157
158 #[test]
159 fn test_required_block_filter_empty_hashes_does_not_spawn() {
160 let network = NoopNetwork::default();
161 let block_hashes = vec![];
162
163 let filter = RequiredBlockFilter::new(network, block_hashes);
164 filter.spawn();
166 }
167
168 #[tokio::test]
169 async fn test_required_block_filter_with_mock_peer() {
170 let network = NoopNetwork::default();
173 let block_hashes = vec![B256::default()];
174
175 let filter = RequiredBlockFilter::new(network, block_hashes);
176 assert_eq!(filter.block_hashes.len(), 1);
178 }
179}