reth_discv5/
lib.rs

1//! Wrapper around [`discv5::Discv5`].
2
3#![doc(
4    html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
5    html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
6    issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/"
7)]
8#![cfg_attr(not(test), warn(unused_crate_dependencies))]
9#![cfg_attr(docsrs, feature(doc_cfg))]
10
11use std::{
12    collections::HashSet,
13    fmt,
14    net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
15    sync::Arc,
16    time::Duration,
17};
18
19use ::enr::Enr;
20use alloy_primitives::bytes::Bytes;
21use discv5::ListenConfig;
22use enr::{discv4_id_to_discv5_id, EnrCombinedKeyWrapper};
23use futures::future::join_all;
24use itertools::Itertools;
25use rand::{Rng, RngCore};
26use reth_ethereum_forks::{EnrForkIdEntry, ForkId};
27use reth_network_peers::{NodeRecord, PeerId};
28use secp256k1::SecretKey;
29use tokio::{sync::mpsc, task};
30use tracing::{debug, error, trace};
31
32pub mod config;
33pub mod enr;
34pub mod error;
35pub mod filter;
36pub mod metrics;
37pub mod network_stack_id;
38
39pub use discv5::{self, IpMode};
40
41pub use config::{
42    BootNode, Config, ConfigBuilder, DEFAULT_COUNT_BOOTSTRAP_LOOKUPS, DEFAULT_DISCOVERY_V5_ADDR,
43    DEFAULT_DISCOVERY_V5_ADDR_IPV6, DEFAULT_DISCOVERY_V5_LISTEN_CONFIG, DEFAULT_DISCOVERY_V5_PORT,
44    DEFAULT_SECONDS_BOOTSTRAP_LOOKUP_INTERVAL, DEFAULT_SECONDS_LOOKUP_INTERVAL,
45};
46pub use enr::enr_to_discv4_id;
47pub use error::Error;
48pub use filter::{FilterOutcome, MustNotIncludeKeys};
49pub use network_stack_id::NetworkStackId;
50
51use metrics::{DiscoveredPeersMetrics, Discv5Metrics};
52
/// Max kbucket index is 255.
///
/// This is the max log2distance for 32 byte [`NodeId`](discv5::enr::NodeId) - 1. See <https://github.com/sigp/discv5/blob/e9e0d4f93ec35591832a9a8d937b4161127da87b/src/kbucket.rs#L586-L587>.
///
/// Periodic lookups start at this index (the bucket furthest from the local node) and wrap back
/// to it after reaching [`DEFAULT_MIN_TARGET_KBUCKET_INDEX`].
pub const MAX_KBUCKET_INDEX: usize = 255;
57
/// Default lowest kbucket index to attempt filling in the periodic lookup queries that populate
/// kbuckets.
///
/// The peer at the 0th kbucket index is at log2distance 1 from the local node ID. See <https://github.com/sigp/discv5/blob/e9e0d4f93ec35591832a9a8d937b4161127da87b/src/kbucket.rs#L586-L587>.
///
/// Default is 0th index.
pub const DEFAULT_MIN_TARGET_KBUCKET_INDEX: usize = 0;
64
/// Transparent wrapper around [`discv5::Discv5`].
///
/// Cloning is cheap with respect to the underlying node: the node itself is shared behind an
/// [`Arc`].
#[derive(Clone)]
pub struct Discv5 {
    /// sigp/discv5 node.
    discv5: Arc<discv5::Discv5>,
    /// [`IpMode`] of the `RLPx` network.
    rlpx_ip_mode: IpMode,
    /// Key used in kv-pair to ID chain, e.g. 'opstack' or 'eth'.
    fork_key: Option<&'static [u8]>,
    /// Filter applied to discovered peers before passing them up to the app.
    discovered_peer_filter: MustNotIncludeKeys,
    /// Metrics for underlying [`discv5::Discv5`] node and filtered discovered peers.
    metrics: Discv5Metrics,
}
79
80impl Discv5 {
81    ////////////////////////////////////////////////////////////////////////////////////////////////
82    // Minimal interface with `reth_network::discovery`
83    ////////////////////////////////////////////////////////////////////////////////////////////////
84
85    /// Adds the node to the table, if it is not already present.
86    pub fn add_node(&self, node_record: Enr<SecretKey>) -> Result<(), Error> {
87        let EnrCombinedKeyWrapper(enr) = node_record.into();
88        self.discv5.add_enr(enr).map_err(Error::AddNodeFailed)
89    }
90
91    /// Sets the pair in the EIP-868 [`Enr`] of the node.
92    ///
93    /// If the key already exists, this will update it.
94    ///
95    /// CAUTION: The value **must** be rlp encoded
96    pub fn set_eip868_in_local_enr(&self, key: Vec<u8>, rlp: Bytes) {
97        let Ok(key_str) = std::str::from_utf8(&key) else {
98            error!(target: "net::discv5",
99                err="key not utf-8",
100                "failed to update local enr"
101            );
102            return
103        };
104        if let Err(err) = self.discv5.enr_insert(key_str, &rlp) {
105            error!(target: "net::discv5",
106                %err,
107                "failed to update local enr"
108            );
109        }
110    }
111
112    /// Sets the pair in the EIP-868 [`Enr`] of the node.
113    ///
114    /// If the key already exists, this will update it.
115    pub fn encode_and_set_eip868_in_local_enr(
116        &self,
117        key: Vec<u8>,
118        value: impl alloy_rlp::Encodable,
119    ) {
120        let mut buf = Vec::new();
121        value.encode(&mut buf);
122        self.set_eip868_in_local_enr(key, buf.into())
123    }
124
125    /// Adds the peer and id to the ban list.
126    ///
127    /// This will prevent any future inclusion in the table
128    pub fn ban(&self, peer_id: PeerId, ip: IpAddr) {
129        match discv4_id_to_discv5_id(peer_id) {
130            Ok(node_id) => {
131                self.discv5.ban_node(&node_id, None);
132                self.ban_ip(ip);
133            }
134            Err(err) => error!(target: "net::discv5",
135                %err,
136                "failed to ban peer"
137            ),
138        }
139    }
140
    /// Adds the ip to the ban list.
    ///
    /// This will prevent any future inclusion in the table
    pub fn ban_ip(&self, ip: IpAddr) {
        // NOTE(review): `None` is passed as the second argument — presumably "no expiry";
        // confirm against `discv5::Discv5::ban_ip`.
        self.discv5.ban_ip(ip, None);
    }
147
148    /// Returns the [`NodeRecord`] of the local node.
149    ///
150    /// This includes the currently tracked external IP address of the node.
151    ///
152    /// Returns `None` if the local ENR does not contain the required fields.
153    pub fn node_record(&self) -> Option<NodeRecord> {
154        let enr: Enr<_> = EnrCombinedKeyWrapper(self.discv5.local_enr()).into();
155        enr.try_into().ok()
156    }
157
158    /// Spawns [`discv5::Discv5`]. Returns [`discv5::Discv5`] handle in reth compatible wrapper type
159    /// [`Discv5`], a receiver of [`discv5::Event`]s from the underlying node, and the local
160    /// [`Enr`](discv5::Enr) converted into the reth compatible [`NodeRecord`] type.
161    pub async fn start(
162        sk: &SecretKey,
163        discv5_config: Config,
164    ) -> Result<(Self, mpsc::Receiver<discv5::Event>, NodeRecord), Error> {
165        //
166        // 1. make local enr from listen config
167        //
168        let (enr, bc_enr, fork_key, rlpx_ip_mode) = build_local_enr(sk, &discv5_config);
169
170        trace!(target: "net::discv5",
171            ?enr,
172            "local ENR"
173        );
174
175        //
176        // 2. start discv5
177        //
178        let Config {
179            discv5_config,
180            bootstrap_nodes,
181            lookup_interval,
182            bootstrap_lookup_interval,
183            bootstrap_lookup_countdown,
184            discovered_peer_filter,
185            ..
186        } = discv5_config;
187
188        let EnrCombinedKeyWrapper(enr) = enr.into();
189        let sk = discv5::enr::CombinedKey::secp256k1_from_bytes(&mut sk.secret_bytes()).unwrap();
190        let mut discv5 = match discv5::Discv5::new(enr, sk, discv5_config) {
191            Ok(discv5) => discv5,
192            Err(err) => return Err(Error::InitFailure(err)),
193        };
194        discv5.start().await.map_err(Error::Discv5Error)?;
195
196        // start discv5 updates stream
197        let discv5_updates = discv5.event_stream().await.map_err(Error::Discv5Error)?;
198
199        let discv5 = Arc::new(discv5);
200
201        //
202        // 3. add boot nodes
203        //
204        bootstrap(bootstrap_nodes, &discv5).await?;
205
206        let metrics = Discv5Metrics::default();
207
208        //
209        // 4. start bg kbuckets maintenance
210        //
211        spawn_populate_kbuckets_bg(
212            lookup_interval,
213            bootstrap_lookup_interval,
214            bootstrap_lookup_countdown,
215            metrics.clone(),
216            discv5.clone(),
217        );
218
219        Ok((
220            Self { discv5, rlpx_ip_mode, fork_key, discovered_peer_filter, metrics },
221            discv5_updates,
222            bc_enr,
223        ))
224    }
225
226    /// Process an event from the underlying [`discv5::Discv5`] node.
227    pub fn on_discv5_update(&self, update: discv5::Event) -> Option<DiscoveredPeer> {
228        #[expect(clippy::match_same_arms)]
229        match update {
230            discv5::Event::SocketUpdated(_) | discv5::Event::TalkRequest(_) |
231            // `Discovered` not unique discovered peers
232            discv5::Event::Discovered(_) => None,
233            discv5::Event::NodeInserted { replaced: _, .. } => {
234
235                // node has been inserted into kbuckets
236
237                // `replaced` partly covers `reth_discv4::DiscoveryUpdate::Removed(_)`
238
239                self.metrics.discovered_peers.increment_kbucket_insertions(1);
240
241                None
242            }
243            discv5::Event::SessionEstablished(enr, remote_socket) => {
244                // this branch is semantically similar to branches of
245                // `reth_discv4::DiscoveryUpdate`: `DiscoveryUpdate::Added(_)` and
246                // `DiscoveryUpdate::DiscoveredAtCapacity(_)
247
248                // peer has been discovered as part of query, or, by incoming session (peer has
249                // discovered us)
250
251                self.metrics.discovered_peers.increment_established_sessions_raw(1);
252
253                self.on_discovered_peer(&enr, remote_socket)
254            }
255            discv5::Event::UnverifiableEnr {
256                enr,
257                socket,
258                node_id: _,
259            } => {
260                // this branch is semantically similar to branches of
261                // `reth_discv4::DiscoveryUpdate`: `DiscoveryUpdate::Added(_)` and
262                // `DiscoveryUpdate::DiscoveredAtCapacity(_)
263
264                // peer has been discovered as part of query, or, by an outgoing session (but peer
265                // is behind NAT and responds from a different socket)
266
267                // NOTE: `discv5::Discv5` won't initiate a session with any peer with an
268                // unverifiable node record, for example one that advertises a reserved LAN IP
269                // address on a WAN network. This is in order to prevent DoS attacks, where some
270                // malicious peers may advertise a victim's socket. We will still try and connect
271                // to them over RLPx, to be compatible with EL discv5 implementations that don't
272                // enforce this security measure.
273
274                trace!(target: "net::discv5",
275                    ?enr,
276                    %socket,
277                    "discovered unverifiable enr, source socket doesn't match socket advertised in ENR"
278                );
279
280                self.metrics.discovered_peers.increment_unverifiable_enrs_raw_total(1);
281
282                self.on_discovered_peer(&enr, socket)
283            }
284            _ => None
285        }
286    }
287
288    /// Processes a discovered peer. Returns `true` if peer is added to
289    pub fn on_discovered_peer(
290        &self,
291        enr: &discv5::Enr,
292        socket: SocketAddr,
293    ) -> Option<DiscoveredPeer> {
294        self.metrics.discovered_peers_advertised_networks.increment_once_by_network_type(enr);
295
296        let node_record = match self.try_into_reachable(enr, socket) {
297            Ok(enr_bc) => enr_bc,
298            Err(err) => {
299                trace!(target: "net::discv5",
300                    %err,
301                    ?enr,
302                    "discovered peer is unreachable"
303                );
304
305                self.metrics.discovered_peers.increment_established_sessions_unreachable_enr(1);
306
307                return None
308            }
309        };
310        if let FilterOutcome::Ignore { reason } = self.filter_discovered_peer(enr) {
311            trace!(target: "net::discv5",
312                ?enr,
313                reason,
314                "filtered out discovered peer"
315            );
316
317            self.metrics.discovered_peers.increment_established_sessions_filtered(1);
318
319            return None
320        }
321
322        let fork_id = self.get_fork_id(enr).ok();
323
324        trace!(target: "net::discv5",
325            ?fork_id,
326            ?enr,
327            "discovered peer"
328        );
329
330        Some(DiscoveredPeer { node_record, fork_id })
331    }
332
333    /// Tries to recover an unreachable [`Enr`](discv5::Enr) received via
334    /// [`discv5::Event::UnverifiableEnr`], into a [`NodeRecord`] usable by `RLPx`.
335    ///
336    /// NOTE: Fallback solution to be compatible with Geth which includes peers into the discv5
337    /// WAN topology which, for example, advertise in their ENR that localhost is their UDP IP
338    /// address. These peers are only discovered if they initiate a connection attempt, and we by
339    /// such means learn their reachable IP address. If we receive their ENR from any other peer
340    /// as part of a lookup query, we won't find a reachable IP address on which to dial them by
341    /// reading their ENR.
342    pub fn try_into_reachable(
343        &self,
344        enr: &discv5::Enr,
345        socket: SocketAddr,
346    ) -> Result<NodeRecord, Error> {
347        // ignore UDP socket advertised in ENR, use sender socket instead
348        let address = socket.ip();
349        let udp_port = socket.port();
350
351        let id = enr_to_discv4_id(enr).ok_or(Error::IncompatibleKeyType)?;
352
353        let tcp_port = (match self.rlpx_ip_mode {
354            IpMode::Ip4 => enr.tcp4(),
355            IpMode::Ip6 => enr.tcp6(),
356            IpMode::DualStack => unimplemented!("dual-stack support not implemented for rlpx"),
357        })
358        .unwrap_or(
359            // tcp socket is missing from ENR, or is wrong IP version.
360            //
361            // by default geth runs discv5 and discv4 behind the same udp port (the discv4 default
362            // port 30303), so rlpx has a chance of successfully dialing the peer on its discv5
363            // udp port if its running geth's p2p code.
364            udp_port,
365        );
366
367        Ok(NodeRecord { address, tcp_port, udp_port, id })
368    }
369
    /// Applies filtering rules on an ENR. Returns [`Ok`](FilterOutcome::Ok) if peer should be
    /// passed up to app, and [`Ignore`](FilterOutcome::Ignore) if peer should instead be dropped.
    ///
    /// The rules are those of the [`MustNotIncludeKeys`] filter held by this instance.
    pub fn filter_discovered_peer(&self, enr: &discv5::Enr) -> FilterOutcome {
        self.discovered_peer_filter.filter(enr)
    }
375
376    /// Returns the [`ForkId`] of the given [`Enr`](discv5::Enr) w.r.t. the local node's network
377    /// stack, if field is set.
378    pub fn get_fork_id<K: discv5::enr::EnrKey>(
379        &self,
380        enr: &discv5::enr::Enr<K>,
381    ) -> Result<ForkId, Error> {
382        let Some(key) = self.fork_key else { return Err(Error::NetworkStackIdNotConfigured) };
383        let fork_id = enr
384            .get_decodable::<EnrForkIdEntry>(key)
385            .or_else(|| {
386                (key != NetworkStackId::ETH)
387                    .then(|| {
388                        // Fallback: trying to get fork id from Enr with 'eth' as network stack id
389                        trace!(target: "net::discv5",
390                            key = %String::from_utf8_lossy(key),
391                            "Fork id not found for key, trying 'eth'..."
392                        );
393                        enr.get_decodable::<EnrForkIdEntry>(NetworkStackId::ETH)
394                    })
395                    .flatten()
396            })
397            .ok_or({
398                trace!(target: "net::discv5", "Fork id not found for 'eth' network stack id");
399                Error::ForkMissing(key)
400            })?
401            .map(Into::into)?;
402
403        Ok(fork_id)
404    }
405
406    ////////////////////////////////////////////////////////////////////////////////////////////////
407    // Interface with sigp/discv5
408    ////////////////////////////////////////////////////////////////////////////////////////////////
409
    /// Exposes API of [`discv5::Discv5`].
    ///
    /// Calls `f` with a shared reference to the underlying node and returns whatever `f`
    /// returns.
    pub fn with_discv5<F, R>(&self, f: F) -> R
    where
        F: FnOnce(&discv5::Discv5) -> R,
    {
        f(&self.discv5)
    }
417
418    ////////////////////////////////////////////////////////////////////////////////////////////////
419    // Complementary
420    ////////////////////////////////////////////////////////////////////////////////////////////////
421
    /// Returns the `RLPx` [`IpMode`] of the local node.
    ///
    /// This is the mode used by [`Self::try_into_reachable`] to pick which TCP port to read from
    /// a discovered ENR.
    pub const fn ip_mode(&self) -> IpMode {
        self.rlpx_ip_mode
    }
426
    /// Returns the key to use to identify the [`ForkId`] kv-pair on the [`Enr`](discv5::Enr).
    ///
    /// Returns `None` if no fork key is configured, in which case [`Self::get_fork_id`] fails
    /// with [`Error::NetworkStackIdNotConfigured`].
    pub const fn fork_key(&self) -> Option<&[u8]> {
        self.fork_key
    }
431}
432
/// Opaque debug implementation: formats the fixed placeholder string `{ .. }` and does not
/// include any of the struct's fields.
impl fmt::Debug for Discv5 {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        "{ .. }".fmt(f)
    }
}
438
/// Result of successfully processing a peer discovered by [`discv5::Discv5`].
#[derive(Debug)]
pub struct DiscoveredPeer {
    /// A discovery v4 backwards compatible ENR.
    pub node_record: NodeRecord,
    /// [`ForkId`] extracted from ENR w.r.t. the configured fork key, see
    /// [`Discv5::get_fork_id`]. `None` if the fork id could not be read from the ENR.
    pub fork_id: Option<ForkId>,
}
447
448/// Builds the local ENR with the supplied key.
449pub fn build_local_enr(
450    sk: &SecretKey,
451    config: &Config,
452) -> (Enr<SecretKey>, NodeRecord, Option<&'static [u8]>, IpMode) {
453    let mut builder = discv5::enr::Enr::builder();
454
455    let Config { discv5_config, fork, tcp_socket, other_enr_kv_pairs, .. } = config;
456
457    let socket = match discv5_config.listen_config {
458        ListenConfig::Ipv4 { ip, port } => {
459            if ip != Ipv4Addr::UNSPECIFIED {
460                builder.ip4(ip);
461            }
462            builder.udp4(port);
463            builder.tcp4(tcp_socket.port());
464
465            (ip, port).into()
466        }
467        ListenConfig::Ipv6 { ip, port } => {
468            if ip != Ipv6Addr::UNSPECIFIED {
469                builder.ip6(ip);
470            }
471            builder.udp6(port);
472            builder.tcp6(tcp_socket.port());
473
474            (ip, port).into()
475        }
476        ListenConfig::DualStack { ipv4, ipv4_port, ipv6, ipv6_port } => {
477            if ipv4 != Ipv4Addr::UNSPECIFIED {
478                builder.ip4(ipv4);
479            }
480            builder.udp4(ipv4_port);
481            builder.tcp4(tcp_socket.port());
482
483            if ipv6 != Ipv6Addr::UNSPECIFIED {
484                builder.ip6(ipv6);
485            }
486            builder.udp6(ipv6_port);
487
488            (ipv6, ipv6_port).into()
489        }
490    };
491
492    let rlpx_ip_mode = if tcp_socket.is_ipv4() { IpMode::Ip4 } else { IpMode::Ip6 };
493
494    // identifies which network node is on
495    let network_stack_id = fork.as_ref().map(|(network_stack_id, fork_value)| {
496        builder.add_value_rlp(network_stack_id, alloy_rlp::encode(fork_value).into());
497        *network_stack_id
498    });
499
500    // add other data
501    for (key, value) in other_enr_kv_pairs {
502        builder.add_value_rlp(key, value.clone().into());
503    }
504
505    // enr v4 not to get confused with discv4, independent versioning enr and
506    // discovery
507    let enr = builder.build(sk).expect("should build enr v4");
508
509    // backwards compatible enr
510    let bc_enr = NodeRecord::from_secret_key(socket, sk);
511
512    (enr, bc_enr, network_stack_id, rlpx_ip_mode)
513}
514
515/// Bootstraps underlying [`discv5::Discv5`] node with configured peers.
516pub async fn bootstrap(
517    bootstrap_nodes: HashSet<BootNode>,
518    discv5: &Arc<discv5::Discv5>,
519) -> Result<(), Error> {
520    trace!(target: "net::discv5",
521        ?bootstrap_nodes,
522        "adding bootstrap nodes .."
523    );
524
525    let mut enr_requests = vec![];
526    for node in bootstrap_nodes {
527        match node {
528            BootNode::Enr(node) => {
529                if let Err(err) = discv5.add_enr(node) {
530                    return Err(Error::AddNodeFailed(err))
531                }
532            }
533            BootNode::Enode(enode) => {
534                let discv5 = discv5.clone();
535                enr_requests.push(async move {
536                    if let Err(err) = discv5.request_enr(enode.to_string()).await {
537                        debug!(target: "net::discv5",
538                            ?enode,
539                            %err,
540                            "failed adding boot node"
541                        );
542                    }
543                })
544            }
545        }
546    }
547
548    // If a session is established, the ENR is added straight away to discv5 kbuckets
549    Ok(_ = join_all(enr_requests).await)
550}
551
552/// Backgrounds regular look up queries, in order to keep kbuckets populated.
553pub fn spawn_populate_kbuckets_bg(
554    lookup_interval: u64,
555    bootstrap_lookup_interval: u64,
556    bootstrap_lookup_countdown: u64,
557    metrics: Discv5Metrics,
558    discv5: Arc<discv5::Discv5>,
559) {
560    let local_node_id = discv5.local_enr().node_id();
561    let lookup_interval = Duration::from_secs(lookup_interval);
562    let metrics = metrics.discovered_peers;
563    let mut kbucket_index = MAX_KBUCKET_INDEX;
564    let pulse_lookup_interval = Duration::from_secs(bootstrap_lookup_interval);
565    task::spawn(Box::pin(async move {
566        // make many fast lookup queries at bootstrap, trying to fill kbuckets at furthest
567        // log2distance from local node
568        for i in (0..bootstrap_lookup_countdown).rev() {
569            let target = discv5::enr::NodeId::random();
570
571            trace!(target: "net::discv5",
572                %target,
573                bootstrap_boost_runs_countdown=i,
574                lookup_interval=format!("{:#?}", pulse_lookup_interval),
575                "starting bootstrap boost lookup query"
576            );
577
578            lookup(target, &discv5, &metrics).await;
579
580            tokio::time::sleep(pulse_lookup_interval).await;
581        }
582
583        // initiate regular lookups to populate kbuckets
584        loop {
585            // make sure node is connected to each subtree in the network by target
586            // selection (ref kademlia)
587            let target = get_lookup_target(kbucket_index, local_node_id);
588
589            trace!(target: "net::discv5",
590                %target,
591                lookup_interval=format!("{:#?}", lookup_interval),
592                "starting periodic lookup query"
593            );
594
595            lookup(target, &discv5, &metrics).await;
596
597            if kbucket_index > DEFAULT_MIN_TARGET_KBUCKET_INDEX {
598                // try to populate bucket one step closer
599                kbucket_index -= 1
600            } else {
601                // start over with bucket furthest away
602                kbucket_index = MAX_KBUCKET_INDEX
603            }
604
605            tokio::time::sleep(lookup_interval).await;
606        }
607    }));
608}
609
610/// Gets the next lookup target, based on which bucket is currently being targeted.
611pub fn get_lookup_target(
612    kbucket_index: usize,
613    local_node_id: discv5::enr::NodeId,
614) -> discv5::enr::NodeId {
615    // init target
616    let mut target = local_node_id.raw();
617
618    // make sure target has a 'log2distance'-long suffix that differs from local node id
619    let bit_offset = MAX_KBUCKET_INDEX.saturating_sub(kbucket_index);
620    let (byte, bit) = (bit_offset / 8, bit_offset % 8);
621    // Flip the target bit.
622    target[byte] ^= 1 << (7 - bit);
623
624    // Randomize the bits after the target.
625    let mut rng = rand::rng();
626    // Randomize remaining bits in the byte we modified.
627    if bit < 7 {
628        // Compute the mask of the bits that need to be randomized.
629        let bits_to_randomize = 0xff >> (bit + 1);
630        // Clear.
631        target[byte] &= !bits_to_randomize;
632        // Randomize.
633        target[byte] |= rng.random::<u8>() & bits_to_randomize;
634    }
635    // Randomize remaining bytes.
636    rng.fill_bytes(&mut target[byte + 1..]);
637
638    target.into()
639}
640
641/// Runs a [`discv5::Discv5`] lookup query.
642pub async fn lookup(
643    target: discv5::enr::NodeId,
644    discv5: &discv5::Discv5,
645    metrics: &DiscoveredPeersMetrics,
646) {
647    metrics.set_total_sessions(discv5.metrics().active_sessions);
648    metrics.set_total_kbucket_peers(
649        discv5.with_kbuckets(|kbuckets| kbuckets.read().iter_ref().count()),
650    );
651
652    match discv5.find_node(target).await {
653        Err(err) => trace!(target: "net::discv5",
654            %err,
655            "lookup query failed"
656        ),
657        Ok(peers) => trace!(target: "net::discv5",
658            target=format!("{:#?}", target),
659            peers_count=peers.len(),
660            peers=format!("[{:#}]", peers.iter()
661                .map(|enr| enr.node_id()
662            ).format(", ")),
663            "peers returned by lookup query"
664        ),
665    }
666
667    // `Discv5::connected_peers` can be subset of sessions, not all peers make it
668    // into kbuckets, e.g. incoming sessions from peers with
669    // unreachable enrs
670    debug!(target: "net::discv5",
671        connected_peers=discv5.connected_peers(),
672        "connected peers in routing table"
673    );
674}
675
676#[cfg(test)]
677mod test {
678    use super::*;
679    use ::enr::{CombinedKey, EnrKey};
680    use rand_08::thread_rng;
681    use reth_chainspec::MAINNET;
682    use reth_tracing::init_test_tracing;
683    use std::env;
684    use tracing::trace;
685
686    fn discv5_noop() -> Discv5 {
687        let sk = CombinedKey::generate_secp256k1();
688        Discv5 {
689            discv5: Arc::new(
690                discv5::Discv5::new(
691                    Enr::empty(&sk).unwrap(),
692                    sk,
693                    discv5::ConfigBuilder::new(DEFAULT_DISCOVERY_V5_LISTEN_CONFIG).build(),
694                )
695                .unwrap(),
696            ),
697            rlpx_ip_mode: IpMode::Ip4,
698            fork_key: None,
699            discovered_peer_filter: MustNotIncludeKeys::default(),
700            metrics: Discv5Metrics::default(),
701        }
702    }
703
704    async fn start_discovery_node(
705        udp_port_discv5: u16,
706    ) -> (Discv5, mpsc::Receiver<discv5::Event>, NodeRecord) {
707        let secret_key = SecretKey::new(&mut thread_rng());
708
709        let discv5_addr: SocketAddr = format!("127.0.0.1:{udp_port_discv5}").parse().unwrap();
710        let rlpx_addr: SocketAddr = "127.0.0.1:30303".parse().unwrap();
711
712        let discv5_listen_config = ListenConfig::from(discv5_addr);
713        let discv5_config = Config::builder(rlpx_addr)
714            .discv5_config(discv5::ConfigBuilder::new(discv5_listen_config).build())
715            .build();
716
717        Discv5::start(&secret_key, discv5_config).await.expect("should build discv5")
718    }
719
    // Starts two local nodes, adds node_2 to node_1 and pings it, then checks the first event
    // received on each node's event stream: a session with node_2 on node_1, and the insertion
    // of node_1 into kbuckets on node_2.
    #[tokio::test(flavor = "multi_thread")]
    async fn discv5() {
        reth_tracing::init_test_tracing();

        // rig test

        // rig node_1
        let (node_1, mut stream_1, _) = start_discovery_node(30344).await;
        let node_1_enr = node_1.with_discv5(|discv5| discv5.local_enr());

        // rig node_2
        let (node_2, mut stream_2, _) = start_discovery_node(30355).await;
        let node_2_enr = node_2.with_discv5(|discv5| discv5.local_enr());

        trace!(target: "net::discv5::test",
            node_1_node_id=format!("{:#}", node_1_enr.node_id()),
            node_2_node_id=format!("{:#}", node_2_enr.node_id()),
            "started nodes"
        );

        // test

        // add node_2 to discovery handle of node_1 (should add node to discv5 kbuckets)
        let node_2_enr_reth_compatible_ty: Enr<SecretKey> =
            EnrCombinedKeyWrapper(node_2_enr.clone()).into();
        node_1.add_node(node_2_enr_reth_compatible_ty).unwrap();

        // verify node_2 is in KBuckets of node_1:discv5
        assert!(
            node_1.with_discv5(|discv5| discv5.table_entries_id().contains(&node_2_enr.node_id()))
        );

        // manually trigger connection from node_1 to node_2
        node_1.with_discv5(|discv5| discv5.send_ping(node_2_enr.clone())).await.unwrap();

        // verify node_1:discv5 is connected to node_2:discv5 and vice versa; the session must be
        // with node_2's ENR, on the UDP socket advertised in that ENR
        let event_1_v5 = stream_1.recv().await.unwrap();

        assert!(matches!(
            event_1_v5,
            discv5::Event::SessionEstablished(node, socket) if node == node_2_enr && socket == node_2_enr.udp4_socket().unwrap().into()
        ));

        // verify node_1 is in KBuckets of node_2:discv5, without having replaced another node
        let event_2_v5 = stream_2.recv().await.unwrap();
        assert!(matches!(
            event_2_v5,
            discv5::Event::NodeInserted { node_id, replaced } if node_id == node_1_enr.node_id() && replaced.is_none()
        ));
    }
770
771    #[test]
772    fn discovered_enr_disc_socket_missing() {
773        reth_tracing::init_test_tracing();
774
775        // rig test
776        const REMOTE_RLPX_PORT: u16 = 30303;
777        let remote_socket = "104.28.44.25:9000".parse().unwrap();
778        let remote_key = CombinedKey::generate_secp256k1();
779        let remote_enr = Enr::builder().tcp4(REMOTE_RLPX_PORT).build(&remote_key).unwrap();
780
781        let discv5 = discv5_noop();
782
783        // test
784        let filtered_peer = discv5.on_discovered_peer(&remote_enr, remote_socket);
785
786        assert_eq!(
787            NodeRecord {
788                address: remote_socket.ip(),
789                udp_port: remote_socket.port(),
790                tcp_port: REMOTE_RLPX_PORT,
791                id: enr_to_discv4_id(&remote_enr).unwrap(),
792            },
793            filtered_peer.unwrap().node_record
794        )
795    }
796
797    // Copied from sigp/discv5 with slight modification (U256 type)
798    // <https://github.com/sigp/discv5/blob/master/src/kbucket/key.rs#L89-L101>
799    #[expect(unreachable_pub)]
800    #[expect(unused)]
801    mod sigp {
802        use alloy_primitives::U256;
803        use enr::{
804            k256::sha2::digest::generic_array::{typenum::U32, GenericArray},
805            NodeId,
806        };
807
        /// A `Key` is a cryptographic hash, identifying both the nodes participating in
        /// the Kademlia DHT, as well as records stored in the DHT.
        ///
        /// The set of all `Key`s defines the Kademlia keyspace.
        ///
        /// `Key`s have an XOR metric as defined in the Kademlia paper, i.e. the bitwise XOR of
        /// the hash digests, interpreted as an integer. See [`Key::distance`].
        ///
        /// A `Key` preserves the preimage of type `T` of the hash function. See [`Key::preimage`].
        #[derive(Clone, Debug)]
        pub struct Key<T> {
            // Value this key was created from.
            preimage: T,
            // Raw 32 byte hash; equality and distance are computed on this field only.
            hash: GenericArray<u8, U32>,
        }
822
        // Two keys are equal if their hashes are equal; the preimage is not compared.
        impl<T> PartialEq for Key<T> {
            fn eq(&self, other: &Self) -> bool {
                self.hash == other.hash
            }
        }
828
        // Marker impl, equality is defined by `PartialEq` on the hash.
        impl<T> Eq for Key<T> {}
830
        // Identity conversion, allows passing a `Key` where `AsRef<Key<_>>` is expected.
        impl<TPeerId> AsRef<Self> for Key<TPeerId> {
            fn as_ref(&self) -> &Self {
                self
            }
        }
836
837        impl<T> Key<T> {
838            /// Construct a new `Key` by providing the raw 32 byte hash.
839            pub const fn new_raw(preimage: T, hash: GenericArray<u8, U32>) -> Self {
840                Self { preimage, hash }
841            }
842
843            /// Borrows the preimage of the key.
844            pub const fn preimage(&self) -> &T {
845                &self.preimage
846            }
847
848            /// Converts the key into its preimage.
849            pub fn into_preimage(self) -> T {
850                self.preimage
851            }
852
853            /// Computes the distance of the keys according to the XOR metric.
854            pub fn distance<U>(&self, other: &Key<U>) -> Distance {
855                let a = U256::from_be_slice(self.hash.as_slice());
856                let b = U256::from_be_slice(other.hash.as_slice());
857                Distance(a ^ b)
858            }
859
860            // Used in the FINDNODE query outside of the k-bucket implementation.
861            /// Computes the integer log-2 distance between two keys, assuming a 256-bit
862            /// key. The output returns None if the key's are identical. The range is 1-256.
863            pub fn log2_distance<U>(&self, other: &Key<U>) -> Option<u64> {
864                let xor_dist = self.distance(other);
865                let log_dist = (256 - xor_dist.0.leading_zeros() as u64);
866                (log_dist != 0).then_some(log_dist)
867            }
868        }
869
870        impl From<NodeId> for Key<NodeId> {
871            fn from(node_id: NodeId) -> Self {
872                Self { preimage: node_id, hash: *GenericArray::from_slice(&node_id.raw()) }
873            }
874        }
875
876        /// A distance between two `Key`s.
877        #[derive(Copy, Clone, PartialEq, Eq, Default, PartialOrd, Ord, Debug)]
878        pub struct Distance(pub(super) U256);
879    }
880
// For every kbucket index, the node id produced by `get_lookup_target` must lie at a log2
// distance of `bucket_index + 1` from the local node id.
#[test]
fn select_lookup_target() {
    for bucket_index in 0..=MAX_KBUCKET_INDEX {
        // a fresh random local identity for each bucket
        let local_node_id = discv5::enr::NodeId::from(CombinedKey::generate_secp256k1().public());
        let target_node_id = get_lookup_target(bucket_index, local_node_id);

        // measure with the reference implementation copied from sigp/discv5
        let local_key = sigp::Key::from(local_node_id);
        let target_key = sigp::Key::from(target_node_id);
        let measured = local_key.log2_distance(&target_key);

        // log2 distances are 1-based, bucket indices are 0-based
        let expected = Some(bucket_index as u64 + 1);

        assert_eq!(measured, expected);
    }
}
894
// The local ENR built from a config must carry the configured fork id under the `eth` key
// and advertise the configured RLPx TCP port.
#[test]
fn build_enr_from_config() {
    const TCP_PORT: u16 = 30303;

    let fork_id = MAINNET.latest_fork_id();
    let config = Config::builder((Ipv4Addr::UNSPECIFIED, TCP_PORT).into())
        .fork(NetworkStackId::ETH, fork_id)
        .build();

    let sk = SecretKey::new(&mut thread_rng());
    // only the ENR (first tuple element) is of interest here
    let enr = build_local_enr(&sk, &config).0;

    // the fork id entry must be present and decodable
    let eth_entry = enr.get_decodable::<EnrForkIdEntry>(NetworkStackId::ETH).unwrap();
    let decoded_fork_id = eth_entry.map(Into::into).unwrap();
    assert_eq!(fork_id, decoded_fork_id);

    // listen config is defaulting to ip mode ipv4, so the port shows up as `tcp4`
    let advertised_tcp4 = enr.tcp4().unwrap();
    assert_eq!(TCP_PORT, advertised_tcp4);
}
916
// Exercises `get_fork_id` against ENRs that carry the fork id under different network stack
// keys, and against a `Discv5` instance without a configured fork key.
#[test]
fn get_fork_id_with_different_network_stack_ids() {
    // Set before tracing is initialised, presumably so the subscriber picks up the filter.
    // NOTE(review): `env::set_var` is only sound while no other thread reads or writes the
    // process environment; confirm this holds when tests run in parallel.
    unsafe {
        env::set_var("RUST_LOG", "net::discv5=trace");
    }
    init_test_tracing();

    let fork_id = MAINNET.latest_fork_id();
    let sk = SecretKey::new(&mut thread_rng());

    // Cases 1-3 all run against an instance keyed on OPEL; the key is set once and left
    // untouched in between.
    let mut discv5 = discv5_noop();
    discv5.fork_key = Some(NetworkStackId::OPEL);

    // Case 1: ENR advertises the fork id under OPEL, instance is configured for OPEL
    let opel_enr = Enr::builder()
        .add_value_rlp(
            NetworkStackId::OPEL,
            alloy_rlp::encode(EnrForkIdEntry::from(fork_id)).into(),
        )
        .build(&sk)
        .unwrap();
    let fork_id_from_opel = discv5.get_fork_id(&opel_enr).unwrap();
    assert_eq!(fork_id_from_opel, fork_id);

    // Case 2: ENR advertises the fork id under ETH, instance is configured for OPEL
    // (fallback to ETH)
    let eth_enr = Enr::builder()
        .add_value_rlp(
            NetworkStackId::ETH,
            alloy_rlp::encode(EnrForkIdEntry::from(fork_id)).into(),
        )
        .build(&sk)
        .unwrap();
    let fork_id_from_eth = discv5.get_fork_id(&eth_enr).unwrap();
    assert_eq!(fork_id_from_eth, fork_id);

    // Case 3: ENR carries neither an OPEL nor an ETH fork id (should fail)
    let enr_without_network_stack_id = Enr::empty(&sk).unwrap();
    assert!(matches!(
        discv5.get_fork_id(&enr_without_network_stack_id),
        Err(Error::ForkMissing(NetworkStackId::OPEL))
    ));

    // Case 4: instance without any network stack id configured (should fail)
    let discv5 = discv5_noop();
    assert!(matches!(
        discv5.get_fork_id(&enr_without_network_stack_id),
        Err(Error::NetworkStackIdNotConfigured)
    ));
}
967}