1use crate::{
4 cache::LruMap,
5 error::{NetworkError, ServiceKind},
6};
7use enr::Enr;
8use futures::StreamExt;
9use reth_discv4::{DiscoveryUpdate, Discv4, Discv4Config};
10use reth_discv5::{DiscoveredPeer, Discv5};
11use reth_dns_discovery::{
12 DnsDiscoveryConfig, DnsDiscoveryHandle, DnsDiscoveryService, DnsNodeRecordUpdate, DnsResolver,
13};
14use reth_ethereum_forks::{EnrForkIdEntry, ForkId};
15use reth_network_api::{DiscoveredEvent, DiscoveryEvent};
16use reth_network_peers::{NodeRecord, PeerId};
17use reth_network_types::PeerAddr;
18use secp256k1::SecretKey;
19use std::{
20 collections::VecDeque,
21 net::{IpAddr, SocketAddr},
22 pin::Pin,
23 sync::Arc,
24 task::{ready, Context, Poll},
25};
26use tokio::{sync::mpsc, task::JoinHandle};
27use tokio_stream::{wrappers::ReceiverStream, Stream};
28use tracing::{debug, trace};
29
30pub const DEFAULT_MAX_CAPACITY_DISCOVERED_PEERS_CACHE: u32 = 10_000;
34
35#[derive(Debug)]
40pub struct Discovery {
41 discovered_nodes: LruMap<PeerId, PeerAddr>,
45 local_enr: NodeRecord,
47 discv4: Option<Discv4>,
49 discv4_updates: Option<ReceiverStream<DiscoveryUpdate>>,
51 _discv4_service: Option<JoinHandle<()>>,
53 discv5: Option<Discv5>,
55 discv5_updates: Option<ReceiverStream<discv5::Event>>,
57 _dns_discovery: Option<DnsDiscoveryHandle>,
59 dns_discovery_updates: Option<ReceiverStream<DnsNodeRecordUpdate>>,
61 _dns_disc_service: Option<JoinHandle<()>>,
63 queued_events: VecDeque<DiscoveryEvent>,
65 discovery_listeners: Vec<mpsc::UnboundedSender<DiscoveryEvent>>,
67}
68
69impl Discovery {
70 pub async fn new(
75 tcp_addr: SocketAddr,
76 discovery_v4_addr: SocketAddr,
77 sk: SecretKey,
78 discv4_config: Option<Discv4Config>,
79 discv5_config: Option<reth_discv5::Config>, dns_discovery_config: Option<DnsDiscoveryConfig>,
81 ) -> Result<Self, NetworkError> {
82 let local_enr =
84 NodeRecord::from_secret_key(discovery_v4_addr, &sk).with_tcp_port(tcp_addr.port());
85
86 let discv4_future = async {
87 let Some(disc_config) = discv4_config else { return Ok((None, None, None)) };
88 let (discv4, mut discv4_service) =
89 Discv4::bind(discovery_v4_addr, local_enr, sk, disc_config).await.map_err(
90 |err| {
91 NetworkError::from_io_error(err, ServiceKind::Discovery(discovery_v4_addr))
92 },
93 )?;
94 let discv4_updates = discv4_service.update_stream();
95 let discv4_service = discv4_service.spawn();
97
98 debug!(target:"net", ?discovery_v4_addr, "started discovery v4");
99
100 Ok((Some(discv4), Some(discv4_updates), Some(discv4_service)))
101 };
102
103 let discv5_future = async {
104 let Some(config) = discv5_config else { return Ok::<_, NetworkError>((None, None)) };
105 let (discv5, discv5_updates) = Discv5::start(&sk, config).await?;
106 debug!(target:"net", discovery_v5_enr=? discv5.local_enr(), "started discovery v5");
107 Ok((Some(discv5), Some(discv5_updates.into())))
108 };
109
110 let ((discv4, discv4_updates, _discv4_service), (discv5, discv5_updates)) =
111 tokio::try_join!(discv4_future, discv5_future)?;
112
113 let (_dns_discovery, dns_discovery_updates, _dns_disc_service) =
115 if let Some(dns_config) = dns_discovery_config {
116 let (mut service, dns_disc) = DnsDiscoveryService::new_pair(
117 Arc::new(DnsResolver::from_system_conf()?),
118 dns_config,
119 );
120 let dns_discovery_updates = service.node_record_stream();
121 let dns_disc_service = service.spawn();
122 (Some(dns_disc), Some(dns_discovery_updates), Some(dns_disc_service))
123 } else {
124 (None, None, None)
125 };
126
127 Ok(Self {
128 discovery_listeners: Default::default(),
129 local_enr,
130 discv4,
131 discv4_updates,
132 _discv4_service,
133 discv5,
134 discv5_updates,
135 discovered_nodes: LruMap::new(DEFAULT_MAX_CAPACITY_DISCOVERED_PEERS_CACHE),
136 queued_events: Default::default(),
137 _dns_disc_service,
138 _dns_discovery,
139 dns_discovery_updates,
140 })
141 }
142
143 pub(crate) fn add_listener(&mut self, tx: mpsc::UnboundedSender<DiscoveryEvent>) {
145 self.discovery_listeners.push(tx);
146 }
147
148 #[inline]
150 fn notify_listeners(&mut self, event: &DiscoveryEvent) {
151 self.discovery_listeners.retain_mut(|listener| listener.send(event.clone()).is_ok());
152 }
153
154 pub(crate) fn update_fork_id(&self, fork_id: ForkId) {
156 if let Some(discv4) = &self.discv4 {
157 discv4.set_eip868_rlp(b"eth".to_vec(), EnrForkIdEntry::from(fork_id))
159 }
160 if let Some(discv5) = &self.discv5 {
161 discv5
162 .encode_and_set_eip868_in_local_enr(b"eth".to_vec(), EnrForkIdEntry::from(fork_id))
163 }
164 }
165
166 pub(crate) fn ban_ip(&self, ip: IpAddr) {
168 if let Some(discv4) = &self.discv4 {
169 discv4.ban_ip(ip)
170 }
171 if let Some(discv5) = &self.discv5 {
172 discv5.ban_ip(ip)
173 }
174 }
175
176 pub(crate) fn ban(&self, peer_id: PeerId, ip: IpAddr) {
178 if let Some(discv4) = &self.discv4 {
179 discv4.ban(peer_id, ip)
180 }
181 if let Some(discv5) = &self.discv5 {
182 discv5.ban(peer_id, ip)
183 }
184 }
185
186 pub fn discv4(&self) -> Option<Discv4> {
188 self.discv4.clone()
189 }
190
191 pub(crate) const fn local_id(&self) -> PeerId {
193 self.local_enr.id }
195
196 pub(crate) fn add_discv4_node(&self, node: NodeRecord) {
198 if let Some(discv4) = &self.discv4 {
199 discv4.add_node(node);
200 }
201 }
202
203 pub fn discv5(&self) -> Option<Discv5> {
205 self.discv5.clone()
206 }
207
208 pub(crate) fn add_discv5_node(&self, enr: Enr<SecretKey>) -> Result<(), NetworkError> {
210 if let Some(discv5) = &self.discv5 {
211 discv5.add_node(enr).map_err(NetworkError::Discv5Error)?;
212 }
213
214 Ok(())
215 }
216
217 fn on_node_record_update(&mut self, record: NodeRecord, fork_id: Option<ForkId>) {
219 let peer_id = record.id;
220 let tcp_addr = record.tcp_addr();
221 if tcp_addr.port() == 0 {
222 return
224 }
225 let udp_addr = record.udp_addr();
226 let addr = PeerAddr::new(tcp_addr, Some(udp_addr));
227 _ =
228 self.discovered_nodes.get_or_insert(peer_id, || {
229 self.queued_events.push_back(DiscoveryEvent::NewNode(
230 DiscoveredEvent::EventQueued { peer_id, addr, fork_id },
231 ));
232
233 addr
234 })
235 }
236
237 fn on_discv4_update(&mut self, update: DiscoveryUpdate) {
238 match update {
239 DiscoveryUpdate::Added(record) | DiscoveryUpdate::DiscoveredAtCapacity(record) => {
240 self.on_node_record_update(record, None);
241 }
242 DiscoveryUpdate::EnrForkId(node, fork_id) => {
243 self.queued_events.push_back(DiscoveryEvent::EnrForkId(node, fork_id))
244 }
245 DiscoveryUpdate::Removed(peer_id) => {
246 self.discovered_nodes.remove(&peer_id);
247 }
248 DiscoveryUpdate::Batch(updates) => {
249 for update in updates {
250 self.on_discv4_update(update);
251 }
252 }
253 }
254 }
255
256 pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<DiscoveryEvent> {
257 loop {
258 if let Some(event) = self.queued_events.pop_front() {
260 self.notify_listeners(&event);
261 return Poll::Ready(event)
262 }
263
264 while let Some(Poll::Ready(Some(update))) =
266 self.discv4_updates.as_mut().map(|updates| updates.poll_next_unpin(cx))
267 {
268 self.on_discv4_update(update)
269 }
270
271 while let Some(Poll::Ready(Some(update))) =
273 self.discv5_updates.as_mut().map(|updates| updates.poll_next_unpin(cx))
274 {
275 if let Some(discv5) = self.discv5.as_mut() &&
276 let Some(DiscoveredPeer { node_record, fork_id }) =
277 discv5.on_discv5_update(update)
278 {
279 self.on_node_record_update(node_record, fork_id);
280 }
281 }
282
283 while let Some(Poll::Ready(Some(update))) =
285 self.dns_discovery_updates.as_mut().map(|updates| updates.poll_next_unpin(cx))
286 {
287 self.add_discv4_node(update.node_record);
288 if let Err(err) = self.add_discv5_node(update.enr) {
289 trace!(target: "net::discovery",
290 %err,
291 "failed adding node discovered by dns to discv5"
292 );
293 }
294 self.on_node_record_update(update.node_record, update.fork_id);
295 }
296
297 if self.queued_events.is_empty() {
298 return Poll::Pending
299 }
300 }
301 }
302}
303
304impl Drop for Discovery {
305 fn drop(&mut self) {
306 if let Some(discv4) = &self.discv4 {
307 discv4.terminate();
308 }
309 if let Some(handle) = self._discv4_service.take() {
310 handle.abort();
311 }
312 if let Some(handle) = self._dns_disc_service.take() {
313 handle.abort();
314 }
315 }
316}
317
318impl Stream for Discovery {
319 type Item = DiscoveryEvent;
320
321 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
322 Poll::Ready(Some(ready!(self.get_mut().poll(cx))))
323 }
324}
325
326#[cfg(test)]
327impl Discovery {
328 pub(crate) fn noop() -> Self {
332 let (_discovery_listeners, _): (mpsc::UnboundedSender<DiscoveryEvent>, _) =
333 mpsc::unbounded_channel();
334
335 Self {
336 discovered_nodes: LruMap::new(0),
337 local_enr: NodeRecord {
338 address: IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
339 tcp_port: 0,
340 udp_port: 0,
341 id: PeerId::random(),
342 },
343 discv4: Default::default(),
344 discv4_updates: Default::default(),
345 discv5: None,
346 discv5_updates: None,
347 queued_events: Default::default(),
348 _discv4_service: Default::default(),
349 _dns_discovery: None,
350 dns_discovery_updates: None,
351 _dns_disc_service: None,
352 discovery_listeners: Default::default(),
353 }
354 }
355}
356
357#[cfg(test)]
358mod tests {
359 use super::*;
360 use secp256k1::SECP256K1;
361 use std::net::{Ipv4Addr, SocketAddrV4};
362
363 #[tokio::test(flavor = "multi_thread")]
364 async fn test_discovery_setup() {
365 let (secret_key, _) = SECP256K1.generate_keypair(&mut rand_08::thread_rng());
366 let discovery_addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0));
367 let _discovery = Discovery::new(
368 discovery_addr,
369 discovery_addr,
370 secret_key,
371 Default::default(),
372 None,
373 Default::default(),
374 )
375 .await
376 .unwrap();
377 }
378
379 use reth_discv4::Discv4ConfigBuilder;
380 use reth_discv5::{enr::EnrCombinedKeyWrapper, enr_to_discv4_id};
381 use tracing::trace;
382
383 async fn start_discovery_node(udp_port_discv4: u16, udp_port_discv5: u16) -> Discovery {
384 let secret_key = SecretKey::new(&mut rand_08::thread_rng());
385
386 let discv4_addr = format!("127.0.0.1:{udp_port_discv4}").parse().unwrap();
387 let discv5_addr: SocketAddr = format!("127.0.0.1:{udp_port_discv5}").parse().unwrap();
388
389 let discv4_config = Discv4ConfigBuilder::default().external_ip_resolver(None).build();
391
392 let discv5_listen_config = discv5::ListenConfig::from(discv5_addr);
393 let discv5_config = reth_discv5::Config::builder(discv5_addr)
394 .discv5_config(discv5::ConfigBuilder::new(discv5_listen_config).build())
395 .build();
396
397 Discovery::new(
398 discv4_addr,
399 discv4_addr,
400 secret_key,
401 Some(discv4_config),
402 Some(discv5_config),
403 None,
404 )
405 .await
406 .expect("should build discv5 with discv4 downgrade")
407 }
408
409 #[tokio::test(flavor = "multi_thread")]
410 async fn discv5_and_discv4_same_pk() {
411 reth_tracing::init_test_tracing();
412
413 let mut node_1 = start_discovery_node(40014, 40015).await;
415 let discv4_enr_1 = node_1.discv4.as_ref().unwrap().node_record();
416 let discv5_enr_node_1 =
417 node_1.discv5.as_ref().unwrap().with_discv5(|discv5| discv5.local_enr());
418 let discv4_id_1 = discv4_enr_1.id;
419 let discv5_id_1 = discv5_enr_node_1.node_id();
420
421 let mut node_2 = start_discovery_node(40024, 40025).await;
422 let discv4_enr_2 = node_2.discv4.as_ref().unwrap().node_record();
423 let discv5_enr_node_2 =
424 node_2.discv5.as_ref().unwrap().with_discv5(|discv5| discv5.local_enr());
425 let discv4_id_2 = discv4_enr_2.id;
426 let discv5_id_2 = discv5_enr_node_2.node_id();
427
428 trace!(target: "net::discovery::tests",
429 node_1_node_id=format!("{:#}", discv5_id_1),
430 node_2_node_id=format!("{:#}", discv5_id_2),
431 "started nodes"
432 );
433
434 assert_eq!(discv4_id_1, enr_to_discv4_id(&discv5_enr_node_1).unwrap());
438 assert_eq!(discv4_id_2, enr_to_discv4_id(&discv5_enr_node_2).unwrap());
439
440 node_1.add_discv4_node(discv4_enr_2);
442
443 let event_node_1 = node_1.next().await.unwrap();
445 let event_node_2 = node_2.next().await.unwrap();
446
447 assert_eq!(
448 DiscoveryEvent::NewNode(DiscoveredEvent::EventQueued {
449 peer_id: discv4_id_2,
450 addr: PeerAddr::new(discv4_enr_2.tcp_addr(), Some(discv4_enr_2.udp_addr())),
451 fork_id: None
452 }),
453 event_node_1
454 );
455 assert_eq!(
456 DiscoveryEvent::NewNode(DiscoveredEvent::EventQueued {
457 peer_id: discv4_id_1,
458 addr: PeerAddr::new(discv4_enr_1.tcp_addr(), Some(discv4_enr_1.udp_addr())),
459 fork_id: None
460 }),
461 event_node_2
462 );
463
464 assert_eq!(1, node_1.discovered_nodes.len());
465 assert_eq!(1, node_2.discovered_nodes.len());
466
467 node_1.add_discv5_node(EnrCombinedKeyWrapper(discv5_enr_node_2.clone()).into()).unwrap();
469 assert!(node_1
471 .discv5
472 .as_ref()
473 .unwrap()
474 .with_discv5(|discv5| discv5.table_entries_id().contains(&discv5_id_2)));
475
476 node_1
478 .discv5
479 .as_ref()
480 .unwrap()
481 .with_discv5(|discv5| discv5.send_ping(discv5_enr_node_2.clone()))
482 .await
483 .unwrap();
484
485 assert_eq!(1, node_1.discovered_nodes.len());
488 assert_eq!(1, node_2.discovered_nodes.len());
489 }
490}