reth_network/
required_block_filter.rs1use alloy_eips::BlockNumHash;
7use futures::StreamExt;
8use reth_eth_wire_types::{GetBlockHeaders, HeadersDirection};
9use reth_network_api::{
10 NetworkEvent, NetworkEventListenerProvider, PeerRequest, Peers, ReputationChangeKind,
11};
12use tokio::sync::oneshot;
13use tracing::{debug, info, trace};
14
15pub struct RequiredBlockFilter<N> {
22 network: N,
24 block_num_hashes: Vec<BlockNumHash>,
26}
27
28impl<N> RequiredBlockFilter<N>
29where
30 N: NetworkEventListenerProvider + Peers + Clone + Send + Sync + 'static,
31{
32 pub const fn new(network: N, block_num_hashes: Vec<BlockNumHash>) -> Self {
34 Self { network, block_num_hashes }
35 }
36
37 pub fn spawn(self) {
42 if self.block_num_hashes.is_empty() {
43 debug!(target: "net::filter", "No required block hashes configured, skipping peer filtering");
44 return;
45 }
46
47 info!(target: "net::filter", "Starting required block peer filter with {} block hashes", self.block_num_hashes.len());
48
49 tokio::spawn(async move {
50 self.run().await;
51 });
52 }
53
54 async fn run(self) {
56 let mut event_stream = self.network.event_listener();
57
58 while let Some(event) = event_stream.next().await {
59 if let NetworkEvent::ActivePeerSession { info, messages } = event {
60 let peer_id = info.peer_id;
61 debug!(target: "net::filter", "New peer session established: {}", peer_id);
62
63 let network = self.network.clone();
65 let block_num_hashes = self.block_num_hashes.clone();
66 let peer_block_number = info.status.latest_block.unwrap_or(0);
67
68 tokio::spawn(async move {
69 Self::check_peer_blocks(
70 network,
71 peer_id,
72 messages,
73 block_num_hashes,
74 peer_block_number,
75 )
76 .await;
77 });
78 }
79 }
80 }
81
82 async fn check_peer_blocks(
84 network: N,
85 peer_id: reth_network_api::PeerId,
86 messages: reth_network_api::PeerRequestSender<PeerRequest<N::Primitives>>,
87 block_num_hashes: Vec<BlockNumHash>,
88 latest_peer_block: u64,
89 ) {
90 for block_num_hash in block_num_hashes {
91 if block_num_hash.number > 0 && latest_peer_block <= block_num_hash.number {
94 debug!(target: "net::filter", "Skipping check for block {} - peer {} only at block {}",
95 block_num_hash.number, peer_id, latest_peer_block);
96 continue;
97 }
98
99 let block_hash = block_num_hash.hash;
100 trace!(target: "net::filter", "Checking if peer {} has block {}", peer_id, block_hash);
101
102 let request = GetBlockHeaders {
104 start_block: block_hash.into(),
105 limit: 1,
106 skip: 0,
107 direction: HeadersDirection::Rising,
108 };
109
110 let (tx, rx) = oneshot::channel();
111 let peer_request = PeerRequest::GetBlockHeaders { request, response: tx };
112
113 if let Err(e) = messages.try_send(peer_request) {
115 debug!(target: "net::filter", "Failed to send block header request to peer {}: {:?}", peer_id, e);
116 continue;
117 }
118
119 let response = match rx.await {
121 Ok(response) => response,
122 Err(e) => {
123 debug!(
124 target: "net::filter",
125 "Channel error getting block {} from peer {}: {:?}",
126 block_hash, peer_id, e
127 );
128 continue;
129 }
130 };
131
132 let headers = match response {
133 Ok(headers) => headers,
134 Err(e) => {
135 debug!(target: "net::filter", "Error getting block {} from peer {}: {:?}", block_hash, peer_id, e);
136 network.reputation_change(peer_id, ReputationChangeKind::BadProtocol);
138 return;
139 }
140 };
141
142 if headers.0.is_empty() {
143 info!(
144 target: "net::filter",
145 "Peer {} does not have required block {}, banning",
146 peer_id, block_hash
147 );
148 network.reputation_change(peer_id, ReputationChangeKind::BadProtocol);
149 return; }
151
152 trace!(target: "net::filter", "Peer {} has required block {}", peer_id, block_hash);
153 }
154
155 debug!(target: "net::filter", "Peer {} has all required blocks", peer_id);
156 }
157}
158
159#[cfg(test)]
160mod tests {
161 use super::*;
162 use alloy_eips::BlockNumHash;
163 use alloy_primitives::{b256, B256};
164 use reth_network_api::noop::NoopNetwork;
165
166 #[test]
167 fn test_required_block_filter_creation() {
168 let network = NoopNetwork::default();
169 let block_num_hashes = vec![
170 BlockNumHash::new(
171 0,
172 b256!("0x1111111111111111111111111111111111111111111111111111111111111111"),
173 ),
174 BlockNumHash::new(
175 23115201,
176 b256!("0x2222222222222222222222222222222222222222222222222222222222222222"),
177 ),
178 ];
179
180 let filter = RequiredBlockFilter::new(network, block_num_hashes.clone());
181 assert_eq!(filter.block_num_hashes.len(), 2);
182 assert_eq!(filter.block_num_hashes, block_num_hashes);
183 }
184
185 #[test]
186 fn test_required_block_filter_empty_hashes_does_not_spawn() {
187 let network = NoopNetwork::default();
188 let block_num_hashes = vec![];
189
190 let filter = RequiredBlockFilter::new(network, block_num_hashes);
191 filter.spawn();
193 }
194
195 #[tokio::test]
196 async fn test_required_block_filter_with_mock_peer() {
197 let network = NoopNetwork::default();
200 let block_num_hashes = vec![BlockNumHash::new(0, B256::default())];
201
202 let filter = RequiredBlockFilter::new(network, block_num_hashes);
203 assert_eq!(filter.block_num_hashes.len(), 1);
205 }
206}