reth_discv5/
lib.rs

1//! Wrapper around [`discv5::Discv5`].
2
3#![doc(
4    html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
5    html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
6    issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/"
7)]
8#![cfg_attr(not(test), warn(unused_crate_dependencies))]
9#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
10
11use std::{
12    collections::HashSet,
13    fmt,
14    net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
15    sync::Arc,
16    time::Duration,
17};
18
19use ::enr::Enr;
20use alloy_primitives::bytes::Bytes;
21use discv5::ListenConfig;
22use enr::{discv4_id_to_discv5_id, EnrCombinedKeyWrapper};
23use futures::future::join_all;
24use itertools::Itertools;
25use rand::{Rng, RngCore};
26use reth_ethereum_forks::{EnrForkIdEntry, ForkId};
27use reth_network_peers::{NodeRecord, PeerId};
28use secp256k1::SecretKey;
29use tokio::{sync::mpsc, task};
30use tracing::{debug, error, trace};
31
32pub mod config;
33pub mod enr;
34pub mod error;
35pub mod filter;
36pub mod metrics;
37pub mod network_stack_id;
38
39pub use discv5::{self, IpMode};
40
41pub use config::{
42    BootNode, Config, ConfigBuilder, DEFAULT_COUNT_BOOTSTRAP_LOOKUPS, DEFAULT_DISCOVERY_V5_ADDR,
43    DEFAULT_DISCOVERY_V5_ADDR_IPV6, DEFAULT_DISCOVERY_V5_LISTEN_CONFIG, DEFAULT_DISCOVERY_V5_PORT,
44    DEFAULT_SECONDS_BOOTSTRAP_LOOKUP_INTERVAL, DEFAULT_SECONDS_LOOKUP_INTERVAL,
45};
46pub use enr::enr_to_discv4_id;
47pub use error::Error;
48pub use filter::{FilterOutcome, MustNotIncludeKeys};
49pub use network_stack_id::NetworkStackId;
50
51use metrics::{DiscoveredPeersMetrics, Discv5Metrics};
52
53/// Max kbucket index is 255.
54///
55/// This is the max log2distance for 32 byte [`NodeId`](discv5::enr::NodeId) - 1. See <https://github.com/sigp/discv5/blob/e9e0d4f93ec35591832a9a8d937b4161127da87b/src/kbucket.rs#L586-L587>.
56pub const MAX_KBUCKET_INDEX: usize = 255;
57
58/// Default lowest kbucket index to attempt filling, in periodic look up query to populate kbuckets.
59///
60/// The peer at the 0th kbucket index is at log2distance 1 from the local node ID. See <https://github.com/sigp/discv5/blob/e9e0d4f93ec35591832a9a8d937b4161127da87b/src/kbucket.rs#L586-L587>.
61///
62/// Default is 0th index.
63pub const DEFAULT_MIN_TARGET_KBUCKET_INDEX: usize = 0;
64
65/// Transparent wrapper around [`discv5::Discv5`].
66#[derive(Clone)]
67pub struct Discv5 {
68    /// sigp/discv5 node.
69    discv5: Arc<discv5::Discv5>,
70    /// [`IpMode`] of the `RLPx` network.
71    rlpx_ip_mode: IpMode,
72    /// Key used in kv-pair to ID chain, e.g. 'opstack' or 'eth'.
73    fork_key: Option<&'static [u8]>,
74    /// Filter applied to a discovered peers before passing it up to app.
75    discovered_peer_filter: MustNotIncludeKeys,
76    /// Metrics for underlying [`discv5::Discv5`] node and filtered discovered peers.
77    metrics: Discv5Metrics,
78}
79
80impl Discv5 {
81    ////////////////////////////////////////////////////////////////////////////////////////////////
82    // Minimal interface with `reth_network::discovery`
83    ////////////////////////////////////////////////////////////////////////////////////////////////
84
85    /// Adds the node to the table, if it is not already present.
86    #[expect(clippy::result_large_err)]
87    pub fn add_node(&self, node_record: Enr<SecretKey>) -> Result<(), Error> {
88        let EnrCombinedKeyWrapper(enr) = node_record.into();
89        self.discv5.add_enr(enr).map_err(Error::AddNodeFailed)
90    }
91
92    /// Sets the pair in the EIP-868 [`Enr`] of the node.
93    ///
94    /// If the key already exists, this will update it.
95    ///
96    /// CAUTION: The value **must** be rlp encoded
97    pub fn set_eip868_in_local_enr(&self, key: Vec<u8>, rlp: Bytes) {
98        let Ok(key_str) = std::str::from_utf8(&key) else {
99            error!(target: "net::discv5",
100                err="key not utf-8",
101                "failed to update local enr"
102            );
103            return
104        };
105        if let Err(err) = self.discv5.enr_insert(key_str, &rlp) {
106            error!(target: "net::discv5",
107                %err,
108                "failed to update local enr"
109            );
110        }
111    }
112
113    /// Sets the pair in the EIP-868 [`Enr`] of the node.
114    ///
115    /// If the key already exists, this will update it.
116    pub fn encode_and_set_eip868_in_local_enr(
117        &self,
118        key: Vec<u8>,
119        value: impl alloy_rlp::Encodable,
120    ) {
121        let mut buf = Vec::new();
122        value.encode(&mut buf);
123        self.set_eip868_in_local_enr(key, buf.into())
124    }
125
126    /// Adds the peer and id to the ban list.
127    ///
128    /// This will prevent any future inclusion in the table
129    pub fn ban(&self, peer_id: PeerId, ip: IpAddr) {
130        match discv4_id_to_discv5_id(peer_id) {
131            Ok(node_id) => {
132                self.discv5.ban_node(&node_id, None);
133                self.ban_ip(ip);
134            }
135            Err(err) => error!(target: "net::discv5",
136                %err,
137                "failed to ban peer"
138            ),
139        }
140    }
141
142    /// Adds the ip to the ban list.
143    ///
144    /// This will prevent any future inclusion in the table
145    pub fn ban_ip(&self, ip: IpAddr) {
146        self.discv5.ban_ip(ip, None);
147    }
148
149    /// Returns the [`NodeRecord`] of the local node.
150    ///
151    /// This includes the currently tracked external IP address of the node.
152    ///
153    /// Returns `None` if the local ENR does not contain the required fields.
154    pub fn node_record(&self) -> Option<NodeRecord> {
155        let enr: Enr<_> = EnrCombinedKeyWrapper(self.discv5.local_enr()).into();
156        enr.try_into().ok()
157    }
158
159    /// Spawns [`discv5::Discv5`]. Returns [`discv5::Discv5`] handle in reth compatible wrapper type
160    /// [`Discv5`], a receiver of [`discv5::Event`]s from the underlying node, and the local
161    /// [`Enr`](discv5::Enr) converted into the reth compatible [`NodeRecord`] type.
162    pub async fn start(
163        sk: &SecretKey,
164        discv5_config: Config,
165    ) -> Result<(Self, mpsc::Receiver<discv5::Event>, NodeRecord), Error> {
166        //
167        // 1. make local enr from listen config
168        //
169        let (enr, bc_enr, fork_key, rlpx_ip_mode) = build_local_enr(sk, &discv5_config);
170
171        trace!(target: "net::discv5",
172            ?enr,
173            "local ENR"
174        );
175
176        //
177        // 2. start discv5
178        //
179        let Config {
180            discv5_config,
181            bootstrap_nodes,
182            lookup_interval,
183            bootstrap_lookup_interval,
184            bootstrap_lookup_countdown,
185            discovered_peer_filter,
186            ..
187        } = discv5_config;
188
189        let EnrCombinedKeyWrapper(enr) = enr.into();
190        let sk = discv5::enr::CombinedKey::secp256k1_from_bytes(&mut sk.secret_bytes()).unwrap();
191        let mut discv5 = match discv5::Discv5::new(enr, sk, discv5_config) {
192            Ok(discv5) => discv5,
193            Err(err) => return Err(Error::InitFailure(err)),
194        };
195        discv5.start().await.map_err(Error::Discv5Error)?;
196
197        // start discv5 updates stream
198        let discv5_updates = discv5.event_stream().await.map_err(Error::Discv5Error)?;
199
200        let discv5 = Arc::new(discv5);
201
202        //
203        // 3. add boot nodes
204        //
205        bootstrap(bootstrap_nodes, &discv5).await?;
206
207        let metrics = Discv5Metrics::default();
208
209        //
210        // 4. start bg kbuckets maintenance
211        //
212        spawn_populate_kbuckets_bg(
213            lookup_interval,
214            bootstrap_lookup_interval,
215            bootstrap_lookup_countdown,
216            metrics.clone(),
217            discv5.clone(),
218        );
219
220        Ok((
221            Self { discv5, rlpx_ip_mode, fork_key, discovered_peer_filter, metrics },
222            discv5_updates,
223            bc_enr,
224        ))
225    }
226
227    /// Process an event from the underlying [`discv5::Discv5`] node.
228    pub fn on_discv5_update(&self, update: discv5::Event) -> Option<DiscoveredPeer> {
229        #[expect(clippy::match_same_arms)]
230        match update {
231            discv5::Event::SocketUpdated(_) | discv5::Event::TalkRequest(_) |
232            // `Discovered` not unique discovered peers
233            discv5::Event::Discovered(_) => None,
234            discv5::Event::NodeInserted { replaced: _, .. } => {
235
236                // node has been inserted into kbuckets
237
238                // `replaced` partly covers `reth_discv4::DiscoveryUpdate::Removed(_)`
239
240                self.metrics.discovered_peers.increment_kbucket_insertions(1);
241
242                None
243            }
244            discv5::Event::SessionEstablished(enr, remote_socket) => {
245                // this branch is semantically similar to branches of
246                // `reth_discv4::DiscoveryUpdate`: `DiscoveryUpdate::Added(_)` and
247                // `DiscoveryUpdate::DiscoveredAtCapacity(_)
248
249                // peer has been discovered as part of query, or, by incoming session (peer has
250                // discovered us)
251
252                self.metrics.discovered_peers.increment_established_sessions_raw(1);
253
254                self.on_discovered_peer(&enr, remote_socket)
255            }
256            discv5::Event::UnverifiableEnr {
257                enr,
258                socket,
259                node_id: _,
260            } => {
261                // this branch is semantically similar to branches of
262                // `reth_discv4::DiscoveryUpdate`: `DiscoveryUpdate::Added(_)` and
263                // `DiscoveryUpdate::DiscoveredAtCapacity(_)
264
265                // peer has been discovered as part of query, or, by an outgoing session (but peer
266                // is behind NAT and responds from a different socket)
267
268                // NOTE: `discv5::Discv5` won't initiate a session with any peer with an
269                // unverifiable node record, for example one that advertises a reserved LAN IP
270                // address on a WAN network. This is in order to prevent DoS attacks, where some
271                // malicious peers may advertise a victim's socket. We will still try and connect
272                // to them over RLPx, to be compatible with EL discv5 implementations that don't
273                // enforce this security measure.
274
275                trace!(target: "net::discv5",
276                    ?enr,
277                    %socket,
278                    "discovered unverifiable enr, source socket doesn't match socket advertised in ENR"
279                );
280
281                self.metrics.discovered_peers.increment_unverifiable_enrs_raw_total(1);
282
283                self.on_discovered_peer(&enr, socket)
284            }
285            _ => None
286        }
287    }
288
289    /// Processes a discovered peer. Returns `true` if peer is added to
290    pub fn on_discovered_peer(
291        &self,
292        enr: &discv5::Enr,
293        socket: SocketAddr,
294    ) -> Option<DiscoveredPeer> {
295        self.metrics.discovered_peers_advertised_networks.increment_once_by_network_type(enr);
296
297        let node_record = match self.try_into_reachable(enr, socket) {
298            Ok(enr_bc) => enr_bc,
299            Err(err) => {
300                trace!(target: "net::discv5",
301                    %err,
302                    ?enr,
303                    "discovered peer is unreachable"
304                );
305
306                self.metrics.discovered_peers.increment_established_sessions_unreachable_enr(1);
307
308                return None
309            }
310        };
311        if let FilterOutcome::Ignore { reason } = self.filter_discovered_peer(enr) {
312            trace!(target: "net::discv5",
313                ?enr,
314                reason,
315                "filtered out discovered peer"
316            );
317
318            self.metrics.discovered_peers.increment_established_sessions_filtered(1);
319
320            return None
321        }
322
323        // todo: extend for all network stacks in reth-network rlpx logic
324        let fork_id = (self.fork_key == Some(NetworkStackId::ETH))
325            .then(|| self.get_fork_id(enr).ok())
326            .flatten();
327
328        trace!(target: "net::discv5",
329            ?fork_id,
330            ?enr,
331            "discovered peer"
332        );
333
334        Some(DiscoveredPeer { node_record, fork_id })
335    }
336
337    /// Tries to recover an unreachable [`Enr`](discv5::Enr) received via
338    /// [`discv5::Event::UnverifiableEnr`], into a [`NodeRecord`] usable by `RLPx`.
339    ///
340    /// NOTE: Fallback solution to be compatible with Geth which includes peers into the discv5
341    /// WAN topology which, for example, advertise in their ENR that localhost is their UDP IP
342    /// address. These peers are only discovered if they initiate a connection attempt, and we by
343    /// such means learn their reachable IP address. If we receive their ENR from any other peer
344    /// as part of a lookup query, we won't find a reachable IP address on which to dial them by
345    /// reading their ENR.
346    pub fn try_into_reachable(
347        &self,
348        enr: &discv5::Enr,
349        socket: SocketAddr,
350    ) -> Result<NodeRecord, Error> {
351        // ignore UDP socket advertised in ENR, use sender socket instead
352        let address = socket.ip();
353        let udp_port = socket.port();
354
355        let id = enr_to_discv4_id(enr).ok_or(Error::IncompatibleKeyType)?;
356
357        let tcp_port = (match self.rlpx_ip_mode {
358            IpMode::Ip4 => enr.tcp4(),
359            IpMode::Ip6 => enr.tcp6(),
360            IpMode::DualStack => unimplemented!("dual-stack support not implemented for rlpx"),
361        })
362        .unwrap_or(
363            // tcp socket is missing from ENR, or is wrong IP version.
364            //
365            // by default geth runs discv5 and discv4 behind the same udp port (the discv4 default
366            // port 30303), so rlpx has a chance of successfully dialing the peer on its discv5
367            // udp port if its running geth's p2p code.
368            udp_port,
369        );
370
371        Ok(NodeRecord { address, tcp_port, udp_port, id })
372    }
373
374    /// Applies filtering rules on an ENR. Returns [`Ok`](FilterOutcome::Ok) if peer should be
375    /// passed up to app, and [`Ignore`](FilterOutcome::Ignore) if peer should instead be dropped.
376    pub fn filter_discovered_peer(&self, enr: &discv5::Enr) -> FilterOutcome {
377        self.discovered_peer_filter.filter(enr)
378    }
379
380    /// Returns the [`ForkId`] of the given [`Enr`](discv5::Enr) w.r.t. the local node's network
381    /// stack, if field is set.
382    #[expect(clippy::result_large_err)]
383    pub fn get_fork_id<K: discv5::enr::EnrKey>(
384        &self,
385        enr: &discv5::enr::Enr<K>,
386    ) -> Result<ForkId, Error> {
387        let Some(key) = self.fork_key else { return Err(Error::NetworkStackIdNotConfigured) };
388        let fork_id = enr
389            .get_decodable::<EnrForkIdEntry>(key)
390            .ok_or(Error::ForkMissing(key))?
391            .map(Into::into)?;
392
393        Ok(fork_id)
394    }
395
396    ////////////////////////////////////////////////////////////////////////////////////////////////
397    // Interface with sigp/discv5
398    ////////////////////////////////////////////////////////////////////////////////////////////////
399
400    /// Exposes API of [`discv5::Discv5`].
401    pub fn with_discv5<F, R>(&self, f: F) -> R
402    where
403        F: FnOnce(&discv5::Discv5) -> R,
404    {
405        f(&self.discv5)
406    }
407
408    ////////////////////////////////////////////////////////////////////////////////////////////////
409    // Complementary
410    ////////////////////////////////////////////////////////////////////////////////////////////////
411
412    /// Returns the `RLPx` [`IpMode`] of the local node.
413    pub const fn ip_mode(&self) -> IpMode {
414        self.rlpx_ip_mode
415    }
416
417    /// Returns the key to use to identify the [`ForkId`] kv-pair on the [`Enr`](discv5::Enr).
418    pub const fn fork_key(&self) -> Option<&[u8]> {
419        self.fork_key
420    }
421}
422
423impl fmt::Debug for Discv5 {
424    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
425        "{ .. }".fmt(f)
426    }
427}
428
429/// Result of successfully processing a peer discovered by [`discv5::Discv5`].
430#[derive(Debug)]
431pub struct DiscoveredPeer {
432    /// A discovery v4 backwards compatible ENR.
433    pub node_record: NodeRecord,
434    /// [`ForkId`] extracted from ENR w.r.t. configured
435    pub fork_id: Option<ForkId>,
436}
437
438/// Builds the local ENR with the supplied key.
439pub fn build_local_enr(
440    sk: &SecretKey,
441    config: &Config,
442) -> (Enr<SecretKey>, NodeRecord, Option<&'static [u8]>, IpMode) {
443    let mut builder = discv5::enr::Enr::builder();
444
445    let Config { discv5_config, fork, tcp_socket, other_enr_kv_pairs, .. } = config;
446
447    let socket = match discv5_config.listen_config {
448        ListenConfig::Ipv4 { ip, port } => {
449            if ip != Ipv4Addr::UNSPECIFIED {
450                builder.ip4(ip);
451            }
452            builder.udp4(port);
453            builder.tcp4(tcp_socket.port());
454
455            (ip, port).into()
456        }
457        ListenConfig::Ipv6 { ip, port } => {
458            if ip != Ipv6Addr::UNSPECIFIED {
459                builder.ip6(ip);
460            }
461            builder.udp6(port);
462            builder.tcp6(tcp_socket.port());
463
464            (ip, port).into()
465        }
466        ListenConfig::DualStack { ipv4, ipv4_port, ipv6, ipv6_port } => {
467            if ipv4 != Ipv4Addr::UNSPECIFIED {
468                builder.ip4(ipv4);
469            }
470            builder.udp4(ipv4_port);
471            builder.tcp4(tcp_socket.port());
472
473            if ipv6 != Ipv6Addr::UNSPECIFIED {
474                builder.ip6(ipv6);
475            }
476            builder.udp6(ipv6_port);
477
478            (ipv6, ipv6_port).into()
479        }
480    };
481
482    let rlpx_ip_mode = if tcp_socket.is_ipv4() { IpMode::Ip4 } else { IpMode::Ip6 };
483
484    // identifies which network node is on
485    let network_stack_id = fork.as_ref().map(|(network_stack_id, fork_value)| {
486        builder.add_value_rlp(network_stack_id, alloy_rlp::encode(fork_value).into());
487        *network_stack_id
488    });
489
490    // add other data
491    for (key, value) in other_enr_kv_pairs {
492        builder.add_value_rlp(key, value.clone().into());
493    }
494
495    // enr v4 not to get confused with discv4, independent versioning enr and
496    // discovery
497    let enr = builder.build(sk).expect("should build enr v4");
498
499    // backwards compatible enr
500    let bc_enr = NodeRecord::from_secret_key(socket, sk);
501
502    (enr, bc_enr, network_stack_id, rlpx_ip_mode)
503}
504
505/// Bootstraps underlying [`discv5::Discv5`] node with configured peers.
506pub async fn bootstrap(
507    bootstrap_nodes: HashSet<BootNode>,
508    discv5: &Arc<discv5::Discv5>,
509) -> Result<(), Error> {
510    trace!(target: "net::discv5",
511        ?bootstrap_nodes,
512        "adding bootstrap nodes .."
513    );
514
515    let mut enr_requests = vec![];
516    for node in bootstrap_nodes {
517        match node {
518            BootNode::Enr(node) => {
519                if let Err(err) = discv5.add_enr(node) {
520                    return Err(Error::AddNodeFailed(err))
521                }
522            }
523            BootNode::Enode(enode) => {
524                let discv5 = discv5.clone();
525                enr_requests.push(async move {
526                    if let Err(err) = discv5.request_enr(enode.to_string()).await {
527                        debug!(target: "net::discv5",
528                            ?enode,
529                            %err,
530                            "failed adding boot node"
531                        );
532                    }
533                })
534            }
535        }
536    }
537
538    // If a session is established, the ENR is added straight away to discv5 kbuckets
539    Ok(_ = join_all(enr_requests).await)
540}
541
542/// Backgrounds regular look up queries, in order to keep kbuckets populated.
543pub fn spawn_populate_kbuckets_bg(
544    lookup_interval: u64,
545    bootstrap_lookup_interval: u64,
546    bootstrap_lookup_countdown: u64,
547    metrics: Discv5Metrics,
548    discv5: Arc<discv5::Discv5>,
549) {
550    task::spawn({
551        let local_node_id = discv5.local_enr().node_id();
552        let lookup_interval = Duration::from_secs(lookup_interval);
553        let metrics = metrics.discovered_peers;
554        let mut kbucket_index = MAX_KBUCKET_INDEX;
555        let pulse_lookup_interval = Duration::from_secs(bootstrap_lookup_interval);
556        // todo: graceful shutdown
557
558        async move {
559            // make many fast lookup queries at bootstrap, trying to fill kbuckets at furthest
560            // log2distance from local node
561            for i in (0..bootstrap_lookup_countdown).rev() {
562                let target = discv5::enr::NodeId::random();
563
564                trace!(target: "net::discv5",
565                    %target,
566                    bootstrap_boost_runs_countdown=i,
567                    lookup_interval=format!("{:#?}", pulse_lookup_interval),
568                    "starting bootstrap boost lookup query"
569                );
570
571                lookup(target, &discv5, &metrics).await;
572
573                tokio::time::sleep(pulse_lookup_interval).await;
574            }
575
576            // initiate regular lookups to populate kbuckets
577            loop {
578                // make sure node is connected to each subtree in the network by target
579                // selection (ref kademlia)
580                let target = get_lookup_target(kbucket_index, local_node_id);
581
582                trace!(target: "net::discv5",
583                    %target,
584                    lookup_interval=format!("{:#?}", lookup_interval),
585                    "starting periodic lookup query"
586                );
587
588                lookup(target, &discv5, &metrics).await;
589
590                if kbucket_index > DEFAULT_MIN_TARGET_KBUCKET_INDEX {
591                    // try to populate bucket one step closer
592                    kbucket_index -= 1
593                } else {
594                    // start over with bucket furthest away
595                    kbucket_index = MAX_KBUCKET_INDEX
596                }
597
598                tokio::time::sleep(lookup_interval).await;
599            }
600        }
601    });
602}
603
604/// Gets the next lookup target, based on which bucket is currently being targeted.
605pub fn get_lookup_target(
606    kbucket_index: usize,
607    local_node_id: discv5::enr::NodeId,
608) -> discv5::enr::NodeId {
609    // init target
610    let mut target = local_node_id.raw();
611
612    // make sure target has a 'log2distance'-long suffix that differs from local node id
613    let bit_offset = MAX_KBUCKET_INDEX.saturating_sub(kbucket_index);
614    let (byte, bit) = (bit_offset / 8, bit_offset % 8);
615    // Flip the target bit.
616    target[byte] ^= 1 << (7 - bit);
617
618    // Randomize the bits after the target.
619    let mut rng = rand::rng();
620    // Randomize remaining bits in the byte we modified.
621    if bit < 7 {
622        // Compute the mask of the bits that need to be randomized.
623        let bits_to_randomize = 0xff >> (bit + 1);
624        // Clear.
625        target[byte] &= !bits_to_randomize;
626        // Randomize.
627        target[byte] |= rng.random::<u8>() & bits_to_randomize;
628    }
629    // Randomize remaining bytes.
630    rng.fill_bytes(&mut target[byte + 1..]);
631
632    target.into()
633}
634
635/// Runs a [`discv5::Discv5`] lookup query.
636pub async fn lookup(
637    target: discv5::enr::NodeId,
638    discv5: &discv5::Discv5,
639    metrics: &DiscoveredPeersMetrics,
640) {
641    metrics.set_total_sessions(discv5.metrics().active_sessions);
642    metrics.set_total_kbucket_peers(
643        discv5.with_kbuckets(|kbuckets| kbuckets.read().iter_ref().count()),
644    );
645
646    match discv5.find_node(target).await {
647        Err(err) => trace!(target: "net::discv5",
648            %err,
649            "lookup query failed"
650        ),
651        Ok(peers) => trace!(target: "net::discv5",
652            target=format!("{:#?}", target),
653            peers_count=peers.len(),
654            peers=format!("[{:#}]", peers.iter()
655                .map(|enr| enr.node_id()
656            ).format(", ")),
657            "peers returned by lookup query"
658        ),
659    }
660
661    // `Discv5::connected_peers` can be subset of sessions, not all peers make it
662    // into kbuckets, e.g. incoming sessions from peers with
663    // unreachable enrs
664    debug!(target: "net::discv5",
665        connected_peers=discv5.connected_peers(),
666        "connected peers in routing table"
667    );
668}
669
670#[cfg(test)]
671mod test {
672    use super::*;
673    use ::enr::{CombinedKey, EnrKey};
674    use rand_08::thread_rng;
675    use reth_chainspec::MAINNET;
676    use tracing::trace;
677
678    fn discv5_noop() -> Discv5 {
679        let sk = CombinedKey::generate_secp256k1();
680        Discv5 {
681            discv5: Arc::new(
682                discv5::Discv5::new(
683                    Enr::empty(&sk).unwrap(),
684                    sk,
685                    discv5::ConfigBuilder::new(DEFAULT_DISCOVERY_V5_LISTEN_CONFIG).build(),
686                )
687                .unwrap(),
688            ),
689            rlpx_ip_mode: IpMode::Ip4,
690            fork_key: None,
691            discovered_peer_filter: MustNotIncludeKeys::default(),
692            metrics: Discv5Metrics::default(),
693        }
694    }
695
696    async fn start_discovery_node(
697        udp_port_discv5: u16,
698    ) -> (Discv5, mpsc::Receiver<discv5::Event>, NodeRecord) {
699        let secret_key = SecretKey::new(&mut thread_rng());
700
701        let discv5_addr: SocketAddr = format!("127.0.0.1:{udp_port_discv5}").parse().unwrap();
702        let rlpx_addr: SocketAddr = "127.0.0.1:30303".parse().unwrap();
703
704        let discv5_listen_config = ListenConfig::from(discv5_addr);
705        let discv5_config = Config::builder(rlpx_addr)
706            .discv5_config(discv5::ConfigBuilder::new(discv5_listen_config).build())
707            .build();
708
709        Discv5::start(&secret_key, discv5_config).await.expect("should build discv5")
710    }
711
712    #[tokio::test(flavor = "multi_thread")]
713    async fn discv5() {
714        reth_tracing::init_test_tracing();
715
716        // rig test
717
718        // rig node_1
719        let (node_1, mut stream_1, _) = start_discovery_node(30344).await;
720        let node_1_enr = node_1.with_discv5(|discv5| discv5.local_enr());
721
722        // rig node_2
723        let (node_2, mut stream_2, _) = start_discovery_node(30355).await;
724        let node_2_enr = node_2.with_discv5(|discv5| discv5.local_enr());
725
726        trace!(target: "net::discv5::test",
727            node_1_node_id=format!("{:#}", node_1_enr.node_id()),
728            node_2_node_id=format!("{:#}", node_2_enr.node_id()),
729            "started nodes"
730        );
731
732        // test
733
734        // add node_2 to discovery handle of node_1 (should add node to discv5 kbuckets)
735        let node_2_enr_reth_compatible_ty: Enr<SecretKey> =
736            EnrCombinedKeyWrapper(node_2_enr.clone()).into();
737        node_1.add_node(node_2_enr_reth_compatible_ty).unwrap();
738
739        // verify node_2 is in KBuckets of node_1:discv5
740        assert!(
741            node_1.with_discv5(|discv5| discv5.table_entries_id().contains(&node_2_enr.node_id()))
742        );
743
744        // manually trigger connection from node_1 to node_2
745        node_1.with_discv5(|discv5| discv5.send_ping(node_2_enr.clone())).await.unwrap();
746
747        // verify node_1:discv5 is connected to node_2:discv5 and vv
748        let event_1_v5 = stream_1.recv().await.unwrap();
749
750        assert!(matches!(
751            event_1_v5,
752            discv5::Event::SessionEstablished(node, socket) if node == node_2_enr && socket == node_2_enr.udp4_socket().unwrap().into()
753        ));
754
755        // verify node_1 is in KBuckets of node_2:discv5
756        let event_2_v5 = stream_2.recv().await.unwrap();
757        assert!(matches!(
758            event_2_v5,
759            discv5::Event::NodeInserted { node_id, replaced } if node_id == node_1_enr.node_id() && replaced.is_none()
760        ));
761    }
762
763    #[test]
764    fn discovered_enr_disc_socket_missing() {
765        reth_tracing::init_test_tracing();
766
767        // rig test
768        const REMOTE_RLPX_PORT: u16 = 30303;
769        let remote_socket = "104.28.44.25:9000".parse().unwrap();
770        let remote_key = CombinedKey::generate_secp256k1();
771        let remote_enr = Enr::builder().tcp4(REMOTE_RLPX_PORT).build(&remote_key).unwrap();
772
773        let discv5 = discv5_noop();
774
775        // test
776        let filtered_peer = discv5.on_discovered_peer(&remote_enr, remote_socket);
777
778        assert_eq!(
779            NodeRecord {
780                address: remote_socket.ip(),
781                udp_port: remote_socket.port(),
782                tcp_port: REMOTE_RLPX_PORT,
783                id: enr_to_discv4_id(&remote_enr).unwrap(),
784            },
785            filtered_peer.unwrap().node_record
786        )
787    }
788
789    // Copied from sigp/discv5 with slight modification (U256 type)
790    // <https://github.com/sigp/discv5/blob/master/src/kbucket/key.rs#L89-L101>
791    #[expect(unreachable_pub)]
792    #[expect(unused)]
793    mod sigp {
794        use alloy_primitives::U256;
795        use enr::{
796            k256::sha2::digest::generic_array::{typenum::U32, GenericArray},
797            NodeId,
798        };
799
800        /// A `Key` is a cryptographic hash, identifying both the nodes participating in
801        /// the Kademlia DHT, as well as records stored in the DHT.
802        ///
803        /// The set of all `Key`s defines the Kademlia keyspace.
804        ///
805        /// `Key`s have an XOR metric as defined in the Kademlia paper, i.e. the bitwise XOR of
806        /// the hash digests, interpreted as an integer. See [`Key::distance`].
807        ///
808        /// A `Key` preserves the preimage of type `T` of the hash function. See [`Key::preimage`].
809        #[derive(Clone, Debug)]
810        pub struct Key<T> {
811            preimage: T,
812            hash: GenericArray<u8, U32>,
813        }
814
815        impl<T> PartialEq for Key<T> {
816            fn eq(&self, other: &Self) -> bool {
817                self.hash == other.hash
818            }
819        }
820
821        impl<T> Eq for Key<T> {}
822
823        impl<TPeerId> AsRef<Self> for Key<TPeerId> {
824            fn as_ref(&self) -> &Self {
825                self
826            }
827        }
828
829        impl<T> Key<T> {
830            /// Construct a new `Key` by providing the raw 32 byte hash.
831            pub const fn new_raw(preimage: T, hash: GenericArray<u8, U32>) -> Self {
832                Self { preimage, hash }
833            }
834
835            /// Borrows the preimage of the key.
836            pub const fn preimage(&self) -> &T {
837                &self.preimage
838            }
839
840            /// Converts the key into its preimage.
841            pub fn into_preimage(self) -> T {
842                self.preimage
843            }
844
845            /// Computes the distance of the keys according to the XOR metric.
846            pub fn distance<U>(&self, other: &Key<U>) -> Distance {
847                let a = U256::from_be_slice(self.hash.as_slice());
848                let b = U256::from_be_slice(other.hash.as_slice());
849                Distance(a ^ b)
850            }
851
852            // Used in the FINDNODE query outside of the k-bucket implementation.
853            /// Computes the integer log-2 distance between two keys, assuming a 256-bit
854            /// key. The output returns None if the key's are identical. The range is 1-256.
855            pub fn log2_distance<U>(&self, other: &Key<U>) -> Option<u64> {
856                let xor_dist = self.distance(other);
857                let log_dist = (256 - xor_dist.0.leading_zeros() as u64);
858                (log_dist != 0).then_some(log_dist)
859            }
860        }
861
862        impl From<NodeId> for Key<NodeId> {
863            fn from(node_id: NodeId) -> Self {
864                Self { preimage: node_id, hash: *GenericArray::from_slice(&node_id.raw()) }
865            }
866        }
867
868        /// A distance between two `Key`s.
869        #[derive(Copy, Clone, PartialEq, Eq, Default, PartialOrd, Ord, Debug)]
870        pub struct Distance(pub(super) U256);
871    }
872
873    #[test]
874    fn select_lookup_target() {
875        for bucket_index in 0..=MAX_KBUCKET_INDEX {
876            let sk = CombinedKey::generate_secp256k1();
877            let local_node_id = discv5::enr::NodeId::from(sk.public());
878            let target = get_lookup_target(bucket_index, local_node_id);
879
880            let local_node_id = sigp::Key::from(local_node_id);
881            let target = sigp::Key::from(target);
882
883            assert_eq!(local_node_id.log2_distance(&target), Some(bucket_index as u64 + 1));
884        }
885    }
886
887    #[test]
888    fn build_enr_from_config() {
889        const TCP_PORT: u16 = 30303;
890        let fork_id = MAINNET.latest_fork_id();
891
892        let config = Config::builder((Ipv4Addr::UNSPECIFIED, TCP_PORT).into())
893            .fork(NetworkStackId::ETH, fork_id)
894            .build();
895
896        let sk = SecretKey::new(&mut thread_rng());
897        let (enr, _, _, _) = build_local_enr(&sk, &config);
898
899        let decoded_fork_id = enr
900            .get_decodable::<EnrForkIdEntry>(NetworkStackId::ETH)
901            .unwrap()
902            .map(Into::into)
903            .unwrap();
904
905        assert_eq!(fork_id, decoded_fork_id);
906        assert_eq!(TCP_PORT, enr.tcp4().unwrap()); // listen config is defaulting to ip mode ipv4
907    }
908}