Skip to main content

reth_discv5/
lib.rs

1//! Wrapper around [`discv5::Discv5`].
2
3#![doc(
4    html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
5    html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
6    issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/"
7)]
8#![cfg_attr(not(test), warn(unused_crate_dependencies))]
9#![cfg_attr(docsrs, feature(doc_cfg))]
10
11use std::{
12    collections::HashSet,
13    fmt,
14    net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr},
15    sync::Arc,
16    time::Duration,
17};
18
19use ::enr::Enr;
20use alloy_primitives::bytes::Bytes;
21use enr::{discv4_id_to_discv5_id, EnrCombinedKeyWrapper};
22use futures::future::join_all;
23use itertools::Itertools;
24use rand::{Rng, RngCore};
25use reth_ethereum_forks::{EnrForkIdEntry, ForkId};
26use reth_network_peers::{NodeRecord, PeerId};
27use secp256k1::SecretKey;
28use tokio::{sync::mpsc, task};
29use tracing::{debug, error, trace};
30
31pub mod config;
32pub mod enr;
33pub mod error;
34pub mod filter;
35pub mod metrics;
36pub mod network_stack_id;
37
38pub use discv5::{self, IpMode};
39
40pub use config::{
41    BootNode, Config, ConfigBuilder, DEFAULT_COUNT_BOOTSTRAP_LOOKUPS, DEFAULT_DISCOVERY_V5_ADDR,
42    DEFAULT_DISCOVERY_V5_ADDR_IPV6, DEFAULT_DISCOVERY_V5_LISTEN_CONFIG, DEFAULT_DISCOVERY_V5_PORT,
43    DEFAULT_SECONDS_BOOTSTRAP_LOOKUP_INTERVAL, DEFAULT_SECONDS_LOOKUP_INTERVAL,
44};
45pub use enr::enr_to_discv4_id;
46pub use error::Error;
47pub use filter::{FilterOutcome, MustNotIncludeKeys};
48pub use network_stack_id::NetworkStackId;
49
50use metrics::{DiscoveredPeersMetrics, Discv5Metrics};
51
52/// Max kbucket index is 255.
53///
54/// This is the max log2distance for 32 byte [`NodeId`](discv5::enr::NodeId) - 1. See <https://github.com/sigp/discv5/blob/e9e0d4f93ec35591832a9a8d937b4161127da87b/src/kbucket.rs#L586-L587>.
55pub const MAX_KBUCKET_INDEX: usize = 255;
56
57/// Default lowest kbucket index to attempt filling, in periodic look up query to populate kbuckets.
58///
59/// The peer at the 0th kbucket index is at log2distance 1 from the local node ID. See <https://github.com/sigp/discv5/blob/e9e0d4f93ec35591832a9a8d937b4161127da87b/src/kbucket.rs#L586-L587>.
60///
61/// Default is 0th index.
62pub const DEFAULT_MIN_TARGET_KBUCKET_INDEX: usize = 0;
63
64/// Transparent wrapper around [`discv5::Discv5`].
65#[derive(Clone)]
66pub struct Discv5 {
67    /// sigp/discv5 node.
68    discv5: Arc<discv5::Discv5>,
69    /// [`IpMode`] of the `RLPx` network.
70    rlpx_ip_mode: IpMode,
71    /// Key used in kv-pair to ID chain, e.g. 'opstack' or 'eth'.
72    fork_key: Option<&'static [u8]>,
73    /// Filter applied to a discovered peers before passing it up to app.
74    discovered_peer_filter: MustNotIncludeKeys,
75    /// Metrics for underlying [`discv5::Discv5`] node and filtered discovered peers.
76    metrics: Discv5Metrics,
77    /// Returns the _local_ [`NodeRecord`] this service was started with.
78    // Note: we must track this separately because the `discv5::Discv5` does not necessarily
79    // provide this via its [`local_enr`](discv5::Discv5::local_enr()). This is intended for
80    // obtaining the port this service was launched at
81    local_node_record: NodeRecord,
82}
83
84impl Discv5 {
85    ////////////////////////////////////////////////////////////////////////////////////////////////
86    // Minimal interface with `reth_network::discovery`
87    ////////////////////////////////////////////////////////////////////////////////////////////////
88
89    /// Adds the node to the table, if it is not already present.
90    pub fn add_node(&self, node_record: Enr<SecretKey>) -> Result<(), Error> {
91        let EnrCombinedKeyWrapper(enr) = node_record.into();
92        self.discv5.add_enr(enr).map_err(Error::AddNodeFailed)
93    }
94
95    /// Sets the pair in the EIP-868 [`Enr`] of the node.
96    ///
97    /// If the key already exists, this will update it.
98    ///
99    /// CAUTION: The value **must** be rlp encoded
100    pub fn set_eip868_in_local_enr(&self, key: Vec<u8>, rlp: Bytes) {
101        let Ok(key_str) = std::str::from_utf8(&key) else {
102            error!(target: "net::discv5",
103                err="key not utf-8",
104                "failed to update local enr"
105            );
106            return
107        };
108        if let Err(err) = self.discv5.enr_insert(key_str, &rlp) {
109            error!(target: "net::discv5",
110                %err,
111                "failed to update local enr"
112            );
113        }
114    }
115
116    /// Sets the pair in the EIP-868 [`Enr`] of the node.
117    ///
118    /// If the key already exists, this will update it.
119    pub fn encode_and_set_eip868_in_local_enr(
120        &self,
121        key: Vec<u8>,
122        value: impl alloy_rlp::Encodable,
123    ) {
124        let mut buf = Vec::new();
125        value.encode(&mut buf);
126        self.set_eip868_in_local_enr(key, buf.into())
127    }
128
129    /// Adds the peer and id to the ban list.
130    ///
131    /// This will prevent any future inclusion in the table
132    pub fn ban(&self, peer_id: PeerId, ip: IpAddr) {
133        match discv4_id_to_discv5_id(peer_id) {
134            Ok(node_id) => {
135                self.discv5.ban_node(&node_id, None);
136                self.ban_ip(ip);
137            }
138            Err(err) => error!(target: "net::discv5",
139                %err,
140                "failed to ban peer"
141            ),
142        }
143    }
144
145    /// Adds the ip to the ban list.
146    ///
147    /// This will prevent any future inclusion in the table
148    pub fn ban_ip(&self, ip: IpAddr) {
149        self.discv5.ban_ip(ip, None);
150    }
151
152    /// Returns the [`NodeRecord`] of the local node.
153    ///
154    /// This includes the currently tracked external IP address of the node.
155    ///
156    /// Returns `None` if the local ENR does not contain the required fields.
157    pub fn node_record(&self) -> Option<NodeRecord> {
158        let enr: Enr<_> = EnrCombinedKeyWrapper(self.discv5.local_enr()).into();
159        enr.try_into().ok()
160    }
161
162    /// Returns the local [`Enr`] of the service.
163    pub fn local_enr(&self) -> Enr<discv5::enr::CombinedKey> {
164        self.discv5.local_enr()
165    }
166
167    /// The port the discv5 service is listening on.
168    pub const fn local_port(&self) -> u16 {
169        self.local_node_record.udp_port
170    }
171
172    /// Spawns [`discv5::Discv5`]. Returns [`discv5::Discv5`] handle in reth compatible wrapper type
173    /// [`Discv5`], a receiver of [`discv5::Event`]s from the underlying node, and the local
174    /// [`Enr`](discv5::Enr) converted into the reth compatible [`NodeRecord`] type.
175    pub async fn start(
176        sk: &SecretKey,
177        discv5_config: Config,
178    ) -> Result<(Self, mpsc::Receiver<discv5::Event>), Error> {
179        //
180        // 1. make local enr from listen config
181        //
182        let (enr, local_node_record, fork_key, rlpx_ip_mode) = build_local_enr(sk, &discv5_config);
183
184        trace!(target: "net::discv5", ?enr, "local ENR");
185
186        //
187        // 2. start discv5
188        //
189        let Config {
190            discv5_config,
191            bootstrap_nodes,
192            lookup_interval,
193            bootstrap_lookup_interval,
194            bootstrap_lookup_countdown,
195            discovered_peer_filter,
196            ..
197        } = discv5_config;
198
199        let EnrCombinedKeyWrapper(enr) = enr.into();
200        let sk = discv5::enr::CombinedKey::secp256k1_from_bytes(&mut sk.secret_bytes()).unwrap();
201        let mut discv5 = match discv5::Discv5::new(enr, sk, discv5_config) {
202            Ok(discv5) => discv5,
203            Err(err) => return Err(Error::InitFailure(err)),
204        };
205        discv5.start().await.map_err(Error::Discv5Error)?;
206
207        // start discv5 updates stream
208        let discv5_updates = discv5.event_stream().await.map_err(Error::Discv5Error)?;
209
210        let discv5 = Arc::new(discv5);
211
212        //
213        // 3. add boot nodes
214        //
215        bootstrap(bootstrap_nodes, &discv5).await?;
216
217        let metrics = Discv5Metrics::default();
218
219        //
220        // 4. start bg kbuckets maintenance
221        //
222        spawn_populate_kbuckets_bg(
223            lookup_interval,
224            bootstrap_lookup_interval,
225            bootstrap_lookup_countdown,
226            metrics.clone(),
227            Arc::downgrade(&discv5),
228        );
229
230        Ok((
231            Self {
232                discv5,
233                rlpx_ip_mode,
234                fork_key,
235                discovered_peer_filter,
236                metrics,
237                local_node_record,
238            },
239            discv5_updates,
240        ))
241    }
242
243    /// Process an event from the underlying [`discv5::Discv5`] node.
244    pub fn on_discv5_update(&self, update: discv5::Event) -> Option<DiscoveredPeer> {
245        #[expect(clippy::match_same_arms)]
246        match update {
247            discv5::Event::SocketUpdated(_) | discv5::Event::TalkRequest(_) |
248            // `Discovered` not unique discovered peers
249            discv5::Event::Discovered(_) |
250            // Unrecognized frames are handled separately by the discovery layer
251            discv5::Event::UnrecognizedFrame(_) => None,
252            discv5::Event::NodeInserted { .. } => {
253
254                // node has been inserted into kbuckets
255
256                // `replaced` partly covers `reth_discv4::DiscoveryUpdate::Removed(_)`
257
258                self.metrics.discovered_peers.increment_kbucket_insertions(1);
259
260                None
261            }
262            discv5::Event::SessionEstablished(enr, remote_socket) => {
263                // this branch is semantically similar to branches of
264                // `reth_discv4::DiscoveryUpdate`: `DiscoveryUpdate::Added(_)` and
265                // `DiscoveryUpdate::DiscoveredAtCapacity(_)
266
267                // peer has been discovered as part of query, or, by incoming session (peer has
268                // discovered us)
269
270                self.metrics.discovered_peers.increment_established_sessions_raw(1);
271
272                self.on_discovered_peer(&enr, remote_socket)
273            }
274            discv5::Event::UnverifiableEnr {
275                enr,
276                socket,
277                node_id: _,
278            } => {
279                // this branch is semantically similar to branches of
280                // `reth_discv4::DiscoveryUpdate`: `DiscoveryUpdate::Added(_)` and
281                // `DiscoveryUpdate::DiscoveredAtCapacity(_)
282
283                // peer has been discovered as part of query, or, by an outgoing session (but peer
284                // is behind NAT and responds from a different socket)
285
286                // NOTE: `discv5::Discv5` won't initiate a session with any peer with an
287                // unverifiable node record, for example one that advertises a reserved LAN IP
288                // address on a WAN network. This is in order to prevent DoS attacks, where some
289                // malicious peers may advertise a victim's socket. We will still try and connect
290                // to them over RLPx, to be compatible with EL discv5 implementations that don't
291                // enforce this security measure.
292
293                trace!(target: "net::discv5",
294                    ?enr,
295                    %socket,
296                    "discovered unverifiable enr, source socket doesn't match socket advertised in ENR"
297                );
298
299                self.metrics.discovered_peers.increment_unverifiable_enrs_raw_total(1);
300
301                self.on_discovered_peer(&enr, socket)
302            }
303            _ => None
304        }
305    }
306
307    /// Processes a discovered peer. Returns `true` if peer is added to
308    pub fn on_discovered_peer(
309        &self,
310        enr: &discv5::Enr,
311        socket: SocketAddr,
312    ) -> Option<DiscoveredPeer> {
313        self.metrics.discovered_peers_advertised_networks.increment_once_by_network_type(enr);
314
315        let node_record = match self.try_into_reachable(enr, socket) {
316            Ok(enr_bc) => enr_bc,
317            Err(err) => {
318                trace!(target: "net::discv5",
319                    %err,
320                    ?enr,
321                    "discovered peer is unreachable"
322                );
323
324                self.metrics.discovered_peers.increment_established_sessions_unreachable_enr(1);
325
326                return None
327            }
328        };
329        if let FilterOutcome::Ignore { reason } = self.filter_discovered_peer(enr) {
330            trace!(target: "net::discv5",
331                ?enr,
332                reason,
333                "filtered out discovered peer"
334            );
335
336            self.metrics.discovered_peers.increment_established_sessions_filtered(1);
337
338            return None
339        }
340
341        let fork_id = self.get_fork_id(enr).ok();
342
343        trace!(target: "net::discv5",
344            ?fork_id,
345            ?enr,
346            "discovered peer"
347        );
348
349        Some(DiscoveredPeer { node_record, fork_id })
350    }
351
352    /// Tries to recover an unreachable [`Enr`](discv5::Enr) received via
353    /// [`discv5::Event::UnverifiableEnr`], into a [`NodeRecord`] usable by `RLPx`.
354    ///
355    /// NOTE: Fallback solution to be compatible with Geth which includes peers into the discv5
356    /// WAN topology which, for example, advertise in their ENR that localhost is their UDP IP
357    /// address. These peers are only discovered if they initiate a connection attempt, and we by
358    /// such means learn their reachable IP address. If we receive their ENR from any other peer
359    /// as part of a lookup query, we won't find a reachable IP address on which to dial them by
360    /// reading their ENR.
361    pub fn try_into_reachable(
362        &self,
363        enr: &discv5::Enr,
364        socket: SocketAddr,
365    ) -> Result<NodeRecord, Error> {
366        // ignore UDP socket advertised in ENR, use sender socket instead
367        let address = socket.ip();
368        let udp_port = socket.port();
369
370        let id = enr_to_discv4_id(enr).ok_or(Error::IncompatibleKeyType)?;
371
372        let tcp_port = (match self.rlpx_ip_mode {
373            IpMode::Ip4 => enr.tcp4(),
374            IpMode::Ip6 => enr.tcp6(),
375            IpMode::DualStack => unimplemented!("dual-stack support not implemented for rlpx"),
376        })
377        .unwrap_or(
378            // tcp socket is missing from ENR, or is wrong IP version.
379            //
380            // by default geth runs discv5 and discv4 behind the same udp port (the discv4 default
381            // port 30303), so rlpx has a chance of successfully dialing the peer on its discv5
382            // udp port if its running geth's p2p code.
383            udp_port,
384        );
385
386        Ok(NodeRecord { address, tcp_port, udp_port, id })
387    }
388
389    /// Applies filtering rules on an ENR. Returns [`Ok`](FilterOutcome::Ok) if peer should be
390    /// passed up to app, and [`Ignore`](FilterOutcome::Ignore) if peer should instead be dropped.
391    pub fn filter_discovered_peer(&self, enr: &discv5::Enr) -> FilterOutcome {
392        self.discovered_peer_filter.filter(enr)
393    }
394
395    /// Returns the [`ForkId`] of the given [`Enr`](discv5::Enr) w.r.t. the local node's network
396    /// stack, if field is set.
397    pub fn get_fork_id<K: discv5::enr::EnrKey>(
398        &self,
399        enr: &discv5::enr::Enr<K>,
400    ) -> Result<ForkId, Error> {
401        let Some(key) = self.fork_key else { return Err(Error::NetworkStackIdNotConfigured) };
402        let fork_id = enr
403            .get_decodable::<EnrForkIdEntry>(key)
404            .or_else(|| {
405                (key != NetworkStackId::ETH)
406                    .then(|| {
407                        // Fallback: trying to get fork id from Enr with 'eth' as network stack id
408                        trace!(target: "net::discv5",
409                            key = %String::from_utf8_lossy(key),
410                            "Fork id not found for key, trying 'eth'..."
411                        );
412                        enr.get_decodable::<EnrForkIdEntry>(NetworkStackId::ETH)
413                    })
414                    .flatten()
415            })
416            .ok_or({
417                trace!(target: "net::discv5", "Fork id not found for 'eth' network stack id");
418                Error::ForkMissing(key)
419            })?
420            .map(Into::into)?;
421
422        Ok(fork_id)
423    }
424
425    ////////////////////////////////////////////////////////////////////////////////////////////////
426    // Interface with sigp/discv5
427    ////////////////////////////////////////////////////////////////////////////////////////////////
428
429    /// Exposes API of [`discv5::Discv5`].
430    pub fn with_discv5<F, R>(&self, f: F) -> R
431    where
432        F: FnOnce(&discv5::Discv5) -> R,
433    {
434        f(&self.discv5)
435    }
436
437    ////////////////////////////////////////////////////////////////////////////////////////////////
438    // Complementary
439    ////////////////////////////////////////////////////////////////////////////////////////////////
440
441    /// Returns the `RLPx` [`IpMode`] of the local node.
442    pub const fn ip_mode(&self) -> IpMode {
443        self.rlpx_ip_mode
444    }
445
446    /// Returns the key to use to identify the [`ForkId`] kv-pair on the [`Enr`](discv5::Enr).
447    pub const fn fork_key(&self) -> Option<&[u8]> {
448        self.fork_key
449    }
450}
451
452impl fmt::Debug for Discv5 {
453    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
454        "{ .. }".fmt(f)
455    }
456}
457
458/// Result of successfully processing a peer discovered by [`discv5::Discv5`].
459#[derive(Debug)]
460pub struct DiscoveredPeer {
461    /// A discovery v4 backwards compatible ENR.
462    pub node_record: NodeRecord,
463    /// [`ForkId`] extracted from ENR w.r.t. configured
464    pub fork_id: Option<ForkId>,
465}
466
467/// Builds the local ENR with the supplied key.
468pub fn build_local_enr(
469    sk: &SecretKey,
470    config: &Config,
471) -> (Enr<SecretKey>, NodeRecord, Option<&'static [u8]>, IpMode) {
472    let mut builder = discv5::enr::Enr::builder();
473
474    let Config { discv5_config, fork, tcp_socket, advertised_ip, other_enr_kv_pairs, .. } = config;
475
476    let socket = {
477        let v4 = crate::config::ipv4(&discv5_config.listen_config);
478        let v6 = crate::config::ipv6(&discv5_config.listen_config);
479
480        // Prefer an explicit advertised IP for ENR IP fields. Listen sockets still supply UDP
481        // ports and determine which address-family fields are emitted.
482        if let Some(addr) = v4 {
483            if let Some(IpAddr::V4(ip)) = advertised_ip {
484                builder.ip4(*ip);
485            } else if *addr.ip() != Ipv4Addr::UNSPECIFIED {
486                builder.ip4(*addr.ip());
487            }
488            builder.udp4(addr.port());
489        }
490        if let Some(addr) = v6 {
491            if let Some(IpAddr::V6(ip)) = advertised_ip {
492                builder.ip6(*ip);
493            } else if *addr.ip() != Ipv6Addr::UNSPECIFIED {
494                builder.ip6(*addr.ip());
495            }
496            builder.udp6(addr.port());
497        }
498        // Advertise tcp4 when v4 is configured, else tcp6.
499        if v4.is_some() {
500            builder.tcp4(tcp_socket.port());
501        } else if v6.is_some() {
502            builder.tcp6(tcp_socket.port());
503        }
504
505        // Prefer v6 when both are configured
506        v6.map(SocketAddr::V6)
507            .or_else(|| v4.map(SocketAddr::V4))
508            .unwrap_or_else(|| SocketAddr::from((Ipv4Addr::UNSPECIFIED, 0)))
509    };
510
511    let rlpx_ip_mode = if tcp_socket.is_ipv4() { IpMode::Ip4 } else { IpMode::Ip6 };
512
513    // identifies which network node is on
514    let network_stack_id = fork.as_ref().map(|(network_stack_id, fork_value)| {
515        builder.add_value_rlp(network_stack_id, alloy_rlp::encode(fork_value).into());
516        *network_stack_id
517    });
518
519    // add other data
520    for (key, value) in other_enr_kv_pairs {
521        builder.add_value_rlp(key, value.clone().into());
522    }
523
524    // enr v4 not to get confused with discv4, independent versioning enr and
525    // discovery
526    let enr = builder.build(sk).expect("should build enr v4");
527
528    // backwards compatible enr
529    let bc_enr = NodeRecord::from_secret_key(socket, sk);
530
531    (enr, bc_enr, network_stack_id, rlpx_ip_mode)
532}
533
534/// Bootstraps underlying [`discv5::Discv5`] node with configured peers.
535pub async fn bootstrap(
536    bootstrap_nodes: HashSet<BootNode>,
537    discv5: &Arc<discv5::Discv5>,
538) -> Result<(), Error> {
539    trace!(target: "net::discv5",
540        ?bootstrap_nodes,
541        "adding bootstrap nodes .."
542    );
543
544    let mut enr_requests = vec![];
545    for node in bootstrap_nodes {
546        match node {
547            BootNode::Enr(node) => {
548                if let Err(err) = discv5.add_enr(node) {
549                    return Err(Error::AddNodeFailed(err))
550                }
551            }
552            BootNode::Enode(enode) => {
553                let discv5 = discv5.clone();
554                enr_requests.push(async move {
555                    if let Err(err) = discv5.request_enr(enode.to_string()).await {
556                        debug!(target: "net::discv5",
557                            ?enode,
558                            %err,
559                            "failed adding boot node"
560                        );
561                    }
562                })
563            }
564        }
565    }
566
567    // If a session is established, the ENR is added straight away to discv5 kbuckets
568    Ok(_ = join_all(enr_requests).await)
569}
570
571/// Backgrounds regular look up queries, in order to keep kbuckets populated.
572pub fn spawn_populate_kbuckets_bg(
573    lookup_interval: u64,
574    bootstrap_lookup_interval: u64,
575    bootstrap_lookup_countdown: u64,
576    metrics: Discv5Metrics,
577    discv5: std::sync::Weak<discv5::Discv5>,
578) {
579    let lookup_interval = Duration::from_secs(lookup_interval);
580    let metrics = metrics.discovered_peers;
581    let mut kbucket_index = MAX_KBUCKET_INDEX;
582    let pulse_lookup_interval = Duration::from_secs(bootstrap_lookup_interval);
583    task::spawn(async move {
584        let Some(discv5_handle) = discv5.upgrade() else {
585            return;
586        };
587        let local_node_id = discv5_handle.local_enr().node_id();
588        drop(discv5_handle);
589
590        // make many fast lookup queries at bootstrap, trying to fill kbuckets at furthest
591        // log2distance from local node
592        for i in (0..bootstrap_lookup_countdown).rev() {
593            let target = discv5::enr::NodeId::random();
594
595            trace!(target: "net::discv5",
596                %target,
597                bootstrap_boost_runs_countdown=i,
598                lookup_interval=format!("{:#?}", pulse_lookup_interval),
599                "starting bootstrap boost lookup query"
600            );
601
602            {
603                let Some(discv5_handle) = discv5.upgrade() else {
604                    return;
605                };
606                lookup(target, &discv5_handle, &metrics).await;
607            }
608
609            tokio::time::sleep(pulse_lookup_interval).await;
610        }
611
612        // initiate regular lookups to populate kbuckets
613        loop {
614            // make sure node is connected to each subtree in the network by target
615            // selection (ref kademlia)
616            let target = get_lookup_target(kbucket_index, local_node_id);
617
618            trace!(target: "net::discv5",
619                %target,
620                lookup_interval=format!("{:#?}", lookup_interval),
621                "starting periodic lookup query"
622            );
623
624            {
625                let Some(discv5_handle) = discv5.upgrade() else {
626                    return;
627                };
628                lookup(target, &discv5_handle, &metrics).await;
629            }
630
631            if kbucket_index > DEFAULT_MIN_TARGET_KBUCKET_INDEX {
632                // try to populate bucket one step closer
633                kbucket_index -= 1
634            } else {
635                // start over with bucket furthest away
636                kbucket_index = MAX_KBUCKET_INDEX
637            }
638
639            tokio::time::sleep(lookup_interval).await;
640        }
641    });
642}
643
644/// Gets the next lookup target, based on which bucket is currently being targeted.
645pub fn get_lookup_target(
646    kbucket_index: usize,
647    local_node_id: discv5::enr::NodeId,
648) -> discv5::enr::NodeId {
649    // init target
650    let mut target = local_node_id.raw();
651
652    // make sure target has a 'log2distance'-long suffix that differs from local node id
653    let bit_offset = MAX_KBUCKET_INDEX.saturating_sub(kbucket_index);
654    let (byte, bit) = (bit_offset / 8, bit_offset % 8);
655    // Flip the target bit.
656    target[byte] ^= 1 << (7 - bit);
657
658    // Randomize the bits after the target.
659    let mut rng = rand::rng();
660    // Randomize remaining bits in the byte we modified.
661    if bit < 7 {
662        // Compute the mask of the bits that need to be randomized.
663        let bits_to_randomize = 0xff >> (bit + 1);
664        // Clear.
665        target[byte] &= !bits_to_randomize;
666        // Randomize.
667        target[byte] |= rng.random::<u8>() & bits_to_randomize;
668    }
669    // Randomize remaining bytes.
670    rng.fill_bytes(&mut target[byte + 1..]);
671
672    target.into()
673}
674
675/// Runs a [`discv5::Discv5`] lookup query.
676pub async fn lookup(
677    target: discv5::enr::NodeId,
678    discv5: &discv5::Discv5,
679    metrics: &DiscoveredPeersMetrics,
680) {
681    metrics.set_total_sessions(discv5.metrics().active_sessions);
682    metrics.set_total_kbucket_peers(
683        discv5.with_kbuckets(|kbuckets| kbuckets.read().iter_ref().count()),
684    );
685
686    match discv5.find_node(target).await {
687        Err(err) => trace!(target: "net::discv5",
688            %err,
689            "lookup query failed"
690        ),
691        Ok(peers) => trace!(target: "net::discv5",
692            target=format!("{:#?}", target),
693            peers_count=peers.len(),
694            peers=format!("[{:#}]", peers.iter()
695                .map(|enr| enr.node_id()
696            ).format(", ")),
697            "peers returned by lookup query"
698        ),
699    }
700
701    // `Discv5::connected_peers` can be subset of sessions, not all peers make it
702    // into kbuckets, e.g. incoming sessions from peers with
703    // unreachable enrs
704    debug!(target: "net::discv5",
705        connected_peers=discv5.connected_peers(),
706        "connected peers in routing table"
707    );
708}
709
710#[cfg(test)]
711mod test {
712    #![allow(deprecated)]
713    use super::*;
714    use ::enr::{CombinedKey, EnrKey};
715    use discv5::ListenConfig;
716    use rand_08::thread_rng;
717    use reth_chainspec::MAINNET;
718    use std::{
719        net::UdpSocket,
720        time::{Duration, Instant},
721    };
722    use tracing::trace;
723
724    fn discv5_noop() -> Discv5 {
725        let sk = CombinedKey::generate_secp256k1();
726        Discv5 {
727            discv5: Arc::new(
728                discv5::Discv5::new(
729                    Enr::empty(&sk).unwrap(),
730                    sk,
731                    discv5::ConfigBuilder::new(DEFAULT_DISCOVERY_V5_LISTEN_CONFIG).build(),
732                )
733                .unwrap(),
734            ),
735            rlpx_ip_mode: IpMode::Ip4,
736            fork_key: None,
737            discovered_peer_filter: MustNotIncludeKeys::default(),
738            metrics: Discv5Metrics::default(),
739            local_node_record: NodeRecord::new(
740                (Ipv4Addr::LOCALHOST, 30303).into(),
741                PeerId::random(),
742            ),
743        }
744    }
745
746    async fn start_discovery_node(udp_port_discv5: u16) -> (Discv5, mpsc::Receiver<discv5::Event>) {
747        let secret_key = SecretKey::new(&mut thread_rng());
748
749        let discv5_addr: SocketAddr = format!("127.0.0.1:{udp_port_discv5}").parse().unwrap();
750        let rlpx_addr: SocketAddr = "127.0.0.1:30303".parse().unwrap();
751
752        let discv5_listen_config = ListenConfig::from(discv5_addr);
753        let discv5_config = Config::builder(rlpx_addr)
754            .discv5_config(discv5::ConfigBuilder::new(discv5_listen_config).build())
755            .build();
756
757        Discv5::start(&secret_key, discv5_config).await.expect("should build discv5")
758    }
759
760    async fn start_discovery_node_with_key(
761        secret_key: &SecretKey,
762        udp_port_discv5: u16,
763    ) -> Result<(Discv5, mpsc::Receiver<discv5::Event>), Error> {
764        let discv5_addr: SocketAddr = format!("127.0.0.1:{udp_port_discv5}").parse().unwrap();
765        let rlpx_addr: SocketAddr = "127.0.0.1:30303".parse().unwrap();
766
767        let discv5_listen_config = ListenConfig::from(discv5_addr);
768        let discv5_config = Config::builder(rlpx_addr)
769            .discv5_config(discv5::ConfigBuilder::new(discv5_listen_config).build())
770            .build();
771
772        Discv5::start(secret_key, discv5_config).await
773    }
774
775    fn unused_udp_port() -> u16 {
776        UdpSocket::bind("127.0.0.1:0").unwrap().local_addr().unwrap().port()
777    }
778
779    async fn wait_for_udp_port_release(port: u16, timeout: Duration) {
780        let deadline = Instant::now() + timeout;
781
782        loop {
783            match UdpSocket::bind(("127.0.0.1", port)) {
784                Ok(socket) => {
785                    drop(socket);
786                    return;
787                }
788                Err(err) if Instant::now() < deadline => {
789                    trace!(target: "net::discv5::test", %port, %err, "waiting for discv5 port release");
790                    tokio::time::sleep(Duration::from_millis(10)).await;
791                }
792                Err(err) => panic!("discv5 did not release port {port} before timeout: {err}"),
793            }
794        }
795    }
796
797    #[tokio::test(flavor = "multi_thread")]
798    async fn discv5_releases_port_on_drop() {
799        reth_tracing::init_test_tracing();
800
801        let secret_key = SecretKey::new(&mut thread_rng());
802        let port = unused_udp_port();
803
804        let (node, updates) =
805            start_discovery_node_with_key(&secret_key, port).await.expect("should start discv5");
806        drop(updates);
807        drop(node);
808
809        wait_for_udp_port_release(port, Duration::from_secs(1)).await;
810
811        let restarted = start_discovery_node_with_key(&secret_key, port).await;
812        assert!(restarted.is_ok(), "discv5 failed to rebind dropped port: {restarted:?}");
813    }
814
815    #[tokio::test(flavor = "multi_thread")]
816    async fn discv5() {
817        reth_tracing::init_test_tracing();
818
819        // rig test
820
821        // rig node_1
822        let (node_1, mut stream_1) = start_discovery_node(30344).await;
823        let node_1_enr = node_1.with_discv5(|discv5| discv5.local_enr());
824
825        // rig node_2
826        let (node_2, mut stream_2) = start_discovery_node(30355).await;
827        let node_2_enr = node_2.with_discv5(|discv5| discv5.local_enr());
828
829        trace!(target: "net::discv5::test",
830            node_1_node_id=format!("{:#}", node_1_enr.node_id()),
831            node_2_node_id=format!("{:#}", node_2_enr.node_id()),
832            "started nodes"
833        );
834
835        // test
836
837        // add node_2 to discovery handle of node_1 (should add node to discv5 kbuckets)
838        let node_2_enr_reth_compatible_ty: Enr<SecretKey> =
839            EnrCombinedKeyWrapper(node_2_enr.clone()).into();
840        node_1.add_node(node_2_enr_reth_compatible_ty).unwrap();
841
842        // verify node_2 is in KBuckets of node_1:discv5
843        assert!(
844            node_1.with_discv5(|discv5| discv5.table_entries_id().contains(&node_2_enr.node_id()))
845        );
846
847        // manually trigger connection from node_1 to node_2
848        node_1.with_discv5(|discv5| discv5.send_ping(node_2_enr.clone())).await.unwrap();
849
850        // verify node_1:discv5 is connected to node_2:discv5 and vv
851        let event_1_v5 = stream_1.recv().await.unwrap();
852
853        assert!(matches!(
854            event_1_v5,
855            discv5::Event::SessionEstablished(node, socket) if node == node_2_enr && socket == node_2_enr.udp4_socket().unwrap().into()
856        ));
857
858        // verify node_1 is in KBuckets of node_2:discv5
859        let event_2_v5 = stream_2.recv().await.unwrap();
860        assert!(matches!(
861            event_2_v5,
862            discv5::Event::NodeInserted { node_id, replaced } if node_id == node_1_enr.node_id() && replaced.is_none()
863        ));
864    }
865
866    #[test]
867    fn discovered_enr_disc_socket_missing() {
868        reth_tracing::init_test_tracing();
869
870        // rig test
871        const REMOTE_RLPX_PORT: u16 = 30303;
872        let remote_socket = "104.28.44.25:9000".parse().unwrap();
873        let remote_key = CombinedKey::generate_secp256k1();
874        let remote_enr = Enr::builder().tcp4(REMOTE_RLPX_PORT).build(&remote_key).unwrap();
875
876        let discv5 = discv5_noop();
877
878        // test
879        let filtered_peer = discv5.on_discovered_peer(&remote_enr, remote_socket);
880
881        assert_eq!(
882            NodeRecord {
883                address: remote_socket.ip(),
884                udp_port: remote_socket.port(),
885                tcp_port: REMOTE_RLPX_PORT,
886                id: enr_to_discv4_id(&remote_enr).unwrap(),
887            },
888            filtered_peer.unwrap().node_record
889        )
890    }
891
892    // Copied from sigp/discv5 with slight modification (U256 type)
893    // <https://github.com/sigp/discv5/blob/master/src/kbucket/key.rs#L89-L101>
894    #[expect(unreachable_pub)]
895    #[expect(unused)]
896    mod sigp {
897        use alloy_primitives::U256;
898        use enr::{
899            k256::sha2::digest::generic_array::{typenum::U32, GenericArray},
900            NodeId,
901        };
902
903        /// A `Key` is a cryptographic hash, identifying both the nodes participating in
904        /// the Kademlia DHT, as well as records stored in the DHT.
905        ///
906        /// The set of all `Key`s defines the Kademlia keyspace.
907        ///
908        /// `Key`s have an XOR metric as defined in the Kademlia paper, i.e. the bitwise XOR of
909        /// the hash digests, interpreted as an integer. See [`Key::distance`].
910        ///
911        /// A `Key` preserves the preimage of type `T` of the hash function. See [`Key::preimage`].
912        #[derive(Clone, Debug)]
913        pub struct Key<T> {
914            preimage: T,
915            hash: GenericArray<u8, U32>,
916        }
917
918        impl<T> PartialEq for Key<T> {
919            fn eq(&self, other: &Self) -> bool {
920                self.hash == other.hash
921            }
922        }
923
924        impl<T> Eq for Key<T> {}
925
926        impl<TPeerId> AsRef<Self> for Key<TPeerId> {
927            fn as_ref(&self) -> &Self {
928                self
929            }
930        }
931
932        impl<T> Key<T> {
933            /// Construct a new `Key` by providing the raw 32 byte hash.
934            pub const fn new_raw(preimage: T, hash: GenericArray<u8, U32>) -> Self {
935                Self { preimage, hash }
936            }
937
938            /// Borrows the preimage of the key.
939            pub const fn preimage(&self) -> &T {
940                &self.preimage
941            }
942
943            /// Converts the key into its preimage.
944            pub fn into_preimage(self) -> T {
945                self.preimage
946            }
947
948            /// Computes the distance of the keys according to the XOR metric.
949            pub fn distance<U>(&self, other: &Key<U>) -> Distance {
950                let a = U256::from_be_slice(self.hash.as_slice());
951                let b = U256::from_be_slice(other.hash.as_slice());
952                Distance(a ^ b)
953            }
954
955            // Used in the FINDNODE query outside of the k-bucket implementation.
956            /// Computes the integer log-2 distance between two keys, assuming a 256-bit
957            /// key. The output returns None if the key's are identical. The range is 1-256.
958            pub fn log2_distance<U>(&self, other: &Key<U>) -> Option<u64> {
959                let xor_dist = self.distance(other);
960                let log_dist = (256 - xor_dist.0.leading_zeros() as u64);
961                (log_dist != 0).then_some(log_dist)
962            }
963        }
964
965        impl From<NodeId> for Key<NodeId> {
966            fn from(node_id: NodeId) -> Self {
967                Self { preimage: node_id, hash: *GenericArray::from_slice(&node_id.raw()) }
968            }
969        }
970
971        /// A distance between two `Key`s.
972        #[derive(Copy, Clone, PartialEq, Eq, Default, PartialOrd, Ord, Debug)]
973        pub struct Distance(pub(super) U256);
974    }
975
976    #[test]
977    fn select_lookup_target() {
978        for bucket_index in 0..=MAX_KBUCKET_INDEX {
979            let sk = CombinedKey::generate_secp256k1();
980            let local_node_id = discv5::enr::NodeId::from(sk.public());
981            let target = get_lookup_target(bucket_index, local_node_id);
982
983            let local_node_id = sigp::Key::from(local_node_id);
984            let target = sigp::Key::from(target);
985
986            assert_eq!(local_node_id.log2_distance(&target), Some(bucket_index as u64 + 1));
987        }
988    }
989
990    #[test]
991    fn build_enr_from_config() {
992        const TCP_PORT: u16 = 30303;
993        let fork_id = MAINNET.latest_fork_id();
994
995        let config = Config::builder((Ipv4Addr::UNSPECIFIED, TCP_PORT).into())
996            .fork(NetworkStackId::ETH, fork_id)
997            .build();
998
999        let sk = SecretKey::new(&mut thread_rng());
1000        let (enr, _, _, _) = build_local_enr(&sk, &config);
1001
1002        let decoded_fork_id = enr
1003            .get_decodable::<EnrForkIdEntry>(NetworkStackId::ETH)
1004            .unwrap()
1005            .map(Into::into)
1006            .unwrap();
1007
1008        assert_eq!(fork_id, decoded_fork_id);
1009        assert_eq!(TCP_PORT, enr.tcp4().unwrap()); // listen config is defaulting to ip mode ipv4
1010    }
1011
1012    #[test]
1013    fn get_fork_id_with_different_network_stack_ids() {
1014        let fork_id = MAINNET.latest_fork_id();
1015        let sk = SecretKey::new(&mut thread_rng());
1016
1017        // Test 1: ENR with OPEL fork ID, Discv5 configured for OPEL
1018        let enr_with_opel = Enr::builder()
1019            .add_value_rlp(
1020                NetworkStackId::OPEL,
1021                alloy_rlp::encode(EnrForkIdEntry::from(fork_id)).into(),
1022            )
1023            .build(&sk)
1024            .unwrap();
1025
1026        let mut discv5 = discv5_noop();
1027        discv5.fork_key = Some(NetworkStackId::OPEL);
1028        assert_eq!(discv5.get_fork_id(&enr_with_opel).unwrap(), fork_id);
1029
1030        // Test 2: ENR with ETH fork ID, Discv5 configured for OPEL (fallback to ETH)
1031        let enr_with_eth = Enr::builder()
1032            .add_value_rlp(
1033                NetworkStackId::ETH,
1034                alloy_rlp::encode(EnrForkIdEntry::from(fork_id)).into(),
1035            )
1036            .build(&sk)
1037            .unwrap();
1038
1039        discv5.fork_key = Some(NetworkStackId::OPEL);
1040        assert_eq!(discv5.get_fork_id(&enr_with_eth).unwrap(), fork_id);
1041
1042        // Test 3: ENR with neither OPEL nor ETH fork ID (should fail)
1043        let enr_without_network_stack_id = Enr::empty(&sk).unwrap();
1044        discv5.fork_key = Some(NetworkStackId::OPEL);
1045        assert!(matches!(
1046            discv5.get_fork_id(&enr_without_network_stack_id),
1047            Err(Error::ForkMissing(NetworkStackId::OPEL))
1048        ));
1049
1050        // Test 4: discv5 without network stack id configured (should fail)
1051        let discv5 = discv5_noop();
1052        assert!(matches!(
1053            discv5.get_fork_id(&enr_without_network_stack_id),
1054            Err(Error::NetworkStackIdNotConfigured)
1055        ));
1056    }
1057}