1#![doc(
4 html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
5 html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
6 issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/"
7)]
8#![cfg_attr(not(test), warn(unused_crate_dependencies))]
9#![cfg_attr(docsrs, feature(doc_cfg))]
10
11use std::{
12 collections::HashSet,
13 fmt,
14 net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
15 sync::Arc,
16 time::Duration,
17};
18
19use ::enr::Enr;
20use alloy_primitives::bytes::Bytes;
21use enr::{discv4_id_to_discv5_id, EnrCombinedKeyWrapper};
22use futures::future::join_all;
23use itertools::Itertools;
24use rand::{Rng, RngCore};
25use reth_ethereum_forks::{EnrForkIdEntry, ForkId};
26use reth_network_peers::{NodeRecord, PeerId};
27use secp256k1::SecretKey;
28use tokio::{sync::mpsc, task};
29use tracing::{debug, error, trace};
30
31pub mod config;
32pub mod enr;
33pub mod error;
34pub mod filter;
35pub mod metrics;
36pub mod network_stack_id;
37
38pub use discv5::{self, IpMode};
39
40pub use config::{
41 BootNode, Config, ConfigBuilder, DEFAULT_COUNT_BOOTSTRAP_LOOKUPS, DEFAULT_DISCOVERY_V5_ADDR,
42 DEFAULT_DISCOVERY_V5_ADDR_IPV6, DEFAULT_DISCOVERY_V5_LISTEN_CONFIG, DEFAULT_DISCOVERY_V5_PORT,
43 DEFAULT_SECONDS_BOOTSTRAP_LOOKUP_INTERVAL, DEFAULT_SECONDS_LOOKUP_INTERVAL,
44};
45pub use enr::enr_to_discv4_id;
46pub use error::Error;
47pub use filter::{FilterOutcome, MustNotIncludeKeys};
48pub use network_stack_id::NetworkStackId;
49
50use metrics::{DiscoveredPeersMetrics, Discv5Metrics};
51
52pub const MAX_KBUCKET_INDEX: usize = 255;
56
57pub const DEFAULT_MIN_TARGET_KBUCKET_INDEX: usize = 0;
63
64#[derive(Clone)]
66pub struct Discv5 {
67 discv5: Arc<discv5::Discv5>,
69 rlpx_ip_mode: IpMode,
71 fork_key: Option<&'static [u8]>,
73 discovered_peer_filter: MustNotIncludeKeys,
75 metrics: Discv5Metrics,
77 local_node_record: NodeRecord,
82}
83
84impl Discv5 {
85 pub fn add_node(&self, node_record: Enr<SecretKey>) -> Result<(), Error> {
91 let EnrCombinedKeyWrapper(enr) = node_record.into();
92 self.discv5.add_enr(enr).map_err(Error::AddNodeFailed)
93 }
94
95 pub fn set_eip868_in_local_enr(&self, key: Vec<u8>, rlp: Bytes) {
101 let Ok(key_str) = std::str::from_utf8(&key) else {
102 error!(target: "net::discv5",
103 err="key not utf-8",
104 "failed to update local enr"
105 );
106 return
107 };
108 if let Err(err) = self.discv5.enr_insert(key_str, &rlp) {
109 error!(target: "net::discv5",
110 %err,
111 "failed to update local enr"
112 );
113 }
114 }
115
116 pub fn encode_and_set_eip868_in_local_enr(
120 &self,
121 key: Vec<u8>,
122 value: impl alloy_rlp::Encodable,
123 ) {
124 let mut buf = Vec::new();
125 value.encode(&mut buf);
126 self.set_eip868_in_local_enr(key, buf.into())
127 }
128
129 pub fn ban(&self, peer_id: PeerId, ip: IpAddr) {
133 match discv4_id_to_discv5_id(peer_id) {
134 Ok(node_id) => {
135 self.discv5.ban_node(&node_id, None);
136 self.ban_ip(ip);
137 }
138 Err(err) => error!(target: "net::discv5",
139 %err,
140 "failed to ban peer"
141 ),
142 }
143 }
144
145 pub fn ban_ip(&self, ip: IpAddr) {
149 self.discv5.ban_ip(ip, None);
150 }
151
152 pub fn node_record(&self) -> Option<NodeRecord> {
158 let enr: Enr<_> = EnrCombinedKeyWrapper(self.discv5.local_enr()).into();
159 enr.try_into().ok()
160 }
161
162 pub fn local_enr(&self) -> Enr<discv5::enr::CombinedKey> {
164 self.discv5.local_enr()
165 }
166
167 pub const fn local_port(&self) -> u16 {
169 self.local_node_record.udp_port
170 }
171
172 pub async fn start(
176 sk: &SecretKey,
177 discv5_config: Config,
178 ) -> Result<(Self, mpsc::Receiver<discv5::Event>), Error> {
179 let (enr, local_node_record, fork_key, rlpx_ip_mode) = build_local_enr(sk, &discv5_config);
183
184 trace!(target: "net::discv5", ?enr, "local ENR");
185
186 let Config {
190 discv5_config,
191 bootstrap_nodes,
192 lookup_interval,
193 bootstrap_lookup_interval,
194 bootstrap_lookup_countdown,
195 discovered_peer_filter,
196 ..
197 } = discv5_config;
198
199 let EnrCombinedKeyWrapper(enr) = enr.into();
200 let sk = discv5::enr::CombinedKey::secp256k1_from_bytes(&mut sk.secret_bytes()).unwrap();
201 let mut discv5 = match discv5::Discv5::new(enr, sk, discv5_config) {
202 Ok(discv5) => discv5,
203 Err(err) => return Err(Error::InitFailure(err)),
204 };
205 discv5.start().await.map_err(Error::Discv5Error)?;
206
207 let discv5_updates = discv5.event_stream().await.map_err(Error::Discv5Error)?;
209
210 let discv5 = Arc::new(discv5);
211
212 bootstrap(bootstrap_nodes, &discv5).await?;
216
217 let metrics = Discv5Metrics::default();
218
219 spawn_populate_kbuckets_bg(
223 lookup_interval,
224 bootstrap_lookup_interval,
225 bootstrap_lookup_countdown,
226 metrics.clone(),
227 Arc::downgrade(&discv5),
228 );
229
230 Ok((
231 Self {
232 discv5,
233 rlpx_ip_mode,
234 fork_key,
235 discovered_peer_filter,
236 metrics,
237 local_node_record,
238 },
239 discv5_updates,
240 ))
241 }
242
243 pub fn on_discv5_update(&self, update: discv5::Event) -> Option<DiscoveredPeer> {
245 #[expect(clippy::match_same_arms)]
246 match update {
247 discv5::Event::SocketUpdated(_) | discv5::Event::TalkRequest(_) |
248 discv5::Event::Discovered(_) |
250 discv5::Event::UnrecognizedFrame(_) => None,
252 discv5::Event::NodeInserted { .. } => {
253
254 self.metrics.discovered_peers.increment_kbucket_insertions(1);
259
260 None
261 }
262 discv5::Event::SessionEstablished(enr, remote_socket) => {
263 self.metrics.discovered_peers.increment_established_sessions_raw(1);
271
272 self.on_discovered_peer(&enr, remote_socket)
273 }
274 discv5::Event::UnverifiableEnr {
275 enr,
276 socket,
277 node_id: _,
278 } => {
279 trace!(target: "net::discv5",
294 ?enr,
295 %socket,
296 "discovered unverifiable enr, source socket doesn't match socket advertised in ENR"
297 );
298
299 self.metrics.discovered_peers.increment_unverifiable_enrs_raw_total(1);
300
301 self.on_discovered_peer(&enr, socket)
302 }
303 _ => None
304 }
305 }
306
307 pub fn on_discovered_peer(
309 &self,
310 enr: &discv5::Enr,
311 socket: SocketAddr,
312 ) -> Option<DiscoveredPeer> {
313 self.metrics.discovered_peers_advertised_networks.increment_once_by_network_type(enr);
314
315 let node_record = match self.try_into_reachable(enr, socket) {
316 Ok(enr_bc) => enr_bc,
317 Err(err) => {
318 trace!(target: "net::discv5",
319 %err,
320 ?enr,
321 "discovered peer is unreachable"
322 );
323
324 self.metrics.discovered_peers.increment_established_sessions_unreachable_enr(1);
325
326 return None
327 }
328 };
329 if let FilterOutcome::Ignore { reason } = self.filter_discovered_peer(enr) {
330 trace!(target: "net::discv5",
331 ?enr,
332 reason,
333 "filtered out discovered peer"
334 );
335
336 self.metrics.discovered_peers.increment_established_sessions_filtered(1);
337
338 return None
339 }
340
341 let fork_id = self.get_fork_id(enr).ok();
342
343 trace!(target: "net::discv5",
344 ?fork_id,
345 ?enr,
346 "discovered peer"
347 );
348
349 Some(DiscoveredPeer { node_record, fork_id })
350 }
351
352 pub fn try_into_reachable(
362 &self,
363 enr: &discv5::Enr,
364 socket: SocketAddr,
365 ) -> Result<NodeRecord, Error> {
366 let address = socket.ip();
368 let udp_port = socket.port();
369
370 let id = enr_to_discv4_id(enr).ok_or(Error::IncompatibleKeyType)?;
371
372 let tcp_port = (match self.rlpx_ip_mode {
373 IpMode::Ip4 => enr.tcp4(),
374 IpMode::Ip6 => enr.tcp6(),
375 IpMode::DualStack => unimplemented!("dual-stack support not implemented for rlpx"),
376 })
377 .unwrap_or(
378 udp_port,
384 );
385
386 Ok(NodeRecord { address, tcp_port, udp_port, id })
387 }
388
389 pub fn filter_discovered_peer(&self, enr: &discv5::Enr) -> FilterOutcome {
392 self.discovered_peer_filter.filter(enr)
393 }
394
395 pub fn get_fork_id<K: discv5::enr::EnrKey>(
398 &self,
399 enr: &discv5::enr::Enr<K>,
400 ) -> Result<ForkId, Error> {
401 let Some(key) = self.fork_key else { return Err(Error::NetworkStackIdNotConfigured) };
402 let fork_id = enr
403 .get_decodable::<EnrForkIdEntry>(key)
404 .or_else(|| {
405 (key != NetworkStackId::ETH)
406 .then(|| {
407 trace!(target: "net::discv5",
409 key = %String::from_utf8_lossy(key),
410 "Fork id not found for key, trying 'eth'..."
411 );
412 enr.get_decodable::<EnrForkIdEntry>(NetworkStackId::ETH)
413 })
414 .flatten()
415 })
416 .ok_or({
417 trace!(target: "net::discv5", "Fork id not found for 'eth' network stack id");
418 Error::ForkMissing(key)
419 })?
420 .map(Into::into)?;
421
422 Ok(fork_id)
423 }
424
425 pub fn with_discv5<F, R>(&self, f: F) -> R
431 where
432 F: FnOnce(&discv5::Discv5) -> R,
433 {
434 f(&self.discv5)
435 }
436
437 pub const fn ip_mode(&self) -> IpMode {
443 self.rlpx_ip_mode
444 }
445
446 pub const fn fork_key(&self) -> Option<&[u8]> {
448 self.fork_key
449 }
450}
451
452impl fmt::Debug for Discv5 {
453 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
454 "{ .. }".fmt(f)
455 }
456}
457
458#[derive(Debug)]
460pub struct DiscoveredPeer {
461 pub node_record: NodeRecord,
463 pub fork_id: Option<ForkId>,
465}
466
467pub fn build_local_enr(
469 sk: &SecretKey,
470 config: &Config,
471) -> (Enr<SecretKey>, NodeRecord, Option<&'static [u8]>, IpMode) {
472 let mut builder = discv5::enr::Enr::builder();
473
474 let Config { discv5_config, fork, tcp_socket, advertised_ip, other_enr_kv_pairs, .. } = config;
475
476 let socket = {
477 let v4 = crate::config::ipv4(&discv5_config.listen_config);
478 let v6 = crate::config::ipv6(&discv5_config.listen_config);
479
480 if let Some(addr) = v4 {
483 if let Some(IpAddr::V4(ip)) = advertised_ip {
484 builder.ip4(*ip);
485 } else if *addr.ip() != Ipv4Addr::UNSPECIFIED {
486 builder.ip4(*addr.ip());
487 }
488 builder.udp4(addr.port());
489 }
490 if let Some(addr) = v6 {
491 if let Some(IpAddr::V6(ip)) = advertised_ip {
492 builder.ip6(*ip);
493 } else if *addr.ip() != Ipv6Addr::UNSPECIFIED {
494 builder.ip6(*addr.ip());
495 }
496 builder.udp6(addr.port());
497 }
498 if v4.is_some() {
500 builder.tcp4(tcp_socket.port());
501 } else if v6.is_some() {
502 builder.tcp6(tcp_socket.port());
503 }
504
505 v6.map(SocketAddr::V6)
507 .or_else(|| v4.map(SocketAddr::V4))
508 .unwrap_or_else(|| SocketAddr::from((Ipv4Addr::UNSPECIFIED, 0)))
509 };
510
511 let rlpx_ip_mode = if tcp_socket.is_ipv4() { IpMode::Ip4 } else { IpMode::Ip6 };
512
513 let network_stack_id = fork.as_ref().map(|(network_stack_id, fork_value)| {
515 builder.add_value_rlp(network_stack_id, alloy_rlp::encode(fork_value).into());
516 *network_stack_id
517 });
518
519 for (key, value) in other_enr_kv_pairs {
521 builder.add_value_rlp(key, value.clone().into());
522 }
523
524 let enr = builder.build(sk).expect("should build enr v4");
527
528 let bc_enr = NodeRecord::from_secret_key(socket, sk);
530
531 (enr, bc_enr, network_stack_id, rlpx_ip_mode)
532}
533
534pub async fn bootstrap(
536 bootstrap_nodes: HashSet<BootNode>,
537 discv5: &Arc<discv5::Discv5>,
538) -> Result<(), Error> {
539 trace!(target: "net::discv5",
540 ?bootstrap_nodes,
541 "adding bootstrap nodes .."
542 );
543
544 let mut enr_requests = vec![];
545 for node in bootstrap_nodes {
546 match node {
547 BootNode::Enr(node) => {
548 if let Err(err) = discv5.add_enr(node) {
549 return Err(Error::AddNodeFailed(err))
550 }
551 }
552 BootNode::Enode(enode) => {
553 let discv5 = discv5.clone();
554 enr_requests.push(async move {
555 if let Err(err) = discv5.request_enr(enode.to_string()).await {
556 debug!(target: "net::discv5",
557 ?enode,
558 %err,
559 "failed adding boot node"
560 );
561 }
562 })
563 }
564 }
565 }
566
567 Ok(_ = join_all(enr_requests).await)
569}
570
571pub fn spawn_populate_kbuckets_bg(
573 lookup_interval: u64,
574 bootstrap_lookup_interval: u64,
575 bootstrap_lookup_countdown: u64,
576 metrics: Discv5Metrics,
577 discv5: std::sync::Weak<discv5::Discv5>,
578) {
579 let lookup_interval = Duration::from_secs(lookup_interval);
580 let metrics = metrics.discovered_peers;
581 let mut kbucket_index = MAX_KBUCKET_INDEX;
582 let pulse_lookup_interval = Duration::from_secs(bootstrap_lookup_interval);
583 task::spawn(async move {
584 let Some(discv5_handle) = discv5.upgrade() else {
585 return;
586 };
587 let local_node_id = discv5_handle.local_enr().node_id();
588 drop(discv5_handle);
589
590 for i in (0..bootstrap_lookup_countdown).rev() {
593 let target = discv5::enr::NodeId::random();
594
595 trace!(target: "net::discv5",
596 %target,
597 bootstrap_boost_runs_countdown=i,
598 lookup_interval=format!("{:#?}", pulse_lookup_interval),
599 "starting bootstrap boost lookup query"
600 );
601
602 {
603 let Some(discv5_handle) = discv5.upgrade() else {
604 return;
605 };
606 lookup(target, &discv5_handle, &metrics).await;
607 }
608
609 tokio::time::sleep(pulse_lookup_interval).await;
610 }
611
612 loop {
614 let target = get_lookup_target(kbucket_index, local_node_id);
617
618 trace!(target: "net::discv5",
619 %target,
620 lookup_interval=format!("{:#?}", lookup_interval),
621 "starting periodic lookup query"
622 );
623
624 {
625 let Some(discv5_handle) = discv5.upgrade() else {
626 return;
627 };
628 lookup(target, &discv5_handle, &metrics).await;
629 }
630
631 if kbucket_index > DEFAULT_MIN_TARGET_KBUCKET_INDEX {
632 kbucket_index -= 1
634 } else {
635 kbucket_index = MAX_KBUCKET_INDEX
637 }
638
639 tokio::time::sleep(lookup_interval).await;
640 }
641 });
642}
643
644pub fn get_lookup_target(
646 kbucket_index: usize,
647 local_node_id: discv5::enr::NodeId,
648) -> discv5::enr::NodeId {
649 let mut target = local_node_id.raw();
651
652 let bit_offset = MAX_KBUCKET_INDEX.saturating_sub(kbucket_index);
654 let (byte, bit) = (bit_offset / 8, bit_offset % 8);
655 target[byte] ^= 1 << (7 - bit);
657
658 let mut rng = rand::rng();
660 if bit < 7 {
662 let bits_to_randomize = 0xff >> (bit + 1);
664 target[byte] &= !bits_to_randomize;
666 target[byte] |= rng.random::<u8>() & bits_to_randomize;
668 }
669 rng.fill_bytes(&mut target[byte + 1..]);
671
672 target.into()
673}
674
675pub async fn lookup(
677 target: discv5::enr::NodeId,
678 discv5: &discv5::Discv5,
679 metrics: &DiscoveredPeersMetrics,
680) {
681 metrics.set_total_sessions(discv5.metrics().active_sessions);
682 metrics.set_total_kbucket_peers(
683 discv5.with_kbuckets(|kbuckets| kbuckets.read().iter_ref().count()),
684 );
685
686 match discv5.find_node(target).await {
687 Err(err) => trace!(target: "net::discv5",
688 %err,
689 "lookup query failed"
690 ),
691 Ok(peers) => trace!(target: "net::discv5",
692 target=format!("{:#?}", target),
693 peers_count=peers.len(),
694 peers=format!("[{:#}]", peers.iter()
695 .map(|enr| enr.node_id()
696 ).format(", ")),
697 "peers returned by lookup query"
698 ),
699 }
700
701 debug!(target: "net::discv5",
705 connected_peers=discv5.connected_peers(),
706 "connected peers in routing table"
707 );
708}
709
710#[cfg(test)]
711mod test {
712 #![allow(deprecated)]
713 use super::*;
714 use ::enr::{CombinedKey, EnrKey};
715 use discv5::ListenConfig;
716 use rand_08::thread_rng;
717 use reth_chainspec::MAINNET;
718 use std::{
719 net::UdpSocket,
720 time::{Duration, Instant},
721 };
722 use tracing::trace;
723
724 fn discv5_noop() -> Discv5 {
725 let sk = CombinedKey::generate_secp256k1();
726 Discv5 {
727 discv5: Arc::new(
728 discv5::Discv5::new(
729 Enr::empty(&sk).unwrap(),
730 sk,
731 discv5::ConfigBuilder::new(DEFAULT_DISCOVERY_V5_LISTEN_CONFIG).build(),
732 )
733 .unwrap(),
734 ),
735 rlpx_ip_mode: IpMode::Ip4,
736 fork_key: None,
737 discovered_peer_filter: MustNotIncludeKeys::default(),
738 metrics: Discv5Metrics::default(),
739 local_node_record: NodeRecord::new(
740 (Ipv4Addr::LOCALHOST, 30303).into(),
741 PeerId::random(),
742 ),
743 }
744 }
745
746 async fn start_discovery_node(udp_port_discv5: u16) -> (Discv5, mpsc::Receiver<discv5::Event>) {
747 let secret_key = SecretKey::new(&mut thread_rng());
748
749 let discv5_addr: SocketAddr = format!("127.0.0.1:{udp_port_discv5}").parse().unwrap();
750 let rlpx_addr: SocketAddr = "127.0.0.1:30303".parse().unwrap();
751
752 let discv5_listen_config = ListenConfig::from(discv5_addr);
753 let discv5_config = Config::builder(rlpx_addr)
754 .discv5_config(discv5::ConfigBuilder::new(discv5_listen_config).build())
755 .build();
756
757 Discv5::start(&secret_key, discv5_config).await.expect("should build discv5")
758 }
759
760 async fn start_discovery_node_with_key(
761 secret_key: &SecretKey,
762 udp_port_discv5: u16,
763 ) -> Result<(Discv5, mpsc::Receiver<discv5::Event>), Error> {
764 let discv5_addr: SocketAddr = format!("127.0.0.1:{udp_port_discv5}").parse().unwrap();
765 let rlpx_addr: SocketAddr = "127.0.0.1:30303".parse().unwrap();
766
767 let discv5_listen_config = ListenConfig::from(discv5_addr);
768 let discv5_config = Config::builder(rlpx_addr)
769 .discv5_config(discv5::ConfigBuilder::new(discv5_listen_config).build())
770 .build();
771
772 Discv5::start(secret_key, discv5_config).await
773 }
774
775 fn unused_udp_port() -> u16 {
776 UdpSocket::bind("127.0.0.1:0").unwrap().local_addr().unwrap().port()
777 }
778
779 async fn wait_for_udp_port_release(port: u16, timeout: Duration) {
780 let deadline = Instant::now() + timeout;
781
782 loop {
783 match UdpSocket::bind(("127.0.0.1", port)) {
784 Ok(socket) => {
785 drop(socket);
786 return;
787 }
788 Err(err) if Instant::now() < deadline => {
789 trace!(target: "net::discv5::test", %port, %err, "waiting for discv5 port release");
790 tokio::time::sleep(Duration::from_millis(10)).await;
791 }
792 Err(err) => panic!("discv5 did not release port {port} before timeout: {err}"),
793 }
794 }
795 }
796
797 #[tokio::test(flavor = "multi_thread")]
798 async fn discv5_releases_port_on_drop() {
799 reth_tracing::init_test_tracing();
800
801 let secret_key = SecretKey::new(&mut thread_rng());
802 let port = unused_udp_port();
803
804 let (node, updates) =
805 start_discovery_node_with_key(&secret_key, port).await.expect("should start discv5");
806 drop(updates);
807 drop(node);
808
809 wait_for_udp_port_release(port, Duration::from_secs(1)).await;
810
811 let restarted = start_discovery_node_with_key(&secret_key, port).await;
812 assert!(restarted.is_ok(), "discv5 failed to rebind dropped port: {restarted:?}");
813 }
814
815 #[tokio::test(flavor = "multi_thread")]
816 async fn discv5() {
817 reth_tracing::init_test_tracing();
818
819 let (node_1, mut stream_1) = start_discovery_node(30344).await;
823 let node_1_enr = node_1.with_discv5(|discv5| discv5.local_enr());
824
825 let (node_2, mut stream_2) = start_discovery_node(30355).await;
827 let node_2_enr = node_2.with_discv5(|discv5| discv5.local_enr());
828
829 trace!(target: "net::discv5::test",
830 node_1_node_id=format!("{:#}", node_1_enr.node_id()),
831 node_2_node_id=format!("{:#}", node_2_enr.node_id()),
832 "started nodes"
833 );
834
835 let node_2_enr_reth_compatible_ty: Enr<SecretKey> =
839 EnrCombinedKeyWrapper(node_2_enr.clone()).into();
840 node_1.add_node(node_2_enr_reth_compatible_ty).unwrap();
841
842 assert!(
844 node_1.with_discv5(|discv5| discv5.table_entries_id().contains(&node_2_enr.node_id()))
845 );
846
847 node_1.with_discv5(|discv5| discv5.send_ping(node_2_enr.clone())).await.unwrap();
849
850 let event_1_v5 = stream_1.recv().await.unwrap();
852
853 assert!(matches!(
854 event_1_v5,
855 discv5::Event::SessionEstablished(node, socket) if node == node_2_enr && socket == node_2_enr.udp4_socket().unwrap().into()
856 ));
857
858 let event_2_v5 = stream_2.recv().await.unwrap();
860 assert!(matches!(
861 event_2_v5,
862 discv5::Event::NodeInserted { node_id, replaced } if node_id == node_1_enr.node_id() && replaced.is_none()
863 ));
864 }
865
866 #[test]
867 fn discovered_enr_disc_socket_missing() {
868 reth_tracing::init_test_tracing();
869
870 const REMOTE_RLPX_PORT: u16 = 30303;
872 let remote_socket = "104.28.44.25:9000".parse().unwrap();
873 let remote_key = CombinedKey::generate_secp256k1();
874 let remote_enr = Enr::builder().tcp4(REMOTE_RLPX_PORT).build(&remote_key).unwrap();
875
876 let discv5 = discv5_noop();
877
878 let filtered_peer = discv5.on_discovered_peer(&remote_enr, remote_socket);
880
881 assert_eq!(
882 NodeRecord {
883 address: remote_socket.ip(),
884 udp_port: remote_socket.port(),
885 tcp_port: REMOTE_RLPX_PORT,
886 id: enr_to_discv4_id(&remote_enr).unwrap(),
887 },
888 filtered_peer.unwrap().node_record
889 )
890 }
891
892 #[expect(unreachable_pub)]
895 #[expect(unused)]
896 mod sigp {
897 use alloy_primitives::U256;
898 use enr::{
899 k256::sha2::digest::generic_array::{typenum::U32, GenericArray},
900 NodeId,
901 };
902
903 #[derive(Clone, Debug)]
913 pub struct Key<T> {
914 preimage: T,
915 hash: GenericArray<u8, U32>,
916 }
917
918 impl<T> PartialEq for Key<T> {
919 fn eq(&self, other: &Self) -> bool {
920 self.hash == other.hash
921 }
922 }
923
924 impl<T> Eq for Key<T> {}
925
926 impl<TPeerId> AsRef<Self> for Key<TPeerId> {
927 fn as_ref(&self) -> &Self {
928 self
929 }
930 }
931
932 impl<T> Key<T> {
933 pub const fn new_raw(preimage: T, hash: GenericArray<u8, U32>) -> Self {
935 Self { preimage, hash }
936 }
937
938 pub const fn preimage(&self) -> &T {
940 &self.preimage
941 }
942
943 pub fn into_preimage(self) -> T {
945 self.preimage
946 }
947
948 pub fn distance<U>(&self, other: &Key<U>) -> Distance {
950 let a = U256::from_be_slice(self.hash.as_slice());
951 let b = U256::from_be_slice(other.hash.as_slice());
952 Distance(a ^ b)
953 }
954
955 pub fn log2_distance<U>(&self, other: &Key<U>) -> Option<u64> {
959 let xor_dist = self.distance(other);
960 let log_dist = (256 - xor_dist.0.leading_zeros() as u64);
961 (log_dist != 0).then_some(log_dist)
962 }
963 }
964
965 impl From<NodeId> for Key<NodeId> {
966 fn from(node_id: NodeId) -> Self {
967 Self { preimage: node_id, hash: *GenericArray::from_slice(&node_id.raw()) }
968 }
969 }
970
971 #[derive(Copy, Clone, PartialEq, Eq, Default, PartialOrd, Ord, Debug)]
973 pub struct Distance(pub(super) U256);
974 }
975
976 #[test]
977 fn select_lookup_target() {
978 for bucket_index in 0..=MAX_KBUCKET_INDEX {
979 let sk = CombinedKey::generate_secp256k1();
980 let local_node_id = discv5::enr::NodeId::from(sk.public());
981 let target = get_lookup_target(bucket_index, local_node_id);
982
983 let local_node_id = sigp::Key::from(local_node_id);
984 let target = sigp::Key::from(target);
985
986 assert_eq!(local_node_id.log2_distance(&target), Some(bucket_index as u64 + 1));
987 }
988 }
989
990 #[test]
991 fn build_enr_from_config() {
992 const TCP_PORT: u16 = 30303;
993 let fork_id = MAINNET.latest_fork_id();
994
995 let config = Config::builder((Ipv4Addr::UNSPECIFIED, TCP_PORT).into())
996 .fork(NetworkStackId::ETH, fork_id)
997 .build();
998
999 let sk = SecretKey::new(&mut thread_rng());
1000 let (enr, _, _, _) = build_local_enr(&sk, &config);
1001
1002 let decoded_fork_id = enr
1003 .get_decodable::<EnrForkIdEntry>(NetworkStackId::ETH)
1004 .unwrap()
1005 .map(Into::into)
1006 .unwrap();
1007
1008 assert_eq!(fork_id, decoded_fork_id);
1009 assert_eq!(TCP_PORT, enr.tcp4().unwrap()); }
1011
1012 #[test]
1013 fn get_fork_id_with_different_network_stack_ids() {
1014 let fork_id = MAINNET.latest_fork_id();
1015 let sk = SecretKey::new(&mut thread_rng());
1016
1017 let enr_with_opel = Enr::builder()
1019 .add_value_rlp(
1020 NetworkStackId::OPEL,
1021 alloy_rlp::encode(EnrForkIdEntry::from(fork_id)).into(),
1022 )
1023 .build(&sk)
1024 .unwrap();
1025
1026 let mut discv5 = discv5_noop();
1027 discv5.fork_key = Some(NetworkStackId::OPEL);
1028 assert_eq!(discv5.get_fork_id(&enr_with_opel).unwrap(), fork_id);
1029
1030 let enr_with_eth = Enr::builder()
1032 .add_value_rlp(
1033 NetworkStackId::ETH,
1034 alloy_rlp::encode(EnrForkIdEntry::from(fork_id)).into(),
1035 )
1036 .build(&sk)
1037 .unwrap();
1038
1039 discv5.fork_key = Some(NetworkStackId::OPEL);
1040 assert_eq!(discv5.get_fork_id(&enr_with_eth).unwrap(), fork_id);
1041
1042 let enr_without_network_stack_id = Enr::empty(&sk).unwrap();
1044 discv5.fork_key = Some(NetworkStackId::OPEL);
1045 assert!(matches!(
1046 discv5.get_fork_id(&enr_without_network_stack_id),
1047 Err(Error::ForkMissing(NetworkStackId::OPEL))
1048 ));
1049
1050 let discv5 = discv5_noop();
1052 assert!(matches!(
1053 discv5.get_fork_id(&enr_without_network_stack_id),
1054 Err(Error::NetworkStackIdNotConfigured)
1055 ));
1056 }
1057}