// reth_discv4/lib.rs
1//! Discovery v4 implementation: <https://github.com/ethereum/devp2p/blob/master/discv4.md>
2//!
3//! Discv4 employs a kademlia-like routing table to store and manage discovered peers and topics.
4//! The protocol allows for external IP discovery in NAT environments through regular PING/PONG's
5//! with discovered nodes. Nodes return the external IP address that they have received and a simple
6//! majority is chosen as our external IP address. If an external IP address is updated, this is
7//! produced as an event to notify the swarm (if one is used for this behaviour).
8//!
9//! This implementation consists of a [`Discv4`] and [`Discv4Service`] pair. The service manages the
10//! state and drives the UDP socket. The (optional) [`Discv4`] serves as the frontend to interact
11//! with the service via a channel. Whenever the underlying table changes service produces a
12//! [`DiscoveryUpdate`] that listeners will receive.
13//!
14//! ## Feature Flags
15//!
16//! - `serde` (default): Enable serde support
17//! - `test-utils`: Export utilities for testing
18
19#![doc(
20    html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
21    html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
22    issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/"
23)]
24#![cfg_attr(not(test), warn(unused_crate_dependencies))]
25#![cfg_attr(docsrs, feature(doc_cfg))]
26
27use crate::{
28    error::{DecodePacketError, Discv4Error},
29    proto::{FindNode, Message, Neighbours, Packet, Ping, Pong},
30};
31use alloy_primitives::{bytes::Bytes, hex, B256};
32use discv5::{
33    kbucket,
34    kbucket::{
35        BucketInsertResult, Distance, Entry as BucketEntry, InsertResult, KBucketsTable,
36        NodeStatus, MAX_NODES_PER_BUCKET,
37    },
38    ConnectionDirection, ConnectionState,
39};
40use enr::Enr;
41use itertools::Itertools;
42use parking_lot::Mutex;
43use proto::{EnrRequest, EnrResponse};
44use reth_ethereum_forks::ForkId;
45use reth_network_peers::{pk2id, PeerId};
46use secp256k1::SecretKey;
47use std::{
48    cell::RefCell,
49    collections::{btree_map, hash_map::Entry, BTreeMap, HashMap, VecDeque},
50    fmt,
51    future::poll_fn,
52    io,
53    net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4},
54    pin::Pin,
55    rc::Rc,
56    sync::Arc,
57    task::{ready, Context, Poll},
58    time::{Duration, Instant, SystemTime, UNIX_EPOCH},
59};
60use tokio::{
61    net::UdpSocket,
62    sync::{mpsc, mpsc::error::TrySendError, oneshot, oneshot::Sender as OneshotSender},
63    task::{JoinHandle, JoinSet},
64    time::Interval,
65};
66use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt};
67use tracing::{debug, trace};
68
69pub mod error;
70pub mod proto;
71
72mod config;
73pub use config::{Discv4Config, Discv4ConfigBuilder};
74
75mod node;
76use node::{kad_key, NodeKey};
77
78mod table;
79
80// reexport NodeRecord primitive
81pub use reth_network_peers::NodeRecord;
82
83#[cfg(any(test, feature = "test-utils"))]
84pub mod test_utils;
85
86use crate::table::PongTable;
87use reth_net_nat::ResolveNatInterval;
88/// reexport to get public ip.
89pub use reth_net_nat::{external_ip, NatResolver};
90
/// The default address for discv4 via UDP
///
/// Note: the default TCP address is the same.
pub const DEFAULT_DISCOVERY_ADDR: IpAddr = IpAddr::V4(Ipv4Addr::UNSPECIFIED);

/// The default port for discv4 via UDP
///
/// Note: the default TCP port is the same.
pub const DEFAULT_DISCOVERY_PORT: u16 = 30303;

/// The default address for discv4 via UDP: "0.0.0.0:30303"
///
/// Note: The default TCP address is the same.
pub const DEFAULT_DISCOVERY_ADDRESS: SocketAddr =
    SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, DEFAULT_DISCOVERY_PORT));

/// The maximum size of any packet is 1280 bytes.
const MAX_PACKET_SIZE: usize = 1280;

/// Length of the UDP datagram packet-header: Hash(32b) + Signature(65b) + Packet Type(1b)
const MIN_PACKET_SIZE: usize = 32 + 65 + 1;

/// Concurrency factor for `FindNode` requests to pick `ALPHA` closest nodes, <https://github.com/ethereum/devp2p/blob/master/discv4.md#recursive-lookup>
const ALPHA: usize = 3;

/// Maximum number of nodes to ping concurrently.
///
/// This corresponds to 2 full `Neighbours` responses with 16 _new_ nodes. This will apply some
/// backpressure in recursive lookups.
const MAX_NODES_PING: usize = 2 * MAX_NODES_PER_BUCKET;

/// Maximum number of pings to keep queued.
///
/// If we are currently sending too many pings, any new pings will be queued. To prevent unbounded
/// growth of the queue, the queue has a maximum capacity, after which any additional pings will be
/// discarded.
///
/// This corresponds to 2 full `Neighbours` responses with 16 new nodes.
const MAX_QUEUED_PINGS: usize = 2 * MAX_NODES_PER_BUCKET;

/// The size of the datagram is limited by [`MAX_PACKET_SIZE`]; the 16 nodes that the discv4 spec
/// allows do not fit in one datagram. The safe number of nodes that always fit in a datagram is
/// 12, with the worst case being all of them IPv6 nodes. This is calculated by
/// `(MAX_PACKET_SIZE - (header + expire + rlp overhead)) / size(rlp(Node_IPv6))`.
/// Even in the best case where all nodes are IPv4, only 14 nodes fit into one packet.
const SAFE_MAX_DATAGRAM_NEIGHBOUR_RECORDS: usize = (MAX_PACKET_SIZE - 109) / 91;

/// The timeout used to identify expired nodes, 24h
///
/// Mirrors geth's `bondExpiration` of 24h
const ENDPOINT_PROOF_EXPIRATION: Duration = Duration::from_secs(24 * 60 * 60);

/// Duration used to expire nodes from the routing table 1hr
const EXPIRE_DURATION: Duration = Duration::from_secs(60 * 60);

// Restricts how many udp messages can be processed in a single [Discv4Service::poll] call.
//
// This will act as a manual yield point when draining the socket messages where the most CPU
// expensive part is handling outgoing messages: encoding and hashing the packet
const UDP_MESSAGE_POLL_LOOP_BUDGET: i32 = 4;

// Channel halves used to hand outgoing `(payload, destination)` datagrams to the send task.
type EgressSender = mpsc::Sender<(Bytes, SocketAddr)>;
type EgressReceiver = mpsc::Receiver<(Bytes, SocketAddr)>;

// Channel halves used by the receive task to deliver decoded ingress events to the service.
pub(crate) type IngressSender = mpsc::Sender<IngressEvent>;
pub(crate) type IngressReceiver = mpsc::Receiver<IngressEvent>;

// One-shot channel used to deliver the result of a lookup back to the requester.
type NodeRecordSender = OneshotSender<Vec<NodeRecord>>;
159
/// The Discv4 frontend.
///
/// This is a cloneable type that communicates with the [`Discv4Service`] by sending commands over a
/// shared channel.
///
/// Cloning is cheap: all clones share the same command channel and node-record handle.
///
/// See also [`Discv4::spawn`]
#[derive(Debug, Clone)]
pub struct Discv4 {
    /// The address of the udp socket
    local_addr: SocketAddr,
    /// channel to send commands over to the service
    to_service: mpsc::UnboundedSender<Discv4Command>,
    /// Tracks the local node record.
    ///
    /// This includes the currently tracked external IP address of the node.
    node_record: Arc<Mutex<NodeRecord>>,
}
177
impl Discv4 {
    /// Same as [`Self::bind`] but also spawns the service onto a new task.
    ///
    /// See also: [`Discv4Service::spawn()`]
    pub async fn spawn(
        local_address: SocketAddr,
        local_enr: NodeRecord,
        secret_key: SecretKey,
        config: Discv4Config,
    ) -> io::Result<Self> {
        let (discv4, service) = Self::bind(local_address, local_enr, secret_key, config).await?;

        // drive the service on its own task; the returned handle communicates via the channel
        service.spawn();

        Ok(discv4)
    }

    /// Returns a new instance with the given channel directly
    ///
    /// NOTE: this is only intended for test setups.
    #[cfg(feature = "test-utils")]
    pub fn noop() -> Self {
        // the receiver half is dropped, so all commands sent via this handle are discarded
        let (to_service, _rx) = mpsc::unbounded_channel();
        let local_addr =
            (IpAddr::from(std::net::Ipv4Addr::UNSPECIFIED), DEFAULT_DISCOVERY_PORT).into();
        Self {
            local_addr,
            to_service,
            node_record: Arc::new(Mutex::new(NodeRecord::new(
                "127.0.0.1:3030".parse().unwrap(),
                PeerId::random(),
            ))),
        }
    }

    /// Binds a new `UdpSocket` and creates the service
    ///
    /// ```
    /// use reth_discv4::{Discv4, Discv4Config};
    /// use reth_network_peers::{pk2id, NodeRecord, PeerId};
    /// use secp256k1::SECP256K1;
    /// use std::{net::SocketAddr, str::FromStr};
    /// # async fn t() -> std::io::Result<()> {
    ///
    /// // generate a (random) keypair
    /// let (secret_key, pk) = SECP256K1.generate_keypair(&mut rand_08::thread_rng());
    /// let id = pk2id(&pk);
    ///
    /// let socket = SocketAddr::from_str("0.0.0.0:0").unwrap();
    /// let local_enr =
    ///     NodeRecord { address: socket.ip(), tcp_port: socket.port(), udp_port: socket.port(), id };
    /// let config = Discv4Config::default();
    ///
    /// let (discv4, mut service) = Discv4::bind(socket, local_enr, secret_key, config).await.unwrap();
    ///
    /// // get an update stream
    /// let updates = service.update_stream();
    ///
    /// let _handle = service.spawn();
    ///
    /// // lookup the local node in the DHT
    /// let _discovered = discv4.lookup_self().await.unwrap();
    ///
    /// # Ok(())
    /// # }
    /// ```
    pub async fn bind(
        local_address: SocketAddr,
        mut local_node_record: NodeRecord,
        secret_key: SecretKey,
        config: Discv4Config,
    ) -> io::Result<(Self, Discv4Service)> {
        let socket = UdpSocket::bind(local_address).await?;
        let local_addr = socket.local_addr()?;
        // the requested address may use port 0; record the actually bound port
        local_node_record.udp_port = local_addr.port();
        trace!(target: "discv4", ?local_addr,"opened UDP socket");

        let mut service =
            Discv4Service::new(socket, local_addr, local_node_record, secret_key, config);

        // resolve the external address immediately
        service.resolve_external_ip();

        let discv4 = service.handle();
        Ok((discv4, service))
    }

    /// Returns the address of the UDP socket.
    pub const fn local_addr(&self) -> SocketAddr {
        self.local_addr
    }

    /// Returns the [`NodeRecord`] of the local node.
    ///
    /// This includes the currently tracked external IP address of the node.
    pub fn node_record(&self) -> NodeRecord {
        *self.node_record.lock()
    }

    /// Returns the currently tracked external IP of the node.
    pub fn external_ip(&self) -> IpAddr {
        self.node_record.lock().address
    }

    /// Sets the [Interval] used for periodically looking up targets over the network
    pub fn set_lookup_interval(&self, duration: Duration) {
        self.send_to_service(Discv4Command::SetLookupInterval(duration))
    }

    /// Starts a `FindNode` recursive lookup that locates the closest nodes to the given node id. See also: <https://github.com/ethereum/devp2p/blob/master/discv4.md#recursive-lookup>
    ///
    /// The lookup initiator starts by picking α closest nodes to the target it knows of. The
    /// initiator then sends concurrent `FindNode` packets to those nodes. α is a system-wide
    /// concurrency parameter, such as 3. In the recursive step, the initiator resends `FindNode` to
    /// nodes it has learned about from previous queries. Of the k nodes the initiator has heard of
    /// closest to the target, it picks α that it has not yet queried and resends `FindNode` to
    /// them. Nodes that fail to respond quickly are removed from consideration until and unless
    /// they do respond.
    //
    // If a round of FindNode queries fails to return a node any closer than the closest already
    // seen, the initiator resends the find node to all of the k closest nodes it has not already
    // queried. The lookup terminates when the initiator has queried and gotten responses from the k
    // closest nodes it has seen.
    pub async fn lookup_self(&self) -> Result<Vec<NodeRecord>, Discv4Error> {
        self.lookup_node(None).await
    }

    /// Looks up the given node id.
    ///
    /// Returning the closest nodes to the given node id.
    pub async fn lookup(&self, node_id: PeerId) -> Result<Vec<NodeRecord>, Discv4Error> {
        self.lookup_node(Some(node_id)).await
    }

    /// Performs a random lookup for node records.
    pub async fn lookup_random(&self) -> Result<Vec<NodeRecord>, Discv4Error> {
        let target = PeerId::random();
        self.lookup_node(Some(target)).await
    }

    /// Sends a message to the service to lookup the closest nodes
    pub fn send_lookup(&self, node_id: PeerId) {
        // fire-and-forget: no response channel is attached to the command
        let cmd = Discv4Command::Lookup { node_id: Some(node_id), tx: None };
        self.send_to_service(cmd);
    }

    // Issues a lookup command and awaits the discovered nodes on a oneshot channel.
    async fn lookup_node(&self, node_id: Option<PeerId>) -> Result<Vec<NodeRecord>, Discv4Error> {
        let (tx, rx) = oneshot::channel();
        let cmd = Discv4Command::Lookup { node_id, tx: Some(tx) };
        self.to_service.send(cmd)?;
        Ok(rx.await?)
    }

    /// Triggers a new self lookup without expecting a response
    pub fn send_lookup_self(&self) {
        let cmd = Discv4Command::Lookup { node_id: None, tx: None };
        self.send_to_service(cmd);
    }

    /// Removes the peer from the table, if it exists.
    pub fn remove_peer(&self, node_id: PeerId) {
        let cmd = Discv4Command::Remove(node_id);
        self.send_to_service(cmd);
    }

    /// Adds the node to the table, if it is not already present.
    pub fn add_node(&self, node_record: NodeRecord) {
        let cmd = Discv4Command::Add(node_record);
        self.send_to_service(cmd);
    }

    /// Adds the node as a bootnode.
    ///
    /// This registers the node in the configured bootstrap set and inserts it into the routing
    /// table, pinging it to establish the endpoint proof, same as the nodes provided at startup.
    pub fn add_boot_node(&self, node_record: NodeRecord) {
        let cmd = Discv4Command::AddBootNode(node_record);
        self.send_to_service(cmd);
    }

    /// Adds the peer and id to the ban list.
    ///
    /// This will prevent any future inclusion in the table
    pub fn ban(&self, node_id: PeerId, ip: IpAddr) {
        let cmd = Discv4Command::Ban(node_id, ip);
        self.send_to_service(cmd);
    }

    /// Adds the ip to the ban list.
    ///
    /// This will prevent any future inclusion in the table
    pub fn ban_ip(&self, ip: IpAddr) {
        let cmd = Discv4Command::BanIp(ip);
        self.send_to_service(cmd);
    }

    /// Adds the peer to the ban list.
    ///
    /// This will prevent any future inclusion in the table
    pub fn ban_node(&self, node_id: PeerId) {
        let cmd = Discv4Command::BanPeer(node_id);
        self.send_to_service(cmd);
    }

    /// Sets the tcp port
    ///
    /// This will update our [`NodeRecord`]'s tcp port.
    pub fn set_tcp_port(&self, port: u16) {
        let cmd = Discv4Command::SetTcpPort(port);
        self.send_to_service(cmd);
    }

    /// Sets the pair in the EIP-868 [`Enr`] of the node.
    ///
    /// If the key already exists, this will update it.
    ///
    /// CAUTION: The value **must** be rlp encoded
    pub fn set_eip868_rlp_pair(&self, key: Vec<u8>, rlp: Bytes) {
        let cmd = Discv4Command::SetEIP868RLPPair { key, rlp };
        self.send_to_service(cmd);
    }

    /// Sets the pair in the EIP-868 [`Enr`] of the node.
    ///
    /// If the key already exists, this will update it.
    pub fn set_eip868_rlp(&self, key: Vec<u8>, value: impl alloy_rlp::Encodable) {
        self.set_eip868_rlp_pair(key, Bytes::from(alloy_rlp::encode(&value)))
    }

    // Best-effort send of a command to the service; errors are logged and dropped.
    //
    // NOTE(review): `to_service` is an unbounded channel, so `send` can only fail once the
    // service task has terminated — the "channel capacity reached" log text looks stale; confirm.
    #[inline]
    fn send_to_service(&self, cmd: Discv4Command) {
        let _ = self.to_service.send(cmd).map_err(|err| {
            debug!(
                target: "discv4",
                %err,
                "channel capacity reached, dropping command",
            )
        });
    }

    /// Returns the receiver half of a new listener channel that streams [`DiscoveryUpdate`]s.
    pub async fn update_stream(&self) -> Result<ReceiverStream<DiscoveryUpdate>, Discv4Error> {
        let (tx, rx) = oneshot::channel();
        let cmd = Discv4Command::Updates(tx);
        self.to_service.send(cmd)?;
        Ok(rx.await?)
    }

    /// Terminates the spawned [`Discv4Service`].
    pub fn terminate(&self) {
        self.send_to_service(Discv4Command::Terminated);
    }
}
431
/// Manages discv4 peer discovery over UDP.
///
/// This is a [Stream] that handles incoming and outgoing discv4 messages and emits updates via:
/// [`Discv4Service::update_stream`].
///
/// This type maintains the discv4 Kademlia routing table and is responsible for performing lookups.
///
/// ## Lookups
///
/// See also [Recursive Lookups](https://github.com/ethereum/devp2p/blob/master/discv4.md#recursive-lookup).
/// Lookups are either triggered periodically or performed on demand: [`Discv4::lookup`]
/// Newly discovered nodes are emitted as [`DiscoveryUpdate::Added`] event to all subscribers:
/// [`Discv4Service::update_stream`].
#[must_use = "Stream does nothing unless polled"]
pub struct Discv4Service {
    /// Local address of the UDP socket.
    local_address: SocketAddr,
    /// The local ENR for EIP-868 <https://eips.ethereum.org/EIPS/eip-868>
    local_eip_868_enr: Enr<SecretKey>,
    /// Local ENR of the server.
    local_node_record: NodeRecord,
    /// Keeps track of the node record of the local node.
    shared_node_record: Arc<Mutex<NodeRecord>>,
    /// The secret key used to sign payloads
    secret_key: SecretKey,
    /// The UDP socket for sending and receiving messages.
    _socket: Arc<UdpSocket>,
    /// The spawned UDP tasks.
    ///
    /// Note: If dropped, the spawned send+receive tasks are aborted.
    _tasks: JoinSet<()>,
    /// The routing table.
    kbuckets: KBucketsTable<NodeKey, NodeEntry>,
    /// Receiver for incoming messages
    ///
    /// Receives incoming messages from the UDP task.
    ingress: IngressReceiver,
    /// Sender for sending outgoing messages
    ///
    /// Sends outgoing messages to the UDP task.
    egress: EgressSender,
    /// Buffered pending pings to apply backpressure.
    ///
    /// Lookups behave like bursts of requests: Endpoint proof followed by `FindNode` request. [Recursive lookups](https://github.com/ethereum/devp2p/blob/master/discv4.md#recursive-lookup) can trigger multiple followup Pings+FindNode requests.
    /// A cap on concurrent `Ping` prevents escalation where: A large number of new nodes
    /// discovered via `FindNode` in a recursive lookup triggers a large number of `Ping`s, and
    /// followup `FindNode` requests.... Buffering them effectively prevents high `Ping` peaks.
    queued_pings: VecDeque<(NodeRecord, PingReason)>,
    /// Currently active pings to specific nodes.
    pending_pings: HashMap<PeerId, PingRequest>,
    /// Currently active endpoint proof verification lookups to specific nodes.
    ///
    /// Entries here means we've proven the peer's endpoint but haven't completed our end of the
    /// endpoint proof
    pending_lookup: HashMap<PeerId, (Instant, LookupContext)>,
    /// Currently active `FindNode` requests
    pending_find_nodes: HashMap<PeerId, FindNodeRequest>,
    /// Currently active ENR requests
    pending_enr_requests: HashMap<PeerId, EnrRequestState>,
    /// Copy of the sender half of the commands channel for [Discv4]
    to_service: mpsc::UnboundedSender<Discv4Command>,
    /// Receiver half of the commands channel for [Discv4]
    commands_rx: mpsc::UnboundedReceiver<Discv4Command>,
    /// All subscribers for table updates
    update_listeners: Vec<mpsc::Sender<DiscoveryUpdate>>,
    /// The interval when to trigger random lookups
    lookup_interval: Interval,
    /// Used to rotate targets to lookup
    lookup_rotator: LookupTargetRotator,
    /// Whether we still need to reset the lookup interval on the first bootnode pong.
    pending_lookup_reset: bool,
    /// Interval when to recheck active requests
    evict_expired_requests_interval: Interval,
    /// Interval when to resend pings.
    ping_interval: Interval,
    /// The interval at which to attempt resolving external IP again.
    resolve_external_ip_interval: Option<ResolveNatInterval>,
    /// How this service is configured
    config: Discv4Config,
    /// Buffered events populated during poll.
    queued_events: VecDeque<Discv4Event>,
    /// Keeps track of nodes from which we have received a `Pong` message.
    received_pongs: PongTable,
    /// Interval used to expire additionally tracked nodes
    expire_interval: Interval,
    /// Cached signed `FindNode` packet to avoid redundant ECDSA signing during lookups.
    cached_find_node: Option<CachedFindNode>,
}
520
521impl Discv4Service {
    /// Create a new instance for a bound [`UdpSocket`].
    ///
    /// Spawns the socket send/receive loops onto the internal [`JoinSet`], builds the routing
    /// table keyed by the local node, and constructs the EIP-868 ENR from the node record and
    /// any additional configured rlp pairs.
    pub(crate) fn new(
        socket: UdpSocket,
        local_address: SocketAddr,
        local_node_record: NodeRecord,
        secret_key: SecretKey,
        config: Discv4Config,
    ) -> Self {
        let socket = Arc::new(socket);
        let (ingress_tx, ingress_rx) = mpsc::channel(config.udp_ingress_message_buffer);
        let (egress_tx, egress_rx) = mpsc::channel(config.udp_egress_message_buffer);
        let mut tasks = JoinSet::<()>::new();

        // task that reads datagrams off the socket and forwards them via `ingress_tx`
        let udp = Arc::clone(&socket);
        tasks.spawn(receive_loop(udp, ingress_tx, local_node_record.id));

        // task that writes queued outgoing datagrams from `egress_rx` to the socket
        let udp = Arc::clone(&socket);
        tasks.spawn(send_loop(udp, egress_rx));

        let kbuckets = KBucketsTable::new(
            NodeKey::from(&local_node_record).into(),
            Duration::from_secs(60),
            MAX_NODES_PER_BUCKET,
            None,
            None,
        );

        let self_lookup_interval = tokio::time::interval(config.lookup_interval);

        // Wait `ping_interval` and then start pinging every `ping_interval`, so that the first
        // ping is not fired immediately at startup.
        let ping_interval = tokio::time::interval_at(
            tokio::time::Instant::now() + config.ping_interval,
            config.ping_interval,
        );

        let evict_expired_requests_interval = tokio::time::interval_at(
            tokio::time::Instant::now() + config.request_timeout,
            config.request_timeout,
        );

        let lookup_rotator = if config.enable_dht_random_walk {
            LookupTargetRotator::default()
        } else {
            LookupTargetRotator::local_only()
        };

        // for EIP-868 construct an ENR
        let local_eip_868_enr = {
            let mut builder = Enr::builder();
            builder.ip(local_node_record.address);
            if local_node_record.address.is_ipv4() {
                builder.udp4(local_node_record.udp_port);
                builder.tcp4(local_node_record.tcp_port);
            } else {
                builder.udp6(local_node_record.udp_port);
                builder.tcp6(local_node_record.tcp_port);
            }

            for (key, val) in &config.additional_eip868_rlp_pairs {
                builder.add_value_rlp(key, val.clone());
            }
            builder.build(&secret_key).expect("v4 is set")
        };

        let (to_service, commands_rx) = mpsc::unbounded_channel();

        let shared_node_record = Arc::new(Mutex::new(local_node_record));

        Self {
            local_address,
            local_eip_868_enr,
            local_node_record,
            shared_node_record,
            _socket: socket,
            kbuckets,
            secret_key,
            _tasks: tasks,
            ingress: ingress_rx,
            egress: egress_tx,
            queued_pings: VecDeque::with_capacity(MAX_QUEUED_PINGS),
            pending_pings: Default::default(),
            pending_lookup: Default::default(),
            pending_find_nodes: Default::default(),
            pending_enr_requests: Default::default(),
            commands_rx,
            to_service,
            update_listeners: Vec::with_capacity(1),
            lookup_interval: self_lookup_interval,
            ping_interval,
            evict_expired_requests_interval,
            lookup_rotator,
            pending_lookup_reset: config.enable_lookup,
            resolve_external_ip_interval: config.resolve_external_ip_interval(),
            config,
            queued_events: Default::default(),
            received_pongs: Default::default(),
            expire_interval: tokio::time::interval(EXPIRE_DURATION),
            cached_find_node: None,
        }
    }
623
624    /// Returns the frontend handle that can communicate with the service via commands.
625    pub fn handle(&self) -> Discv4 {
626        Discv4 {
627            local_addr: self.local_address,
628            to_service: self.to_service.clone(),
629            node_record: self.shared_node_record.clone(),
630        }
631    }
632
633    /// Returns the current enr sequence of the local record.
634    fn enr_seq(&self) -> Option<u64> {
635        self.config.enable_eip868.then(|| self.local_eip_868_enr.seq())
636    }
637
638    /// Sets the [Interval] used for periodically looking up targets over the network
639    pub fn set_lookup_interval(&mut self, duration: Duration) {
640        self.lookup_interval = tokio::time::interval(duration);
641    }
642
643    /// Sets the external Ip to the configured external IP if [`NatResolver::ExternalIp`] or
644    /// [`NatResolver::ExternalAddr`]. In the case of [`NatResolver::ExternalAddr`], it will return
645    /// the first IP address found for the domain associated with the discv4 UDP port.
646    fn resolve_external_ip(&mut self) {
647        if let Some(r) = &self.resolve_external_ip_interval &&
648            let Some(external_ip) =
649                r.resolver().clone().as_external_ip(self.local_node_record.udp_port)
650        {
651            self.set_external_ip_addr(external_ip);
652        }
653    }
654
655    /// Sets the given ip address as the node's external IP in the node record announced in
656    /// discovery
657    pub fn set_external_ip_addr(&mut self, external_ip: IpAddr) {
658        if self.local_node_record.address != external_ip {
659            debug!(target: "discv4", ?external_ip, "Updating external ip");
660            self.local_node_record.address = external_ip;
661            let _ = self.local_eip_868_enr.set_ip(external_ip, &self.secret_key);
662            let mut lock = self.shared_node_record.lock();
663            *lock = self.local_node_record;
664            debug!(target: "discv4", enr=?self.local_eip_868_enr, "Updated local ENR");
665        }
666    }
667
    /// Returns the [`PeerId`] that identifies this node
    pub const fn local_peer_id(&self) -> &PeerId {
        &self.local_node_record.id
    }
672
    /// Returns the address of the UDP socket
    pub const fn local_addr(&self) -> SocketAddr {
        self.local_address
    }
677
    /// Returns the ENR of this service.
    ///
    /// Note: this will include the external address if resolved.
    ///
    /// NOTE(review): despite the name, this returns the discv4 [`NodeRecord`], not the EIP-868
    /// ENR held in `local_eip_868_enr`.
    pub const fn local_enr(&self) -> NodeRecord {
        self.local_node_record
    }
684
    /// Returns mutable reference to ENR for testing.
    #[cfg(test)]
    pub const fn local_enr_mut(&mut self) -> &mut NodeRecord {
        &mut self.local_node_record
    }
690
691    /// Returns true if the given `PeerId` is currently in the bucket
692    pub fn contains_node(&self, id: PeerId) -> bool {
693        let key = kad_key(id);
694        self.kbuckets.get_index(&key).is_some()
695    }
696
697    /// Bootstraps the local node to join the DHT.
698    ///
699    /// Bootstrapping is a multi-step operation that starts with a lookup of the local node's
700    /// own ID in the DHT. This introduces the local node to the other nodes
701    /// in the DHT and populates its routing table with the closest proven neighbours.
702    ///
703    /// This inserts the configured bootnodes into the routing table and pings them. Once the
704    /// endpoint proof succeeds (pong received), a [`DiscoveryUpdate::Added`] event is emitted,
705    /// same as with [`Self::add_node`].
706    ///
707    /// **Note:** This is a noop if there are no bootnodes.
708    pub fn bootstrap(&mut self) {
709        for record in self.config.bootstrap_nodes.clone() {
710            debug!(target: "discv4", ?record, "pinging boot node");
711            let key = kad_key(record.id);
712            let entry = NodeEntry::new(record);
713
714            // insert the boot node in the table
715            match self.kbuckets.insert_or_update(
716                &key,
717                entry,
718                NodeStatus {
719                    state: ConnectionState::Disconnected,
720                    direction: ConnectionDirection::Outgoing,
721                },
722            ) {
723                InsertResult::Failed(_) => {}
724                _ => {
725                    self.try_ping(record, PingReason::InitialInsert);
726                }
727            }
728        }
729    }
730
    /// Adds the node to the bootstrap set and to the routing table.
    ///
    /// Behaves like [`Self::add_node`] but also registers the node in the configured bootstrap
    /// set so it is used for subsequent bootstrap attempts.
    ///
    /// Returns the result of the table insertion, see [`Self::add_node`].
    pub fn add_boot_node(&mut self, record: NodeRecord) -> bool {
        // register for future `bootstrap` calls before attempting the table insert
        self.config.bootstrap_nodes.insert(record);
        self.add_node(record)
    }
739
740    /// Spawns this services onto a new task
741    ///
742    /// Note: requires a running tokio runtime
743    pub fn spawn(mut self) -> JoinHandle<()> {
744        tokio::task::spawn(async move {
745            self.bootstrap();
746
747            while let Some(event) = self.next().await {
748                trace!(target: "discv4", ?event, "processed");
749            }
750            trace!(target: "discv4", "service terminated");
751        })
752    }
753
754    /// Creates a new bounded channel for [`DiscoveryUpdate`]s.
755    pub fn update_stream(&mut self) -> ReceiverStream<DiscoveryUpdate> {
756        let (tx, rx) = mpsc::channel(512);
757        self.update_listeners.push(tx);
758        ReceiverStream::new(rx)
759    }
760
    /// Looks up the local node in the DHT.
    ///
    /// Shorthand for [`Self::lookup`] with the local node id as target.
    pub fn lookup_self(&mut self) {
        self.lookup(self.local_node_record.id)
    }
765
    /// Looks up the given node in the DHT
    ///
    /// A `FindNode` packet requests information about nodes close to target. The target is a
    /// 64-byte secp256k1 public key. When `FindNode` is received, the recipient should reply
    /// with Neighbors packets containing the closest 16 nodes to target found in its local
    /// table.
    //
    // To guard against traffic amplification attacks, Neighbors replies should only be sent if the
    // sender of FindNode has been verified by the endpoint proof procedure.
    pub fn lookup(&mut self, target: PeerId) {
        // no result sender attached: fire-and-forget lookup
        self.lookup_with(target, None)
    }
778
    /// Starts the recursive lookup process for the given target, <https://github.com/ethereum/devp2p/blob/master/discv4.md#recursive-lookup>.
    ///
    /// At first the `ALPHA` (==3, defined concurrency factor) nodes that are closest to the target
    /// in the underlying DHT are selected to seed the lookup via `FindNode` requests. In the
    /// recursive step, the initiator resends `FindNode` to nodes it has learned about from previous
    /// queries.
    ///
    /// This takes an optional Sender through which all successfully discovered nodes are sent once
    /// the request has finished.
    fn lookup_with(&mut self, target: PeerId, tx: Option<NodeRecordSender>) {
        trace!(target: "discv4", ?target, "Starting lookup");
        let target_key = kad_key(target);

        // Start a lookup context with the 16 (MAX_NODES_PER_BUCKET) closest nodes to which we have
        // a valid endpoint proof
        let ctx = LookupContext::new(
            target_key.clone(),
            self.kbuckets
                .closest_values(&target_key)
                .filter(|node| {
                    // skip candidates that already have a FindNode request in flight
                    node.value.has_endpoint_proof &&
                        !self.pending_find_nodes.contains_key(&node.key.preimage().0)
                })
                .take(MAX_NODES_PER_BUCKET)
                .map(|n| (target_key.distance(&n.key), n.value.record)),
            tx,
        );

        // From those 16, pick the 3 closest to start the concurrent lookup.
        let closest = ctx.closest(ALPHA);

        if closest.is_empty() && self.pending_find_nodes.is_empty() {
            // no closest nodes, and no lookup in progress: table is empty.
            // This could happen if all records were deleted from the table due to missed pongs
            // (e.g. connectivity problems over a long period of time, or issues during initial
            // bootstrapping) so we attempt to bootstrap again
            self.bootstrap();
            return
        }

        trace!(target: "discv4", ?target, num = closest.len(), "Start lookup closest nodes");

        for node in closest {
            // here we still want to check against previous request failures and if necessary
            // re-establish a new endpoint proof because it can be the case that the other node lost
            // our entry and no longer has an endpoint proof on their end
            self.find_node_checked(&node, ctx.clone());
        }
    }
828
829    /// Sends a new `FindNode` packet to the node with `target` as the lookup target.
830    ///
831    /// CAUTION: This expects there's a valid Endpoint proof to the given `node`.
832    fn find_node(&mut self, node: &NodeRecord, ctx: LookupContext) {
833        trace!(target: "discv4", ?node, lookup=?ctx.target(), "Sending FindNode");
834        ctx.mark_queried(node.id);
835        let (payload, hash) = self.find_node_packet(ctx.target());
836        let to = node.udp_addr();
837        trace!(target: "discv4", ?to, ?hash, "sending FindNode packet");
838        let _ = self.egress.try_send((payload, to)).map_err(|err| {
839            debug!(target: "discv4", %err, "dropped outgoing packet");
840        });
841        self.pending_find_nodes.insert(node.id, FindNodeRequest::new(ctx));
842    }
843
844    /// Sends a new `FindNode` packet to the node with `target` as the lookup target but checks
845    /// whether we should send a new ping first to renew the endpoint proof by checking the
846    /// previously failed findNode requests. It could be that the node is no longer reachable or
847    /// lost our entry.
848    fn find_node_checked(&mut self, node: &NodeRecord, ctx: LookupContext) {
849        let max_failures = self.config.max_find_node_failures;
850        let needs_ping = self
851            .on_entry(node.id, |entry| entry.exceeds_find_node_failures(max_failures))
852            .unwrap_or(true);
853        if needs_ping {
854            self.try_ping(*node, PingReason::Lookup(*node, ctx))
855        } else {
856            self.find_node(node, ctx)
857        }
858    }
859
860    /// Notifies all listeners.
861    ///
862    /// Removes all listeners that are closed.
863    fn notify(&mut self, update: DiscoveryUpdate) {
864        self.update_listeners.retain_mut(|listener| match listener.try_send(update.clone()) {
865            Ok(()) => true,
866            Err(err) => match err {
867                TrySendError::Full(_) => true,
868                TrySendError::Closed(_) => false,
869            },
870        });
871    }
872
873    /// Adds the ip to the ban list indefinitely
874    pub fn ban_ip(&mut self, ip: IpAddr) {
875        self.config.ban_list.ban_ip(ip);
876    }
877
878    /// Adds the peer to the ban list indefinitely.
879    pub fn ban_node(&mut self, node_id: PeerId) {
880        self.remove_node(node_id);
881        self.config.ban_list.ban_peer(node_id);
882    }
883
884    /// Adds the ip to the ban list until the given timestamp.
885    pub fn ban_ip_until(&mut self, ip: IpAddr, until: Instant) {
886        self.config.ban_list.ban_ip_until(ip, until);
887    }
888
889    /// Adds the peer to the ban list and bans it until the given timestamp
890    pub fn ban_node_until(&mut self, node_id: PeerId, until: Instant) {
891        self.remove_node(node_id);
892        self.config.ban_list.ban_peer_until(node_id, until);
893    }
894
895    /// Removes a `node_id` from the routing table.
896    ///
897    /// This allows applications, for whatever reason, to remove nodes from the local routing
898    /// table. Returns `true` if the node was in the table and `false` otherwise.
899    pub fn remove_node(&mut self, node_id: PeerId) -> bool {
900        let key = kad_key(node_id);
901        self.remove_key(node_id, key)
902    }
903
904    /// Removes a `node_id` from the routing table but only if there are enough other nodes in the
905    /// bucket (bucket must be at least half full)
906    ///
907    /// Returns `true` if the node was removed
908    pub fn soft_remove_node(&mut self, node_id: PeerId) -> bool {
909        let key = kad_key(node_id);
910        let Some(bucket) = self.kbuckets.get_bucket(&key) else { return false };
911        if bucket.num_entries() < MAX_NODES_PER_BUCKET / 2 {
912            // skip half empty bucket
913            return false
914        }
915        self.remove_key(node_id, key)
916    }
917
918    fn remove_key(&mut self, node_id: PeerId, key: discv5::Key<NodeKey>) -> bool {
919        let removed = self.kbuckets.remove(&key);
920        if removed {
921            trace!(target: "discv4", ?node_id, "removed node");
922            self.notify(DiscoveryUpdate::Removed(node_id));
923        }
924        removed
925    }
926
927    /// Gets the number of entries that are considered connected.
928    pub fn num_connected(&self) -> usize {
929        self.kbuckets.buckets_iter().fold(0, |count, bucket| count + bucket.num_connected())
930    }
931
932    /// Check if the peer has an active bond.
933    fn has_bond(&self, remote_id: PeerId, remote_ip: IpAddr) -> bool {
934        if let Some(timestamp) = self.received_pongs.last_pong(remote_id, remote_ip) &&
935            timestamp.elapsed() < self.config.bond_expiration
936        {
937            return true
938        }
939        false
940    }
941
    /// Applies a closure on the pending or present [`NodeEntry`].
    ///
    /// Returns `None` if the peer is neither present nor pending in the routing table.
    fn on_entry<F, R>(&mut self, peer_id: PeerId, f: F) -> Option<R>
    where
        F: FnOnce(&NodeEntry) -> R,
    {
        let key = kad_key(peer_id);
        match self.kbuckets.entry(&key) {
            BucketEntry::Present(entry, _) => Some(f(entry.value())),
            // a pending entry's value accessor requires a mutable binding
            BucketEntry::Pending(mut entry, _) => Some(f(entry.value())),
            _ => None,
        }
    }
954
    /// Update the entry on RE-ping.
    ///
    /// Invoked when we received the Pong to our [`PingReason::RePing`] ping.
    ///
    /// On re-ping we check for a changed `enr_seq` if eip868 is enabled and when it changed we sent
    /// a followup request to retrieve the updated ENR
    fn update_on_reping(&mut self, record: NodeRecord, mut last_enr_seq: Option<u64>) {
        // never track our own record
        if record.id == self.local_node_record.id {
            return
        }

        // If EIP868 extension is disabled then we want to ignore this
        if !self.config.enable_eip868 {
            last_enr_seq = None;
        }

        // update the stored entry and capture the enr sequence number we previously knew
        let key = kad_key(record.id);
        let old_enr = match self.kbuckets.entry(&key) {
            kbucket::Entry::Present(mut entry, _) => {
                entry.value_mut().update_with_enr(last_enr_seq)
            }
            kbucket::Entry::Pending(mut entry, _) => entry.value().update_with_enr(last_enr_seq),
            // peer no longer (or not yet) tracked: nothing to update
            _ => return,
        };

        // Check if ENR was updated: fetch the ENR again if the peer now advertises a newer
        // sequence number than the one on record, or advertises one where we had none
        match (last_enr_seq, old_enr) {
            (Some(new), Some(old)) if new > old => {
                self.send_enr_request(record);
            }
            (Some(_), None) => {
                // got an ENR
                self.send_enr_request(record);
            }
            _ => {}
        };
    }
992
    /// Callback invoked when we receive a pong from the peer.
    ///
    /// The pong completes the endpoint proof from our side: the entry is marked as proven and,
    /// if it was not connected before, promoted to connected and announced via
    /// [`DiscoveryUpdate::Added`]. If the pong advertised an ENR sequence number (and EIP-868 is
    /// enabled) we follow up with an ENR request.
    fn update_on_pong(&mut self, record: NodeRecord, mut last_enr_seq: Option<u64>) {
        // never track our own record
        if record.id == *self.local_peer_id() {
            return
        }

        // If EIP868 extension is disabled then we want to ignore this
        if !self.config.enable_eip868 {
            last_enr_seq = None;
        }

        // if the peer included a enr seq in the pong then we can try to request the ENR of that
        // node
        let has_enr_seq = last_enr_seq.is_some();

        let key = kad_key(record.id);
        match self.kbuckets.entry(&key) {
            kbucket::Entry::Present(mut entry, old_status) => {
                // endpoint is now proven
                entry.value_mut().establish_proof();
                entry.value_mut().update_with_enr(last_enr_seq);

                if !old_status.is_connected() {
                    // promote to connected, keeping the original connection direction
                    let _ = entry.update(ConnectionState::Connected, Some(old_status.direction));
                    trace!(target: "discv4", ?record, "added after successful endpoint proof");
                    self.notify(DiscoveryUpdate::Added(record));

                    if has_enr_seq {
                        // request the ENR of the node
                        self.send_enr_request(record);
                    }
                }
            }
            kbucket::Entry::Pending(mut entry, mut status) => {
                // endpoint is now proven
                entry.value().establish_proof();
                entry.value().update_with_enr(last_enr_seq);

                if !status.is_connected() {
                    status.state = ConnectionState::Connected;
                    let _ = entry.update(status);
                    trace!(target: "discv4", ?record, "added after successful endpoint proof");
                    self.notify(DiscoveryUpdate::Added(record));

                    if has_enr_seq {
                        // request the ENR of the node
                        self.send_enr_request(record);
                    }
                }
            }
            // absent or self entry: nothing to do
            _ => {}
        };
    }
1046
1047    /// Adds all nodes
1048    ///
1049    /// See [`Self::add_node`]
1050    pub fn add_all_nodes(&mut self, records: impl IntoIterator<Item = NodeRecord>) {
1051        for record in records {
1052            self.add_node(record);
1053        }
1054    }
1055
1056    /// If the node's not in the table yet, this will add it to the table and start the endpoint
1057    /// proof by sending a ping to the node.
1058    ///
1059    /// Returns `true` if the record was added successfully, and `false` if the node is either
1060    /// already in the table or the record's bucket is full.
1061    pub fn add_node(&mut self, record: NodeRecord) -> bool {
1062        let key = kad_key(record.id);
1063        match self.kbuckets.entry(&key) {
1064            kbucket::Entry::Absent(entry) => {
1065                let node = NodeEntry::new(record);
1066                match entry.insert(
1067                    node,
1068                    NodeStatus {
1069                        direction: ConnectionDirection::Outgoing,
1070                        state: ConnectionState::Disconnected,
1071                    },
1072                ) {
1073                    BucketInsertResult::Inserted | BucketInsertResult::Pending { .. } => {
1074                        trace!(target: "discv4", ?record, "inserted new record");
1075                    }
1076                    _ => return false,
1077                }
1078            }
1079            _ => return false,
1080        }
1081
1082        // send the initial ping to the _new_ node
1083        self.try_ping(record, PingReason::InitialInsert);
1084        true
1085    }
1086
1087    /// Encodes the packet, sends it and returns the hash.
1088    pub(crate) fn send_packet(&self, msg: Message, to: SocketAddr) -> B256 {
1089        let (payload, hash) = msg.encode(&self.secret_key);
1090        trace!(target: "discv4", r#type=?msg.msg_type(), ?to, ?hash, "sending packet");
1091        let _ = self.egress.try_send((payload, to)).map_err(|err| {
1092            debug!(
1093                target: "discv4",
1094                %err,
1095                "dropped outgoing packet",
1096            );
1097        });
1098        hash
1099    }
1100
1101    /// Returns a signed `FindNode` packet for `target`, reusing a cached payload when possible.
1102    fn find_node_packet(&mut self, target: PeerId) -> (Bytes, B256) {
1103        let expire = self.find_node_expiration();
1104        let cache_ttl = self.config.request_timeout / 4;
1105        CachedFindNode::get_or_sign(
1106            &mut self.cached_find_node,
1107            target,
1108            cache_ttl,
1109            &self.secret_key,
1110            expire,
1111        )
1112    }
1113
    /// Message handler for an incoming `Ping`
    ///
    /// Updates or inserts the sender's entry in the routing table, always answers with a `Pong`,
    /// and optionally sends our own `Ping` back when an endpoint proof (bond) still needs to be
    /// established from our side.
    fn on_ping(&mut self, ping: Ping, remote_addr: SocketAddr, remote_id: PeerId, hash: B256) {
        if self.is_expired(ping.expire) {
            // ping's expiration timestamp is in the past
            return
        }

        // create the record
        let record = NodeRecord {
            address: remote_addr.ip(),
            udp_port: remote_addr.port(),
            tcp_port: ping.from.tcp_port,
            id: remote_id,
        }
        .into_ipv4_mapped();

        let key = kad_key(record.id);

        // See also <https://github.com/ethereum/devp2p/blob/master/discv4.md#ping-packet-0x01>:
        // > If no communication with the sender of this ping has occurred within the last 12h, a
        // > ping should be sent in addition to pong in order to receive an endpoint proof.
        //
        // Note: we only mark if the node is absent because the `last 12h` condition is handled by
        // the ping interval
        let mut is_new_insert = false;
        let mut needs_bond = false;
        let mut is_proven = false;

        // update (or insert) the table entry and capture the previously known enr sequence number
        let old_enr = match self.kbuckets.entry(&key) {
            kbucket::Entry::Present(mut entry, _) => {
                if entry.value().is_expired() {
                    // If no communication with the sender has occurred within the last 12h, a ping
                    // should be sent in addition to pong in order to receive an endpoint proof.
                    needs_bond = true;
                } else {
                    is_proven = entry.value().has_endpoint_proof;
                }
                entry.value_mut().update_with_enr(ping.enr_sq)
            }
            kbucket::Entry::Pending(mut entry, _) => {
                if entry.value().is_expired() {
                    // If no communication with the sender has occurred within the last 12h, a ping
                    // should be sent in addition to pong in order to receive an endpoint proof.
                    needs_bond = true;
                } else {
                    is_proven = entry.value().has_endpoint_proof;
                }
                entry.value().update_with_enr(ping.enr_sq)
            }
            kbucket::Entry::Absent(entry) => {
                let mut node = NodeEntry::new(record);
                node.last_enr_seq = ping.enr_sq;

                match entry.insert(
                    node,
                    NodeStatus {
                        direction: ConnectionDirection::Incoming,
                        // mark as disconnected until endpoint proof established on pong
                        state: ConnectionState::Disconnected,
                    },
                ) {
                    BucketInsertResult::Inserted | BucketInsertResult::Pending { .. } => {
                        // mark as new insert if insert was successful
                        is_new_insert = true;
                    }
                    BucketInsertResult::Full => {
                        // we received a ping but the corresponding bucket for the peer is already
                        // full, we can't add any additional peers to that bucket, but we still want
                        // to emit an event that we discovered the node
                        trace!(target: "discv4", ?record, "discovered new record but bucket is full");
                        self.notify(DiscoveryUpdate::DiscoveredAtCapacity(record));
                        needs_bond = true;
                    }
                    BucketInsertResult::TooManyIncoming | BucketInsertResult::NodeExists => {
                        needs_bond = true;
                        // insert unsuccessful but we still want to send the pong
                    }
                    // the configured table filter rejected the node: drop the ping entirely
                    BucketInsertResult::FailedFilter => return,
                }

                None
            }
            kbucket::Entry::SelfEntry => return,
        };

        // send the pong first, but the PONG and optionally PING don't need to be send in a
        // particular order
        let pong = Message::Pong(Pong {
            // we use the actual address of the peer
            to: record.into(),
            echo: hash,
            expire: ping.expire,
            enr_sq: self.enr_seq(),
        });
        self.send_packet(pong, remote_addr);

        // if node was absent also send a ping to establish the endpoint proof from our end
        if is_new_insert {
            self.try_ping(record, PingReason::InitialInsert);
        } else if needs_bond {
            self.try_ping(record, PingReason::EstablishBond);
        } else if is_proven {
            // if node has been proven, this means we've received a pong and verified its endpoint
            // proof. We've also sent a pong above to verify our endpoint proof, so we can now
            // send our find_nodes request if PingReason::Lookup
            if let Some((_, ctx)) = self.pending_lookup.remove(&record.id) {
                if self.pending_find_nodes.contains_key(&record.id) {
                    // there's already another pending request, unmark it so the next round can
                    // try to send it
                    ctx.unmark_queried(record.id);
                } else {
                    // we just received a ping from that peer so we can send a find node request
                    // directly
                    self.find_node(&record, ctx);
                }
            }
        } else {
            // Request ENR if included in the ping
            match (ping.enr_sq, old_enr) {
                (Some(new), Some(old)) if new > old => {
                    self.send_enr_request(record);
                }
                (Some(_), None) => {
                    self.send_enr_request(record);
                }
                _ => {}
            };
        }
    }
1243
1244    // Guarding function for [`Self::send_ping`] that applies pre-checks
1245    fn try_ping(&mut self, node: NodeRecord, reason: PingReason) {
1246        if node.id == *self.local_peer_id() {
1247            // don't ping ourselves
1248            return
1249        }
1250
1251        if self.pending_pings.contains_key(&node.id) ||
1252            self.pending_find_nodes.contains_key(&node.id)
1253        {
1254            return
1255        }
1256
1257        if self.queued_pings.iter().any(|(n, _)| n.id == node.id) {
1258            return
1259        }
1260
1261        if self.pending_pings.len() < MAX_NODES_PING {
1262            self.send_ping(node, reason);
1263        } else if self.queued_pings.len() < MAX_QUEUED_PINGS {
1264            self.queued_pings.push_back((node, reason));
1265        }
1266    }
1267
1268    /// Sends a ping message to the node's UDP address.
1269    ///
1270    /// Returns the echo hash of the ping message.
1271    pub(crate) fn send_ping(&mut self, node: NodeRecord, reason: PingReason) -> B256 {
1272        let remote_addr = node.udp_addr();
1273        let id = node.id;
1274        let ping = Ping {
1275            from: self.local_node_record.into(),
1276            to: node.into(),
1277            expire: self.ping_expiration(),
1278            enr_sq: self.enr_seq(),
1279        };
1280        trace!(target: "discv4", ?ping, "sending ping");
1281        let echo_hash = self.send_packet(Message::Ping(ping), remote_addr);
1282
1283        self.pending_pings
1284            .insert(id, PingRequest { sent_at: Instant::now(), node, echo_hash, reason });
1285        echo_hash
1286    }
1287
1288    /// Sends an enr request message to the node's UDP address.
1289    ///
1290    /// Returns the echo hash of the ping message.
1291    pub(crate) fn send_enr_request(&mut self, node: NodeRecord) {
1292        if !self.config.enable_eip868 {
1293            return
1294        }
1295        let remote_addr = node.udp_addr();
1296        let enr_request = EnrRequest { expire: self.enr_request_expiration() };
1297
1298        trace!(target: "discv4", ?enr_request, "sending enr request");
1299        let echo_hash = self.send_packet(Message::EnrRequest(enr_request), remote_addr);
1300
1301        self.pending_enr_requests
1302            .insert(node.id, EnrRequestState { sent_at: Instant::now(), echo_hash });
1303    }
1304
    /// Message handler for an incoming `Pong`.
    ///
    /// Only pongs matching a pending ping (same peer and echo hash) are accepted; the matched
    /// request is removed and the follow-up action depends on why the ping was sent.
    fn on_pong(&mut self, pong: Pong, remote_addr: SocketAddr, remote_id: PeerId) {
        if self.is_expired(pong.expire) {
            return
        }

        // the pong must answer a ping we actually sent, with the matching echo hash
        let PingRequest { node, reason, .. } = match self.pending_pings.entry(remote_id) {
            Entry::Occupied(entry) => {
                {
                    let request = entry.get();
                    if request.echo_hash != pong.echo {
                        trace!(target: "discv4", from=?remote_addr, expected=?request.echo_hash, echo_hash=?pong.echo,"Got unexpected Pong");
                        return
                    }
                }
                entry.remove()
            }
            // unsolicited pong: ignore
            Entry::Vacant(_) => return,
        };

        // keep track of the pong
        self.received_pongs.on_pong(remote_id, remote_addr.ip());

        match reason {
            PingReason::InitialInsert => {
                self.update_on_pong(node, pong.enr_sq);
                // Reset the lookup interval so the next poll_tick fires immediately,
                // rather than waiting the full ~20s for the first lookup.
                if self.pending_lookup_reset && self.config.bootstrap_nodes.contains(&node) {
                    self.pending_lookup_reset = false;
                    self.lookup_interval.reset();
                }
            }
            PingReason::EstablishBond => {
                // no initial lookup needed here since the node was already in the table.
                self.update_on_pong(node, pong.enr_sq);
            }
            PingReason::RePing => {
                self.update_on_reping(node, pong.enr_sq);
            }
            PingReason::Lookup(node, ctx) => {
                self.update_on_pong(node, pong.enr_sq);
                // insert node and assoc. lookup_context into the pending_lookup table to complete
                // our side of the endpoint proof verification.
                // Start the lookup timer here - and evict accordingly. Note that this is a separate
                // timer than the ping_request timer.
                self.pending_lookup.insert(node.id, (Instant::now(), ctx));
            }
        }
    }
1355
1356    /// Handler for an incoming `FindNode` message
1357    fn on_find_node(&mut self, msg: FindNode, remote_addr: SocketAddr, node_id: PeerId) {
1358        if self.is_expired(msg.expire) {
1359            // expiration timestamp is in the past
1360            return
1361        }
1362        if node_id == *self.local_peer_id() {
1363            // ignore find node requests to ourselves
1364            return
1365        }
1366
1367        if self.has_bond(node_id, remote_addr.ip()) {
1368            self.respond_closest(msg.id, remote_addr)
1369        }
1370    }
1371
    /// Handler for incoming `EnrResponse` message
    ///
    /// Accepts the response only if it answers a pending ENR request, the ENR's public key matches
    /// the sender's node id, and the echoed request hash matches. On a changed (or newly learned)
    /// fork id, listeners are notified via [`DiscoveryUpdate::EnrForkId`].
    fn on_enr_response(&mut self, msg: EnrResponse, remote_addr: SocketAddr, id: PeerId) {
        trace!(target: "discv4", ?remote_addr, ?msg, "received ENR response");
        if let Some(resp) = self.pending_enr_requests.remove(&id) {
            // ensure the ENR's public key matches the expected node id
            let enr_id = pk2id(&msg.enr.public_key());
            if id != enr_id {
                return
            }

            if resp.echo_hash == msg.request_hash {
                let key = kad_key(id);
                let fork_id = msg.eth_fork_id();
                // update the entry and capture the record plus the previously known fork id
                let (record, old_fork_id) = match self.kbuckets.entry(&key) {
                    kbucket::Entry::Present(mut entry, _) => {
                        let id = entry.value_mut().update_with_fork_id(fork_id);
                        (entry.value().record, id)
                    }
                    kbucket::Entry::Pending(mut entry, _) => {
                        let id = entry.value().update_with_fork_id(fork_id);
                        (entry.value().record, id)
                    }
                    // peer not tracked anymore: nothing to update
                    _ => return,
                };
                match (fork_id, old_fork_id) {
                    (Some(new), Some(old)) if new != old => {
                        self.notify(DiscoveryUpdate::EnrForkId(record, new))
                    }
                    (Some(new), None) => self.notify(DiscoveryUpdate::EnrForkId(record, new)),
                    _ => {}
                }
            }
        }
    }
1406
1407    /// Handler for incoming `EnrRequest` message
1408    fn on_enr_request(
1409        &self,
1410        msg: EnrRequest,
1411        remote_addr: SocketAddr,
1412        id: PeerId,
1413        request_hash: B256,
1414    ) {
1415        if !self.config.enable_eip868 || self.is_expired(msg.expire) {
1416            return
1417        }
1418
1419        if self.has_bond(id, remote_addr.ip()) {
1420            self.send_packet(
1421                Message::EnrResponse(EnrResponse {
1422                    request_hash,
1423                    enr: self.local_eip_868_enr.clone(),
1424                }),
1425                remote_addr,
1426            );
1427        }
1428    }
1429
1430    /// Handler for incoming `Neighbours` messages that are handled if they're responses to
1431    /// `FindNode` requests.
1432    fn on_neighbours(&mut self, msg: Neighbours, remote_addr: SocketAddr, node_id: PeerId) {
1433        if self.is_expired(msg.expire) {
1434            // response is expired
1435            return
1436        }
1437        // check if this request was expected
1438        let ctx = match self.pending_find_nodes.entry(node_id) {
1439            Entry::Occupied(mut entry) => {
1440                {
1441                    let request = entry.get_mut();
1442                    // Mark the request as answered
1443                    request.answered = true;
1444                    let total = request.response_count + msg.nodes.len();
1445
1446                    // Neighbours response is exactly 1 bucket (16 entries).
1447                    if total <= MAX_NODES_PER_BUCKET {
1448                        request.response_count = total;
1449                    } else {
1450                        trace!(target: "discv4", total, from=?remote_addr, "Received neighbors packet entries exceeds max nodes per bucket");
1451                        return
1452                    }
1453                };
1454
1455                if entry.get().response_count == MAX_NODES_PER_BUCKET {
1456                    // node responding with a full bucket of records
1457                    let ctx = entry.remove().lookup_context;
1458                    ctx.mark_responded(node_id);
1459                    ctx
1460                } else {
1461                    entry.get().lookup_context.clone()
1462                }
1463            }
1464            Entry::Vacant(_) => {
1465                // received neighbours response without requesting it
1466                trace!(target: "discv4", from=?remote_addr, "Received unsolicited Neighbours");
1467                return
1468            }
1469        };
1470
1471        // log the peers we discovered
1472        trace!(target: "discv4",
1473            target=format!("{:#?}", node_id),
1474            peers_count=msg.nodes.len(),
1475            peers=format!("[{:#}]", msg.nodes.iter()
1476                .map(|node_rec| node_rec.id
1477            ).format(", ")),
1478            "Received peers from Neighbours packet"
1479        );
1480
1481        // This is the recursive lookup step where we initiate new FindNode requests for new nodes
1482        // that were discovered.
1483        for node in msg.nodes.into_iter().map(NodeRecord::into_ipv4_mapped) {
1484            // prevent banned peers from being added to the context
1485            if self.config.ban_list.is_banned(&node.id, &node.address) {
1486                trace!(target: "discv4", peer_id=?node.id, ip=?node.address, "ignoring banned record");
1487                continue
1488            }
1489
1490            ctx.add_node(node);
1491        }
1492
1493        // get the next closest nodes, not yet queried nodes and start over.
1494        let closest =
1495            ctx.filter_closest(ALPHA, |node| !self.pending_find_nodes.contains_key(&node.id));
1496
1497        for closest in closest {
1498            let key = kad_key(closest.id);
1499            match self.kbuckets.entry(&key) {
1500                BucketEntry::Absent(entry) => {
1501                    // the node's endpoint is not proven yet, so we need to ping it first, on
1502                    // success, we will add the node to the pending_lookup table, and wait to send
1503                    // back a Pong before initiating a FindNode request.
1504                    // In order to prevent that this node is selected again on subsequent responses,
1505                    // while the ping is still active, we always mark it as queried.
1506                    ctx.mark_queried(closest.id);
1507                    let node = NodeEntry::new(closest);
1508                    match entry.insert(
1509                        node,
1510                        NodeStatus {
1511                            direction: ConnectionDirection::Outgoing,
1512                            state: ConnectionState::Disconnected,
1513                        },
1514                    ) {
1515                        BucketInsertResult::Inserted | BucketInsertResult::Pending { .. } => {
1516                            // only ping if the node was added to the table
1517                            self.try_ping(closest, PingReason::Lookup(closest, ctx.clone()))
1518                        }
1519                        BucketInsertResult::Full => {
1520                            // new node but the node's bucket is already full
1521                            self.notify(DiscoveryUpdate::DiscoveredAtCapacity(closest))
1522                        }
1523                        _ => {}
1524                    }
1525                }
1526                BucketEntry::SelfEntry => {
1527                    // we received our own node entry
1528                }
1529                BucketEntry::Present(entry, _) => {
1530                    if entry.value().has_endpoint_proof {
1531                        if entry
1532                            .value()
1533                            .exceeds_find_node_failures(self.config.max_find_node_failures)
1534                        {
1535                            self.try_ping(closest, PingReason::Lookup(closest, ctx.clone()))
1536                        } else {
1537                            self.find_node(&closest, ctx.clone());
1538                        }
1539                    }
1540                }
1541                BucketEntry::Pending(mut entry, _) => {
1542                    if entry.value().has_endpoint_proof {
1543                        if entry
1544                            .value()
1545                            .exceeds_find_node_failures(self.config.max_find_node_failures)
1546                        {
1547                            self.try_ping(closest, PingReason::Lookup(closest, ctx.clone()))
1548                        } else {
1549                            self.find_node(&closest, ctx.clone());
1550                        }
1551                    }
1552                }
1553            }
1554        }
1555    }
1556
1557    /// Sends a Neighbours packet for `target` to the given addr
1558    fn respond_closest(&mut self, target: PeerId, to: SocketAddr) {
1559        let key = kad_key(target);
1560        let expire = self.send_neighbours_expiration();
1561
1562        // get the MAX_NODES_PER_BUCKET closest nodes to the target
1563        let closest_nodes =
1564            self.kbuckets.closest_values(&key).take(MAX_NODES_PER_BUCKET).collect::<Vec<_>>();
1565
1566        for nodes in closest_nodes.chunks(SAFE_MAX_DATAGRAM_NEIGHBOUR_RECORDS) {
1567            let nodes = nodes.iter().map(|node| node.value.record).collect::<Vec<NodeRecord>>();
1568            trace!(target: "discv4", len = nodes.len(), to=?to,"Sent neighbours packet");
1569            let msg = Message::Neighbours(Neighbours { nodes, expire });
1570            self.send_packet(msg, to);
1571        }
1572    }
1573
1574    fn evict_expired_requests(&mut self, now: Instant) {
1575        self.pending_enr_requests.retain(|_node_id, enr_request| {
1576            now.duration_since(enr_request.sent_at) < self.config.enr_expiration
1577        });
1578
1579        let mut failed_pings = Vec::new();
1580        self.pending_pings.retain(|node_id, ping_request| {
1581            if now.duration_since(ping_request.sent_at) > self.config.ping_expiration {
1582                failed_pings.push(*node_id);
1583                return false
1584            }
1585            true
1586        });
1587
1588        if !failed_pings.is_empty() {
1589            // remove nodes that failed to pong
1590            trace!(target: "discv4", num=%failed_pings.len(), "evicting nodes due to failed pong");
1591            for node_id in failed_pings {
1592                self.remove_node(node_id);
1593            }
1594        }
1595
1596        let mut failed_lookups = Vec::new();
1597        self.pending_lookup.retain(|node_id, (lookup_sent_at, _)| {
1598            if now.duration_since(*lookup_sent_at) > self.config.request_timeout {
1599                failed_lookups.push(*node_id);
1600                return false
1601            }
1602            true
1603        });
1604
1605        if !failed_lookups.is_empty() {
1606            // remove nodes that failed the e2e lookup process, so we can restart it
1607            trace!(target: "discv4", num=%failed_lookups.len(), "evicting nodes due to failed lookup");
1608            for node_id in failed_lookups {
1609                self.remove_node(node_id);
1610            }
1611        }
1612
1613        self.evict_failed_find_nodes(now);
1614    }
1615
    /// Handles failed responses to `FindNode`
    ///
    /// Removes expired `FindNode` requests from the pending set. Requests that expired without
    /// any response count as a failure for the peer; peers that accumulate more than
    /// `max_find_node_failures` failures are soft-removed from the routing table.
    fn evict_failed_find_nodes(&mut self, now: Instant) {
        let mut failed_find_nodes = Vec::new();
        self.pending_find_nodes.retain(|node_id, find_node_request| {
            if now.duration_since(find_node_request.sent_at) > self.config.neighbours_expiration {
                if !find_node_request.answered {
                    // the request expired without any response: count it as a failure.
                    // If the node answered at all (even with fewer entries than expected) we
                    // don't treat the expiry as a hard error since it responded.
                    failed_find_nodes.push(*node_id);
                }
                return false
            }
            true
        });

        if failed_find_nodes.is_empty() {
            return
        }

        trace!(target: "discv4", num=%failed_find_nodes.len(), "processing failed find nodes");

        for node_id in failed_find_nodes {
            let key = kad_key(node_id);
            // bump the failure counter for the node if it is still in the table
            let failures = match self.kbuckets.entry(&key) {
                kbucket::Entry::Present(mut entry, _) => {
                    entry.value_mut().inc_failed_request();
                    entry.value().find_node_failures
                }
                kbucket::Entry::Pending(mut entry, _) => {
                    entry.value().inc_failed_request();
                    entry.value().find_node_failures
                }
                _ => continue,
            };

            // if the node failed to respond anything useful multiple times, remove the node from
            // the table, but only if there are enough other nodes in the bucket (bucket must be at
            // least half full)
            if failures > self.config.max_find_node_failures {
                self.soft_remove_node(node_id);
            }
        }
    }
1659
1660    /// Re-pings all nodes which endpoint proofs are considered expired: [`NodeEntry::is_expired`]
1661    ///
1662    /// This will send a `Ping` to the nodes, if a node fails to respond with a `Pong` to renew the
1663    /// endpoint proof it will be removed from the table.
1664    fn re_ping_oldest(&mut self) {
1665        let mut nodes = self
1666            .kbuckets
1667            .iter_ref()
1668            .filter(|entry| entry.node.value.is_expired())
1669            .map(|n| n.node.value)
1670            .collect::<Vec<_>>();
1671        nodes.sort_unstable_by_key(|a| a.last_seen);
1672        let to_ping = nodes.into_iter().map(|n| n.record).take(MAX_NODES_PING).collect::<Vec<_>>();
1673        for node in to_ping {
1674            self.try_ping(node, PingReason::RePing)
1675        }
1676    }
1677
1678    /// Returns true if the expiration timestamp is in the past.
1679    fn is_expired(&self, expiration: u64) -> bool {
1680        self.ensure_not_expired(expiration).is_err()
1681    }
1682
1683    /// Validate that given timestamp is not expired.
1684    ///
1685    /// Note: this accepts the timestamp as u64 because this is used by the wire protocol, but the
1686    /// UNIX timestamp (number of non-leap seconds since January 1, 1970 0:00:00 UTC) is supposed to
1687    /// be an i64.
1688    ///
1689    /// Returns an error if:
1690    ///  - invalid UNIX timestamp (larger than `i64::MAX`)
1691    ///  - timestamp is expired (lower than current local UNIX timestamp)
1692    fn ensure_not_expired(&self, timestamp: u64) -> Result<(), ()> {
1693        // ensure the timestamp is a valid UNIX timestamp
1694        let _ = i64::try_from(timestamp).map_err(drop)?;
1695
1696        let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_secs();
1697        if self.config.enforce_expiration_timestamps && timestamp < now {
1698            trace!(target: "discv4", "Expired packet");
1699            return Err(())
1700        }
1701        Ok(())
1702    }
1703
1704    /// Pops buffered ping requests and sends them.
1705    fn ping_buffered(&mut self) {
1706        while self.pending_pings.len() < MAX_NODES_PING {
1707            match self.queued_pings.pop_front() {
1708                Some((next, reason)) => self.try_ping(next, reason),
1709                None => break,
1710            }
1711        }
1712    }
1713
1714    fn ping_expiration(&self) -> u64 {
1715        (SystemTime::now().duration_since(UNIX_EPOCH).unwrap() + self.config.ping_expiration)
1716            .as_secs()
1717    }
1718
1719    fn find_node_expiration(&self) -> u64 {
1720        (SystemTime::now().duration_since(UNIX_EPOCH).unwrap() + self.config.request_timeout)
1721            .as_secs()
1722    }
1723
1724    fn enr_request_expiration(&self) -> u64 {
1725        (SystemTime::now().duration_since(UNIX_EPOCH).unwrap() + self.config.enr_expiration)
1726            .as_secs()
1727    }
1728
1729    fn send_neighbours_expiration(&self) -> u64 {
1730        (SystemTime::now().duration_since(UNIX_EPOCH).unwrap() + self.config.neighbours_expiration)
1731            .as_secs()
1732    }
1733
    /// Polls the socket and advances the state.
    ///
    /// To prevent traffic amplification attacks, implementations must verify that the sender of a
    /// query participates in the discovery protocol. The sender of a packet is considered verified
    /// if it has sent a valid Pong response with matching ping hash within the last 12 hours.
    ///
    /// Returns `Poll::Ready` with the next buffered [`Discv4Event`], or `Poll::Pending` once all
    /// timers, commands and incoming datagrams have been drained without producing an event.
    pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<Discv4Event> {
        loop {
            // drain buffered events first
            if let Some(event) = self.queued_events.pop_front() {
                return Poll::Ready(event)
            }

            // trigger self lookup
            if self.config.enable_lookup {
                while self.lookup_interval.poll_tick(cx).is_ready() {
                    let target = self.lookup_rotator.next(&self.local_node_record.id);
                    self.lookup_with(target, None);
                }
            }

            // re-ping some peers
            while self.ping_interval.poll_tick(cx).is_ready() {
                self.re_ping_oldest();
            }

            // periodically re-resolve the external IP, if an interval is configured
            if let Some(Poll::Ready(Some(ip))) =
                self.resolve_external_ip_interval.as_mut().map(|r| r.poll_tick(cx))
            {
                self.set_external_ip_addr(ip);
            }

            // drain all incoming `Discv4` commands, this channel can never close
            while let Poll::Ready(Some(cmd)) = self.commands_rx.poll_recv(cx) {
                match cmd {
                    Discv4Command::Add(enr) => {
                        self.add_node(enr);
                    }
                    Discv4Command::AddBootNode(record) => {
                        self.add_boot_node(record);
                    }
                    Discv4Command::Lookup { node_id, tx } => {
                        // a lookup without an explicit target looks up our own node id
                        let node_id = node_id.unwrap_or(self.local_node_record.id);
                        self.lookup_with(node_id, tx);
                    }
                    Discv4Command::SetLookupInterval(duration) => {
                        self.set_lookup_interval(duration);
                    }
                    Discv4Command::Updates(tx) => {
                        let rx = self.update_stream();
                        let _ = tx.send(rx);
                    }
                    Discv4Command::BanPeer(node_id) => self.ban_node(node_id),
                    Discv4Command::Remove(node_id) => {
                        self.remove_node(node_id);
                    }
                    Discv4Command::Ban(node_id, ip) => {
                        self.ban_node(node_id);
                        self.ban_ip(ip);
                    }
                    Discv4Command::BanIp(ip) => {
                        self.ban_ip(ip);
                    }
                    Discv4Command::SetEIP868RLPPair { key, rlp } => {
                        debug!(target: "discv4", key=%String::from_utf8_lossy(&key), "Update EIP-868 extension pair");

                        let _ = self.local_eip_868_enr.insert_raw_rlp(key, rlp, &self.secret_key);
                    }
                    Discv4Command::SetTcpPort(port) => {
                        debug!(target: "discv4", %port, "Update tcp port");
                        self.local_node_record.tcp_port = port;
                        if self.local_node_record.address.is_ipv4() {
                            let _ = self.local_eip_868_enr.set_tcp4(port, &self.secret_key);
                        } else {
                            let _ = self.local_eip_868_enr.set_tcp6(port, &self.secret_key);
                        }
                    }

                    Discv4Command::Terminated => {
                        // terminate the service
                        self.queued_events.push_back(Discv4Event::Terminated);
                    }
                }
            }

            // restricts how many messages we process in a single poll before yielding back control
            let mut udp_message_budget = UDP_MESSAGE_POLL_LOOP_BUDGET;

            // process all incoming datagrams
            while let Poll::Ready(Some(event)) = self.ingress.poll_recv(cx) {
                match event {
                    IngressEvent::RecvError(err) => {
                        debug!(target: "discv4", %err, "failed to read datagram");
                    }
                    IngressEvent::BadPacket(from, err, data) => {
                        trace!(target: "discv4", ?from, %err, packet=?hex::encode(&data), "bad packet");
                    }
                    IngressEvent::Packet(remote_addr, Packet { msg, node_id, hash }) => {
                        trace!(target: "discv4", r#type=?msg.msg_type(), from=?remote_addr,"received packet");
                        // dispatch the message to the matching handler and record which
                        // message type was processed
                        let event = match msg {
                            Message::Ping(ping) => {
                                self.on_ping(ping, remote_addr, node_id, hash);
                                Discv4Event::Ping
                            }
                            Message::Pong(pong) => {
                                self.on_pong(pong, remote_addr, node_id);
                                Discv4Event::Pong
                            }
                            Message::FindNode(msg) => {
                                self.on_find_node(msg, remote_addr, node_id);
                                Discv4Event::FindNode
                            }
                            Message::Neighbours(msg) => {
                                self.on_neighbours(msg, remote_addr, node_id);
                                Discv4Event::Neighbours
                            }
                            Message::EnrRequest(msg) => {
                                self.on_enr_request(msg, remote_addr, node_id, hash);
                                Discv4Event::EnrRequest
                            }
                            Message::EnrResponse(msg) => {
                                self.on_enr_response(msg, remote_addr, node_id);
                                Discv4Event::EnrResponse
                            }
                        };

                        self.queued_events.push_back(event);
                    }
                }

                udp_message_budget -= 1;
                if udp_message_budget < 0 {
                    trace!(target: "discv4", budget=UDP_MESSAGE_POLL_LOOP_BUDGET, "exhausted message poll budget");
                    if self.queued_events.is_empty() {
                        // we've exceeded the message budget and have no events to process
                        // this will make sure we're woken up again
                        cx.waker().wake_by_ref();
                    }
                    break
                }
            }

            // try resending buffered pings
            self.ping_buffered();

            // evict expired requests
            while self.evict_expired_requests_interval.poll_tick(cx).is_ready() {
                self.evict_expired_requests(Instant::now());
            }

            // evict expired nodes
            while self.expire_interval.poll_tick(cx).is_ready() {
                self.received_pongs.evict_expired(Instant::now(), EXPIRE_DURATION);
            }

            if self.queued_events.is_empty() {
                return Poll::Pending
            }
        }
    }
1893}
1894
1895/// Endless future impl
1896impl Stream for Discv4Service {
1897    type Item = Discv4Event;
1898
1899    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1900        // Poll the internal poll method
1901        match ready!(self.get_mut().poll(cx)) {
1902            // if the service is terminated, return None to terminate the stream
1903            Discv4Event::Terminated => Poll::Ready(None),
1904            // For any other event, return Poll::Ready(Some(event))
1905            ev => Poll::Ready(Some(ev)),
1906        }
1907    }
1908}
1909
1910impl fmt::Debug for Discv4Service {
1911    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1912        f.debug_struct("Discv4Service")
1913            .field("local_address", &self.local_address)
1914            .field("local_peer_id", &self.local_peer_id())
1915            .field("local_node_record", &self.local_node_record)
1916            .field("queued_pings", &self.queued_pings)
1917            .field("pending_lookup", &self.pending_lookup)
1918            .field("pending_find_nodes", &self.pending_find_nodes)
1919            .field("lookup_interval", &self.lookup_interval)
1920            .finish_non_exhaustive()
1921    }
1922}
1923
/// The Event type the Service stream produces.
///
/// This is mainly used for testing purposes and represents messages the service processed
#[derive(Debug, Eq, PartialEq)]
pub enum Discv4Event {
    /// A `Ping` message was handled.
    Ping,
    /// A `Pong` message was handled.
    Pong,
    /// A `FindNode` message was handled.
    FindNode,
    /// A `Neighbours` message was handled.
    Neighbours,
    /// An `EnrRequest` message was handled.
    EnrRequest,
    /// An `EnrResponse` message was handled.
    EnrResponse,
    /// Service is being terminated
    Terminated,
}
1944
1945/// Continuously reads new messages from the channel and writes them to the socket
1946pub(crate) async fn send_loop(udp: Arc<UdpSocket>, rx: EgressReceiver) {
1947    let mut stream = ReceiverStream::new(rx);
1948    while let Some((payload, to)) = stream.next().await {
1949        match udp.send_to(&payload, to).await {
1950            Ok(size) => {
1951                trace!(target: "discv4", ?to, ?size,"sent payload");
1952            }
1953            Err(err) => {
1954                debug!(target: "discv4", ?to, %err,"Failed to send datagram.");
1955            }
1956        }
1957    }
1958}
1959
/// Rate limits the number of incoming packets from individual IPs to an average of 1
/// packet/second (60 packets per minute)
const MAX_INCOMING_PACKETS_PER_MINUTE_BY_IP: usize = 60usize;
1962
/// Continuously awaits new incoming messages and sends them back through the channel.
///
/// The receive loop enforces primitive rate limiting for IPs to prevent message spams from
/// individual IPs, and drops packets sent by the local node (`local_id`) as well as duplicates.
pub(crate) async fn receive_loop(udp: Arc<UdpSocket>, tx: IngressSender, local_id: PeerId) {
    // helper that forwards an event to the service; a failed send is only logged
    let send = |event: IngressEvent| async {
        let _ = tx.send(event).await.map_err(|err| {
            debug!(
                target: "discv4",
                 %err,
                "failed send incoming packet",
            )
        });
    };

    let mut cache = ReceiveCache::default();

    // tick at half the rate of the limit
    // Note: `tick` doubles as the interval length in seconds and as the amount each per-IP
    // counter is decremented per tick (see `ReceiveCache::tick_ips`).
    let tick = MAX_INCOMING_PACKETS_PER_MINUTE_BY_IP / 2;
    let mut interval = tokio::time::interval(Duration::from_secs(tick as u64));

    let mut buf = [0; MAX_PACKET_SIZE];
    loop {
        let res = udp.recv_from(&mut buf).await;
        match res {
            Err(err) => {
                debug!(target: "discv4", %err, "Failed to read datagram.");
                send(IngressEvent::RecvError(err)).await;
            }
            Ok((read, remote_addr)) => {
                // rate limit incoming packets by IP
                if cache.inc_ip(remote_addr.ip()) > MAX_INCOMING_PACKETS_PER_MINUTE_BY_IP {
                    trace!(target: "discv4", ?remote_addr, "Too many incoming packets from IP.");
                    continue
                }

                let packet = &buf[..read];
                match Message::decode(packet) {
                    Ok(packet) => {
                        if packet.node_id == local_id {
                            // received our own message
                            debug!(target: "discv4", ?remote_addr, "Received own packet.");
                            continue
                        }

                        // skip if we've already received the same packet
                        if cache.contains_packet(packet.hash) {
                            debug!(target: "discv4", ?remote_addr, "Received duplicate packet.");
                            continue
                        }

                        send(IngressEvent::Packet(remote_addr, packet)).await;
                    }
                    Err(err) => {
                        trace!(target: "discv4", %err,"Failed to decode packet");
                        send(IngressEvent::BadPacket(remote_addr, err, packet.to_vec())).await
                    }
                }
            }
        }

        // reset the tracked ips if the interval has passed
        // (non-blocking check so a quiet interval timer never stalls the receive loop)
        if poll_fn(|cx| match interval.poll_tick(cx) {
            Poll::Ready(_) => Poll::Ready(true),
            Poll::Pending => Poll::Ready(false),
        })
        .await
        {
            cache.tick_ips(tick);
        }
    }
}
2035
/// A cache for received packets and their source address.
///
/// This is used to discard duplicated packets and rate limit messages from the same source.
struct ReceiveCache {
    /// Keeps track of how many messages we've received from a given IP address since the last
    /// tick.
    ///
    /// This is used to count the number of messages received from a given IP address within an
    /// interval.
    ip_messages: HashMap<IpAddr, usize>,
    /// Keeps track of recently received unique packet hashes (LRU), used to drop duplicates.
    unique_packets: schnellru::LruMap<B256, ()>,
}
2049
2050impl ReceiveCache {
2051    /// Updates the counter for each IP address and removes IPs that have exceeded the limit.
2052    ///
2053    /// This will decrement the counter for each IP address and remove IPs that have reached 0.
2054    fn tick_ips(&mut self, tick: usize) {
2055        self.ip_messages.retain(|_, count| {
2056            if let Some(reset) = count.checked_sub(tick) {
2057                *count = reset;
2058                true
2059            } else {
2060                false
2061            }
2062        });
2063    }
2064
2065    /// Increases the counter for the given IP address and returns the new count.
2066    fn inc_ip(&mut self, ip: IpAddr) -> usize {
2067        let ctn = self.ip_messages.entry(ip).or_default();
2068        *ctn = ctn.saturating_add(1);
2069        *ctn
2070    }
2071
2072    /// Returns true if we previously received the packet
2073    fn contains_packet(&mut self, hash: B256) -> bool {
2074        !self.unique_packets.insert(hash, ())
2075    }
2076}
2077
2078impl Default for ReceiveCache {
2079    fn default() -> Self {
2080        Self {
2081            ip_messages: Default::default(),
2082            unique_packets: schnellru::LruMap::new(schnellru::ByLength::new(32)),
2083        }
2084    }
2085}
2086
/// The commands sent from the frontend [Discv4] to the service [`Discv4Service`].
enum Discv4Command {
    /// Add the node record to the table.
    Add(NodeRecord),
    /// Add the node record as a boot node.
    AddBootNode(NodeRecord),
    /// Update the TCP port in the local node record and the local ENR.
    SetTcpPort(u16),
    /// Insert a raw EIP-868 key/RLP-value pair into the local ENR.
    SetEIP868RLPPair { key: Vec<u8>, rlp: Bytes },
    /// Ban the peer id and its IP address.
    Ban(PeerId, IpAddr),
    /// Ban the peer id.
    BanPeer(PeerId),
    /// Ban the IP address.
    BanIp(IpAddr),
    /// Remove the node from the table.
    Remove(PeerId),
    /// Start a lookup; `None` targets the local node id. Results are sent via `tx`, if set.
    Lookup { node_id: Option<PeerId>, tx: Option<NodeRecordSender> },
    /// Set the interval used for periodic lookups.
    SetLookupInterval(Duration),
    /// Subscribe to [`DiscoveryUpdate`]s; the receiver stream is returned through the sender.
    Updates(OneshotSender<ReceiverStream<DiscoveryUpdate>>),
    /// Terminate the service.
    Terminated,
}
2102
/// Event type receiver produces
#[derive(Debug)]
pub(crate) enum IngressEvent {
    /// Encountered an error when reading a datagram message.
    RecvError(io::Error),
    /// Received a message that failed to decode, along with the raw payload.
    BadPacket(SocketAddr, DecodePacketError, Vec<u8>),
    /// Received a decoded datagram from an address.
    Packet(SocketAddr, Packet),
}
2113
/// Tracks a sent ping
#[derive(Debug)]
struct PingRequest {
    /// Timestamp when the request was sent.
    sent_at: Instant,
    /// Node to which the request was sent.
    node: NodeRecord,
    /// Hash sent in the Ping request
    echo_hash: B256,
    /// Why this ping was sent.
    reason: PingReason,
}
2126
/// Rotates the `PeerId` that is periodically looked up.
///
/// By selecting different targets, the lookups will be seeded with different ALPHA seed nodes.
#[derive(Debug)]
struct LookupTargetRotator {
    /// Every `interval`-th lookup targets the local node id instead of a random one.
    interval: usize,
    /// Lookups performed since the last self-lookup, wrapped modulo `interval`.
    counter: usize,
}
2135
2136// === impl LookupTargetRotator ===
2137
2138impl LookupTargetRotator {
2139    /// Returns a rotator that always returns the local target.
2140    const fn local_only() -> Self {
2141        Self { interval: 1, counter: 0 }
2142    }
2143}
2144
2145impl Default for LookupTargetRotator {
2146    fn default() -> Self {
2147        Self {
2148            // every 4th lookup is our own node
2149            interval: 4,
2150            counter: 3,
2151        }
2152    }
2153}
2154
2155impl LookupTargetRotator {
2156    /// This will return the next node id to lookup
2157    fn next(&mut self, local: &PeerId) -> PeerId {
2158        self.counter += 1;
2159        self.counter %= self.interval;
2160        if self.counter == 0 {
2161            return *local
2162        }
2163        PeerId::random()
2164    }
2165}
2166
/// Tracks lookups across multiple `FindNode` requests.
///
/// If this type is dropped by all Clones, it will send all the discovered nodes to the listener, if
/// one is present.
#[derive(Clone, Debug)]
struct LookupContext {
    /// Shared lookup state; all clones point to the same [`LookupContextInner`].
    inner: Rc<LookupContextInner>,
}
2175
2176impl LookupContext {
2177    /// Create new context for a recursive lookup
2178    fn new(
2179        target: discv5::Key<NodeKey>,
2180        nearest_nodes: impl IntoIterator<Item = (Distance, NodeRecord)>,
2181        listener: Option<NodeRecordSender>,
2182    ) -> Self {
2183        let closest_nodes = nearest_nodes
2184            .into_iter()
2185            .map(|(distance, record)| {
2186                (distance, QueryNode { record, queried: false, responded: false })
2187            })
2188            .collect();
2189
2190        let inner = Rc::new(LookupContextInner {
2191            target,
2192            closest_nodes: RefCell::new(closest_nodes),
2193            listener,
2194        });
2195        Self { inner }
2196    }
2197
2198    /// Returns the target of this lookup
2199    fn target(&self) -> PeerId {
2200        self.inner.target.preimage().0
2201    }
2202
2203    fn closest(&self, num: usize) -> Vec<NodeRecord> {
2204        self.inner
2205            .closest_nodes
2206            .borrow()
2207            .iter()
2208            .filter(|(_, node)| !node.queried)
2209            .map(|(_, n)| n.record)
2210            .take(num)
2211            .collect()
2212    }
2213
2214    /// Returns the closest nodes that have not been queried yet.
2215    fn filter_closest<P>(&self, num: usize, filter: P) -> Vec<NodeRecord>
2216    where
2217        P: FnMut(&NodeRecord) -> bool,
2218    {
2219        self.inner
2220            .closest_nodes
2221            .borrow()
2222            .iter()
2223            .filter(|(_, node)| !node.queried)
2224            .map(|(_, n)| n.record)
2225            .filter(filter)
2226            .take(num)
2227            .collect()
2228    }
2229
2230    /// Inserts the node if it's missing
2231    fn add_node(&self, record: NodeRecord) {
2232        let distance = self.inner.target.distance(&kad_key(record.id));
2233        let mut closest = self.inner.closest_nodes.borrow_mut();
2234        if let btree_map::Entry::Vacant(entry) = closest.entry(distance) {
2235            entry.insert(QueryNode { record, queried: false, responded: false });
2236        }
2237    }
2238
2239    fn set_queried(&self, id: PeerId, val: bool) {
2240        if let Some((_, node)) =
2241            self.inner.closest_nodes.borrow_mut().iter_mut().find(|(_, node)| node.record.id == id)
2242        {
2243            node.queried = val;
2244        }
2245    }
2246
2247    /// Marks the node as queried
2248    fn mark_queried(&self, id: PeerId) {
2249        self.set_queried(id, true)
2250    }
2251
2252    /// Marks the node as not queried
2253    fn unmark_queried(&self, id: PeerId) {
2254        self.set_queried(id, false)
2255    }
2256
2257    /// Marks the node as responded
2258    fn mark_responded(&self, id: PeerId) {
2259        if let Some((_, node)) =
2260            self.inner.closest_nodes.borrow_mut().iter_mut().find(|(_, node)| node.record.id == id)
2261        {
2262            node.responded = true;
2263        }
2264    }
2265}
2266
// SAFETY: The [`Discv4Service`] is intended to be spawned as a task, which requires `Send`.
// The `LookupContext` is shared by all active `FindNode` requests that are part of the same lookup
// step, each of which can modify the context. However, the shared context is only ever accessed
// mutably when a `Neighbours` response is processed, and all clones are stored inside
// [`Discv4Service`]. In other words, it is guaranteed that there is only one owner
// ([`Discv4Service`]) of all possible [`Rc`] clones of [`LookupContext`], so the non-atomic
// reference count is never shared across threads.
unsafe impl Send for LookupContext {}
/// Shared, mutable state of a single recursive lookup, accessed via [`LookupContext`] clones.
#[derive(Debug)]
struct LookupContextInner {
    /// The target to lookup.
    target: discv5::Key<NodeKey>,
    /// The closest nodes found so far, keyed (and therefore ordered) by distance to the target.
    closest_nodes: RefCell<BTreeMap<Distance, QueryNode>>,
    /// A listener for all the nodes retrieved in this lookup
    ///
    /// This is present if the lookup was triggered manually via [Discv4] and we want to return all
    /// the nodes once the lookup finishes.
    listener: Option<NodeRecordSender>,
}
2286
2287impl Drop for LookupContextInner {
2288    fn drop(&mut self) {
2289        if let Some(tx) = self.listener.take() {
2290            // there's only 1 instance shared across `FindNode` requests, if this is dropped then
2291            // all requests finished, and we can send all results back
2292            let nodes = self
2293                .closest_nodes
2294                .take()
2295                .into_values()
2296                .filter(|node| node.responded)
2297                .map(|node| node.record)
2298                .collect();
2299            let _ = tx.send(nodes);
2300        }
2301    }
2302}
2303
/// Tracks the state of a recursive lookup step
#[derive(Debug, Clone, Copy)]
struct QueryNode {
    /// The node's record.
    record: NodeRecord,
    /// Whether this node has already been queried in the current lookup.
    queried: bool,
    /// Whether this node responded to the lookup.
    responded: bool,
}
2311
/// State of an in-flight `FindNode` request.
#[derive(Debug)]
struct FindNodeRequest {
    /// Timestamp when the request was sent.
    sent_at: Instant,
    /// Number of items sent by the node in response.
    response_count: usize,
    /// Whether the request has been answered yet.
    answered: bool,
    /// The lookup this request belongs to; responses are buffered into its shared context.
    lookup_context: LookupContext,
}
2323
2324// === impl FindNodeRequest ===
2325
2326impl FindNodeRequest {
2327    fn new(resp: LookupContext) -> Self {
2328        Self { sent_at: Instant::now(), response_count: 0, answered: false, lookup_context: resp }
2329    }
2330}
2331
/// Cached signed `FindNode` packet to avoid redundant ECDSA signing during Kademlia lookups.
#[derive(Debug)]
struct CachedFindNode {
    /// The lookup target the cached packet was signed for.
    target: PeerId,
    /// The encoded, signed packet bytes.
    payload: Bytes,
    /// Hash of the signed packet.
    hash: B256,
    /// When the packet was signed; used to enforce the cache TTL in `get_or_sign`.
    cached_at: Instant,
}
2340
2341impl CachedFindNode {
2342    /// Returns the cached `(payload, hash)` if the target matches and the cache is still fresh,
2343    /// or signs a new packet, updates the cache, and returns it.
2344    fn get_or_sign(
2345        cache: &mut Option<Self>,
2346        target: PeerId,
2347        ttl: Duration,
2348        secret_key: &secp256k1::SecretKey,
2349        expire: u64,
2350    ) -> (Bytes, B256) {
2351        if let Some(c) = cache.as_ref() &&
2352            c.target == target &&
2353            c.cached_at.elapsed() < ttl
2354        {
2355            return (c.payload.clone(), c.hash);
2356        }
2357
2358        let msg = Message::FindNode(FindNode { id: target, expire });
2359        let (payload, hash) = msg.encode(secret_key);
2360
2361        *cache = Some(Self { target, payload: payload.clone(), hash, cached_at: Instant::now() });
2362
2363        (payload, hash)
2364    }
2365}
2366
#[derive(Debug)]
struct EnrRequestState {
    // Timestamp when the request was sent.
    sent_at: Instant,
    // Hash of the sent EnrRequest packet, expected to be echoed back in the EnrResponse.
    // NOTE(review): the previous comment said "Hash sent in the Ping request"; per EIP-868 the
    // response echoes the ENRRequest packet hash — confirm against the EnrResponse handler.
    echo_hash: B256,
}
2374
/// Stored node info.
#[derive(Debug, Clone, Eq, PartialEq)]
struct NodeEntry {
    /// Node record info.
    record: NodeRecord,
    /// Timestamp of last pong.
    last_seen: Instant,
    /// Last enr seq we retrieved via a ENR request.
    last_enr_seq: Option<u64>,
    /// `ForkId` if retrieved via ENR requests.
    fork_id: Option<ForkId>,
    /// Counter for failed _consecutive_ findNode requests; reset when the endpoint proof is
    /// (re-)established.
    find_node_failures: u8,
    /// Whether the endpoint of the peer is proven.
    has_endpoint_proof: bool,
}
2391
2392// === impl NodeEntry ===
2393
2394impl NodeEntry {
2395    /// Creates a new, unpopulated entry
2396    fn new(record: NodeRecord) -> Self {
2397        Self {
2398            record,
2399            last_seen: Instant::now(),
2400            last_enr_seq: None,
2401            fork_id: None,
2402            find_node_failures: 0,
2403            has_endpoint_proof: false,
2404        }
2405    }
2406
2407    #[cfg(test)]
2408    fn new_proven(record: NodeRecord) -> Self {
2409        let mut node = Self::new(record);
2410        node.has_endpoint_proof = true;
2411        node
2412    }
2413
2414    /// Marks the entry with an established proof and resets the consecutive failure counter.
2415    const fn establish_proof(&mut self) {
2416        self.has_endpoint_proof = true;
2417        self.find_node_failures = 0;
2418    }
2419
2420    /// Returns true if the tracked find node failures exceed the max amount
2421    const fn exceeds_find_node_failures(&self, max_failures: u8) -> bool {
2422        self.find_node_failures >= max_failures
2423    }
2424
2425    /// Updates the last timestamp and sets the enr seq
2426    fn update_with_enr(&mut self, last_enr_seq: Option<u64>) -> Option<u64> {
2427        self.update_now(|s| std::mem::replace(&mut s.last_enr_seq, last_enr_seq))
2428    }
2429
2430    /// Increases the failed request counter
2431    const fn inc_failed_request(&mut self) {
2432        self.find_node_failures += 1;
2433    }
2434
2435    /// Updates the last timestamp and sets the enr seq
2436    fn update_with_fork_id(&mut self, fork_id: Option<ForkId>) -> Option<ForkId> {
2437        self.update_now(|s| std::mem::replace(&mut s.fork_id, fork_id))
2438    }
2439
2440    /// Updates the `last_seen` timestamp and calls the closure
2441    fn update_now<F, R>(&mut self, f: F) -> R
2442    where
2443        F: FnOnce(&mut Self) -> R,
2444    {
2445        self.last_seen = Instant::now();
2446        f(self)
2447    }
2448}
2449
2450// === impl NodeEntry ===
2451
2452impl NodeEntry {
2453    /// Returns true if the node should be re-pinged.
2454    fn is_expired(&self) -> bool {
2455        self.last_seen.elapsed() > (ENDPOINT_PROOF_EXPIRATION / 2)
2456    }
2457}
2458
/// Represents why a ping is issued
#[derive(Debug)]
enum PingReason {
    /// Initial ping to a previously unknown peer that was inserted into the table.
    InitialInsert,
    /// A ping to a peer to establish a bond (endpoint proof).
    EstablishBond,
    /// Re-ping a peer.
    RePing,
    /// Part of a lookup to ensure endpoint is proven before we can send a `FindNode` request.
    ///
    /// Carries the pinged node's record and the shared context of the lookup it belongs to.
    Lookup(NodeRecord, LookupContext),
}
2471
/// Represents node related updates state changes in the underlying node table
#[derive(Debug, Clone)]
pub enum DiscoveryUpdate {
    /// A new node was discovered _and_ added to the table.
    Added(NodeRecord),
    /// A new node was discovered but _not_ added to the table because it is currently full.
    DiscoveredAtCapacity(NodeRecord),
    /// Received a [`ForkId`] via EIP-868 for the given [`NodeRecord`].
    EnrForkId(NodeRecord, ForkId),
    /// Node that was removed from the table
    Removed(PeerId),
    /// A series of updates delivered as a single message.
    Batch(Vec<Self>),
}
2486
2487#[cfg(test)]
2488mod tests {
2489    use super::*;
2490    use crate::test_utils::{create_discv4, create_discv4_with_config, rng_endpoint, rng_record};
2491    use alloy_primitives::hex;
2492    use alloy_rlp::{Decodable, Encodable};
2493    use rand_08::Rng;
2494    use reth_ethereum_forks::{EnrForkIdEntry, ForkHash};
2495    use reth_network_peers::mainnet_nodes;
2496    use std::future::poll_fn;
2497
2498    #[tokio::test]
2499    async fn test_configured_enr_forkid_entry() {
2500        let fork: ForkId = ForkId { hash: ForkHash([220, 233, 108, 45]), next: 0u64 };
2501        let mut disc_conf = Discv4Config::default();
2502        disc_conf.add_eip868_pair("eth", EnrForkIdEntry::from(fork));
2503        let (_discv4, service) = create_discv4_with_config(disc_conf).await;
2504        let eth = service.local_eip_868_enr.get_raw_rlp(b"eth").unwrap();
2505        let fork_entry_id = EnrForkIdEntry::decode(&mut &eth[..]).unwrap();
2506
2507        let raw: [u8; 8] = [0xc7, 0xc6, 0x84, 0xdc, 0xe9, 0x6c, 0x2d, 0x80];
2508        let decoded = EnrForkIdEntry::decode(&mut &raw[..]).unwrap();
2509        let expected = EnrForkIdEntry {
2510            fork_id: ForkId { hash: ForkHash([0xdc, 0xe9, 0x6c, 0x2d]), next: 0 },
2511        };
2512        assert_eq!(expected, fork_entry_id);
2513        assert_eq!(expected, decoded);
2514    }
2515
2516    #[test]
2517    fn test_enr_forkid_entry_decode() {
2518        let raw: [u8; 8] = [0xc7, 0xc6, 0x84, 0xdc, 0xe9, 0x6c, 0x2d, 0x80];
2519        let decoded = EnrForkIdEntry::decode(&mut &raw[..]).unwrap();
2520        let expected = EnrForkIdEntry {
2521            fork_id: ForkId { hash: ForkHash([0xdc, 0xe9, 0x6c, 0x2d]), next: 0 },
2522        };
2523        assert_eq!(expected, decoded);
2524    }
2525
2526    #[test]
2527    fn test_enr_forkid_entry_encode() {
2528        let original = EnrForkIdEntry {
2529            fork_id: ForkId { hash: ForkHash([0xdc, 0xe9, 0x6c, 0x2d]), next: 0 },
2530        };
2531        let expected: [u8; 8] = [0xc7, 0xc6, 0x84, 0xdc, 0xe9, 0x6c, 0x2d, 0x80];
2532        let mut encoded = Vec::with_capacity(expected.len());
2533        original.encode(&mut encoded);
2534        assert_eq!(&expected[..], encoded.as_slice());
2535    }
2536
2537    #[test]
2538    fn test_local_rotator() {
2539        let id = PeerId::random();
2540        let mut rotator = LookupTargetRotator::local_only();
2541        assert_eq!(rotator.next(&id), id);
2542        assert_eq!(rotator.next(&id), id);
2543    }
2544
2545    #[test]
2546    fn test_rotator() {
2547        let id = PeerId::random();
2548        let mut rotator = LookupTargetRotator::default();
2549        assert_eq!(rotator.next(&id), id);
2550        assert_ne!(rotator.next(&id), id);
2551        assert_ne!(rotator.next(&id), id);
2552        assert_ne!(rotator.next(&id), id);
2553        assert_eq!(rotator.next(&id), id);
2554    }
2555
2556    #[tokio::test]
2557    async fn test_pending_ping() {
2558        let (_, mut service) = create_discv4().await;
2559
2560        let local_addr = service.local_addr();
2561
2562        let mut num_inserted = 0;
2563        loop {
2564            let node = NodeRecord::new(local_addr, PeerId::random());
2565            if service.add_node(node) {
2566                num_inserted += 1;
2567                assert!(service.pending_pings.contains_key(&node.id));
2568                assert_eq!(service.pending_pings.len(), num_inserted);
2569                if num_inserted == MAX_NODES_PING {
2570                    break
2571                }
2572            }
2573        }
2574
2575        // `pending_pings` is full, insert into `queued_pings`.
2576        num_inserted = 0;
2577        for _ in 0..MAX_NODES_PING {
2578            let node = NodeRecord::new(local_addr, PeerId::random());
2579            if service.add_node(node) {
2580                num_inserted += 1;
2581                assert!(!service.pending_pings.contains_key(&node.id));
2582                assert_eq!(service.pending_pings.len(), MAX_NODES_PING);
2583                assert_eq!(service.queued_pings.len(), num_inserted);
2584            }
2585        }
2586    }
2587
    // Bootstraps with mainnet boot nodes and prints discovered peers.
    // `#[ignore]`d because it talks to the live network — run manually with `-- --ignored`.
    #[tokio::test(flavor = "multi_thread")]
    #[ignore]
    async fn test_mainnet_lookup() {
        reth_tracing::init_test_tracing();
        let fork_id = ForkId { hash: ForkHash(hex!("743f3d89")), next: 16191202 };

        let all_nodes = mainnet_nodes();
        let config = Discv4Config::builder()
            .add_boot_nodes(all_nodes)
            .lookup_interval(Duration::from_secs(1))
            .add_eip868_pair("eth", fork_id)
            .build();
        let (_discv4, mut service) = create_discv4_with_config(config).await;

        let mut updates = service.update_stream();

        let _handle = service.spawn();

        // Mirror the service's table from the update stream; runs until the stream ends.
        let mut table = HashMap::new();
        while let Some(update) = updates.next().await {
            match update {
                DiscoveryUpdate::EnrForkId(record, fork_id) => {
                    println!("{record:?}, {fork_id:?}");
                }
                DiscoveryUpdate::Added(record) => {
                    table.insert(record.id, record);
                }
                DiscoveryUpdate::Removed(id) => {
                    table.remove(&id);
                }
                _ => {}
            }
            println!("total peers {}", table.len());
        }
    }
2624
2625    #[tokio::test]
2626    async fn test_mapped_ipv4() {
2627        reth_tracing::init_test_tracing();
2628        let mut rng = rand_08::thread_rng();
2629        let config = Discv4Config::builder().build();
2630        let (_discv4, mut service) = create_discv4_with_config(config).await;
2631
2632        let v4: Ipv4Addr = "0.0.0.0".parse().unwrap();
2633        let v6 = v4.to_ipv6_mapped();
2634        let addr: SocketAddr = (v6, DEFAULT_DISCOVERY_PORT).into();
2635
2636        let ping = Ping {
2637            from: rng_endpoint(&mut rng),
2638            to: rng_endpoint(&mut rng),
2639            expire: service.ping_expiration(),
2640            enr_sq: Some(rng.r#gen()),
2641        };
2642
2643        let id = PeerId::random();
2644        service.on_ping(ping, addr, id, B256::random());
2645
2646        let key = kad_key(id);
2647        match service.kbuckets.entry(&key) {
2648            kbucket::Entry::Present(entry, _) => {
2649                let node_addr = entry.value().record.address;
2650                assert!(node_addr.is_ipv4());
2651                assert_eq!(node_addr, IpAddr::from(v4));
2652            }
2653            _ => unreachable!(),
2654        };
2655    }
2656
2657    #[tokio::test]
2658    async fn test_respect_ping_expiration() {
2659        reth_tracing::init_test_tracing();
2660        let mut rng = rand_08::thread_rng();
2661        let config = Discv4Config::builder().build();
2662        let (_discv4, mut service) = create_discv4_with_config(config).await;
2663
2664        let v4: Ipv4Addr = "0.0.0.0".parse().unwrap();
2665        let v6 = v4.to_ipv6_mapped();
2666        let addr: SocketAddr = (v6, DEFAULT_DISCOVERY_PORT).into();
2667
2668        let ping = Ping {
2669            from: rng_endpoint(&mut rng),
2670            to: rng_endpoint(&mut rng),
2671            expire: SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs() - 1,
2672            enr_sq: Some(rng.r#gen()),
2673        };
2674
2675        let id = PeerId::random();
2676        service.on_ping(ping, addr, id, B256::random());
2677
2678        let key = kad_key(id);
2679        match service.kbuckets.entry(&key) {
2680            kbucket::Entry::Absent(_) => {}
2681            _ => unreachable!(),
2682        };
2683    }
2684
2685    #[tokio::test]
2686    async fn test_single_lookups() {
2687        reth_tracing::init_test_tracing();
2688
2689        let config = Discv4Config::builder().build();
2690        let (_discv4, mut service) = create_discv4_with_config(config.clone()).await;
2691
2692        let id = PeerId::random();
2693        let key = kad_key(id);
2694        let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), id);
2695
2696        let _ = service.kbuckets.insert_or_update(
2697            &key,
2698            NodeEntry::new_proven(record),
2699            NodeStatus {
2700                direction: ConnectionDirection::Incoming,
2701                state: ConnectionState::Connected,
2702            },
2703        );
2704
2705        service.lookup_self();
2706        assert_eq!(service.pending_find_nodes.len(), 1);
2707
2708        poll_fn(|cx| {
2709            let _ = service.poll(cx);
2710            assert_eq!(service.pending_find_nodes.len(), 1);
2711
2712            Poll::Ready(())
2713        })
2714        .await;
2715    }
2716
    #[tokio::test]
    async fn test_on_neighbours_recursive_lookup() {
        reth_tracing::init_test_tracing();

        let config = Discv4Config::builder().build();
        let (_discv4, mut service) = create_discv4_with_config(config.clone()).await;
        let (_discv4, mut service2) = create_discv4_with_config(config).await;

        let id = PeerId::random();
        let key = kad_key(id);
        let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), id);

        // Seed service's table with a proven peer so `lookup_self` has a candidate to query.
        let _ = service.kbuckets.insert_or_update(
            &key,
            NodeEntry::new_proven(record),
            NodeStatus {
                direction: ConnectionDirection::Incoming,
                state: ConnectionState::Connected,
            },
        );
        // Needed in this test to populate self.pending_find_nodes as a prerequisite to a valid
        // on_neighbours request
        service.lookup_self();
        assert_eq!(service.pending_find_nodes.len(), 1);

        poll_fn(|cx| {
            let _ = service.poll(cx);
            assert_eq!(service.pending_find_nodes.len(), 1);

            Poll::Ready(())
        })
        .await;

        // Deliver a Neighbours response containing service2's record; the far-future expiry
        // guarantees the message is not discarded as expired.
        let expiry = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_secs() +
            10000000000000;
        let msg = Neighbours { nodes: vec![service2.local_node_record], expire: expiry };
        service.on_neighbours(msg, record.tcp_addr(), id);
        // wait for the processed ping
        let event = poll_fn(|cx| service2.poll(cx)).await;
        assert_eq!(event, Discv4Event::Ping);
        // assert that no find_node req has been added here on top of the initial one, since both
        // sides of the endpoint proof is not completed here
        assert_eq!(service.pending_find_nodes.len(), 1);
        // we now wait for PONG
        let event = poll_fn(|cx| service.poll(cx)).await;
        assert_eq!(event, Discv4Event::Pong);
        // Ideally we want to assert against service.pending_lookup.len() here - but because the
        // service2 sends Pong and Ping consecutively in on_ping(), the pending_lookup table gets
        // drained almost immediately - and no way to grab the handle to its intermediary state here
        // :(
        let event = poll_fn(|cx| service.poll(cx)).await;
        assert_eq!(event, Discv4Event::Ping);
        // assert that we've added the find_node req here after both sides of the endpoint proof is
        // done
        assert_eq!(service.pending_find_nodes.len(), 2);
    }
2773
2774    #[tokio::test]
2775    async fn test_no_local_in_closest() {
2776        reth_tracing::init_test_tracing();
2777
2778        let config = Discv4Config::builder().build();
2779        let (_discv4, mut service) = create_discv4_with_config(config).await;
2780
2781        let target_key = kad_key(PeerId::random());
2782
2783        let id = PeerId::random();
2784        let key = kad_key(id);
2785        let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), id);
2786
2787        let _ = service.kbuckets.insert_or_update(
2788            &key,
2789            NodeEntry::new(record),
2790            NodeStatus {
2791                direction: ConnectionDirection::Incoming,
2792                state: ConnectionState::Connected,
2793            },
2794        );
2795
2796        let closest = service
2797            .kbuckets
2798            .closest_values(&target_key)
2799            .map(|n| n.value.record)
2800            .take(MAX_NODES_PER_BUCKET)
2801            .collect::<Vec<_>>();
2802
2803        assert_eq!(closest.len(), 1);
2804        assert!(!closest.iter().any(|r| r.id == *service.local_peer_id()));
2805    }
2806
2807    #[tokio::test]
2808    async fn test_random_lookup() {
2809        reth_tracing::init_test_tracing();
2810
2811        let config = Discv4Config::builder().build();
2812        let (_discv4, mut service) = create_discv4_with_config(config).await;
2813
2814        let target = PeerId::random();
2815
2816        let id = PeerId::random();
2817        let key = kad_key(id);
2818        let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), id);
2819
2820        let _ = service.kbuckets.insert_or_update(
2821            &key,
2822            NodeEntry::new_proven(record),
2823            NodeStatus {
2824                direction: ConnectionDirection::Incoming,
2825                state: ConnectionState::Connected,
2826            },
2827        );
2828
2829        service.lookup(target);
2830        assert_eq!(service.pending_find_nodes.len(), 1);
2831
2832        let ctx = service.pending_find_nodes.values().next().unwrap().lookup_context.clone();
2833
2834        assert_eq!(ctx.target(), target);
2835        assert_eq!(ctx.inner.closest_nodes.borrow().len(), 1);
2836
2837        ctx.add_node(record);
2838        assert_eq!(ctx.inner.closest_nodes.borrow().len(), 1);
2839    }
2840
    #[tokio::test]
    async fn test_reping_on_find_node_failures() {
        reth_tracing::init_test_tracing();

        let config = Discv4Config::builder().build();
        let (_discv4, mut service) = create_discv4_with_config(config).await;

        let target = PeerId::random();

        let id = PeerId::random();
        let key = kad_key(id);
        let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), id);

        // Insert a proven peer whose consecutive findNode failure counter is maxed out.
        let mut entry = NodeEntry::new_proven(record);
        entry.find_node_failures = u8::MAX;
        let _ = service.kbuckets.insert_or_update(
            &key,
            entry,
            NodeStatus {
                direction: ConnectionDirection::Incoming,
                state: ConnectionState::Connected,
            },
        );

        // With too many failures the peer is re-pinged instead of receiving a FindNode.
        service.lookup(target);
        assert_eq!(service.pending_find_nodes.len(), 0);
        assert_eq!(service.pending_pings.len(), 1);

        // A pong re-establishes the endpoint proof and resets the failure counter.
        service.update_on_pong(record, None);

        service
            .on_entry(record.id, |entry| {
                // reset on pong
                assert_eq!(entry.find_node_failures, 0);
                assert!(entry.has_endpoint_proof);
            })
            .unwrap();
    }
2879
2880    #[tokio::test]
2881    async fn test_service_commands() {
2882        reth_tracing::init_test_tracing();
2883
2884        let config = Discv4Config::builder().build();
2885        let (discv4, mut service) = create_discv4_with_config(config).await;
2886
2887        service.lookup_self();
2888
2889        let _handle = service.spawn();
2890        discv4.send_lookup_self();
2891        let _ = discv4.lookup_self().await;
2892    }
2893
    #[tokio::test]
    async fn test_requests_timeout() {
        reth_tracing::init_test_tracing();
        let fork_id = ForkId { hash: ForkHash(hex!("743f3d89")), next: 16191202 };

        // Short timeouts so all pending requests expire within the 1s sleep below.
        let config = Discv4Config::builder()
            .request_timeout(Duration::from_millis(200))
            .ping_expiration(Duration::from_millis(200))
            .lookup_neighbours_expiration(Duration::from_millis(200))
            .add_eip868_pair("eth", fork_id)
            .build();
        let (_disv4, mut service) = create_discv4_with_config(config).await;

        let id = PeerId::random();
        let key = kad_key(id);
        let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), id);

        // Seed the table with a proven peer so `lookup_self` produces a FindNode request.
        let _ = service.kbuckets.insert_or_update(
            &key,
            NodeEntry::new_proven(record),
            NodeStatus {
                direction: ConnectionDirection::Incoming,
                state: ConnectionState::Connected,
            },
        );

        service.lookup_self();
        assert_eq!(service.pending_find_nodes.len(), 1);

        let ctx = service.pending_find_nodes.values().next().unwrap().lookup_context.clone();

        // Manually register a pending lookup and a pending ping so all three maps are populated.
        service.pending_lookup.insert(record.id, (Instant::now(), ctx));

        assert_eq!(service.pending_lookup.len(), 1);

        let ping = Ping {
            from: service.local_node_record.into(),
            to: record.into(),
            expire: service.ping_expiration(),
            enr_sq: service.enr_seq(),
        };
        let echo_hash = service.send_packet(Message::Ping(ping), record.udp_addr());
        let ping_request = PingRequest {
            sent_at: Instant::now(),
            node: record,
            echo_hash,
            reason: PingReason::InitialInsert,
        };
        service.pending_pings.insert(record.id, ping_request);

        assert_eq!(service.pending_pings.len(), 1);

        // Wait well past the 200ms timeouts; a single poll must then evict everything.
        tokio::time::sleep(Duration::from_secs(1)).await;

        poll_fn(|cx| {
            let _ = service.poll(cx);

            assert_eq!(service.pending_find_nodes.len(), 0);
            assert_eq!(service.pending_lookup.len(), 0);
            assert_eq!(service.pending_pings.len(), 0);

            Poll::Ready(())
        })
        .await;
    }
2959
    // sends a PING packet with a wrong 'to' field and expects a PONG response anyway.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_check_wrong_to() {
        reth_tracing::init_test_tracing();

        let config = Discv4Config::builder().external_ip_resolver(None).build();
        let (_discv4, mut service_1) = create_discv4_with_config(config.clone()).await;
        let (_discv4, mut service_2) = create_discv4_with_config(config).await;

        // ping node 2 with wrong to field
        let mut ping = Ping {
            from: service_1.local_node_record.into(),
            to: service_2.local_node_record.into(),
            expire: service_1.ping_expiration(),
            enr_sq: service_1.enr_seq(),
        };
        // TEST-NET-1 address (RFC 5737), guaranteed not to be node 2's real endpoint.
        ping.to.address = "192.0.2.0".parse().unwrap();

        // Register the ping so the eventual pong can be matched by echo hash.
        let echo_hash = service_1.send_packet(Message::Ping(ping), service_2.local_addr());
        let ping_request = PingRequest {
            sent_at: Instant::now(),
            node: service_2.local_node_record,
            echo_hash,
            reason: PingReason::InitialInsert,
        };
        service_1.pending_pings.insert(*service_2.local_peer_id(), ping_request);

        // wait for the processed ping
        let event = poll_fn(|cx| service_2.poll(cx)).await;
        assert_eq!(event, Discv4Event::Ping);

        // we now wait for PONG
        let event = poll_fn(|cx| service_1.poll(cx)).await;
        assert_eq!(event, Discv4Event::Pong);
        // followed by a ping
        let event = poll_fn(|cx| service_1.poll(cx)).await;
        assert_eq!(event, Discv4Event::Ping);
    }
2998
    // Full bidirectional ping/pong handshake: both services end up with the peer marked
    // connected (endpoint proven) in their kbuckets.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_check_ping_pong() {
        reth_tracing::init_test_tracing();

        let config = Discv4Config::builder().external_ip_resolver(None).build();
        let (_discv4, mut service_1) = create_discv4_with_config(config.clone()).await;
        let (_discv4, mut service_2) = create_discv4_with_config(config).await;

        // send ping from 1 -> 2
        service_1.add_node(service_2.local_node_record);

        // wait for the processed ping
        let event = poll_fn(|cx| service_2.poll(cx)).await;
        assert_eq!(event, Discv4Event::Ping);

        // node is now in the table but not connected yet
        let key1 = kad_key(*service_1.local_peer_id());
        match service_2.kbuckets.entry(&key1) {
            kbucket::Entry::Present(_entry, status) => {
                assert!(!status.is_connected());
            }
            _ => unreachable!(),
        }

        // we now wait for PONG
        let event = poll_fn(|cx| service_1.poll(cx)).await;
        assert_eq!(event, Discv4Event::Pong);

        // endpoint is proven
        let key2 = kad_key(*service_2.local_peer_id());
        match service_1.kbuckets.entry(&key2) {
            kbucket::Entry::Present(_entry, status) => {
                assert!(status.is_connected());
            }
            _ => unreachable!(),
        }

        // we now wait for the PING initiated by 2
        let event = poll_fn(|cx| service_1.poll(cx)).await;
        assert_eq!(event, Discv4Event::Ping);

        // Drain events from service_2 until we see the Pong. Intermediate EnrRequest and
        // FindNode events are expected: ENR requests come from the ping handshake, and FindNode
        // arrives because service_1 resets its lookup interval on the first bootnode pong.
        tokio::time::timeout(Duration::from_secs(5), async {
            loop {
                let event = poll_fn(|cx| service_2.poll(cx)).await;
                match event {
                    Discv4Event::Pong => break,
                    Discv4Event::EnrRequest | Discv4Event::FindNode => {}
                    ev => unreachable!("{ev:?}"),
                }
            }
        })
        .await
        .expect("timed out waiting for Pong from service_2");

        // endpoint is proven
        match service_2.kbuckets.entry(&key1) {
            kbucket::Entry::Present(_entry, status) => {
                assert!(status.is_connected());
            }
            ev => unreachable!("{ev:?}"),
        }
    }
3064
3065    #[test]
3066    fn test_insert() {
3067        let local_node_record = rng_record(&mut rand_08::thread_rng());
3068        let mut kbuckets: KBucketsTable<NodeKey, NodeEntry> = KBucketsTable::new(
3069            NodeKey::from(&local_node_record).into(),
3070            Duration::from_secs(60),
3071            MAX_NODES_PER_BUCKET,
3072            None,
3073            None,
3074        );
3075
3076        let new_record = rng_record(&mut rand_08::thread_rng());
3077        let key = kad_key(new_record.id);
3078        match kbuckets.entry(&key) {
3079            kbucket::Entry::Absent(entry) => {
3080                let node = NodeEntry::new(new_record);
3081                let _ = entry.insert(
3082                    node,
3083                    NodeStatus {
3084                        direction: ConnectionDirection::Outgoing,
3085                        state: ConnectionState::Disconnected,
3086                    },
3087                );
3088            }
3089            _ => {
3090                unreachable!()
3091            }
3092        };
3093        match kbuckets.entry(&key) {
3094            kbucket::Entry::Present(_, _) => {}
3095            _ => {
3096                unreachable!()
3097            }
3098        }
3099    }
3100
3101    #[tokio::test]
3102    async fn test_bootnode_not_in_update_stream() {
3103        reth_tracing::init_test_tracing();
3104        let (_, service_1) = create_discv4().await;
3105        let peerid_1 = *service_1.local_peer_id();
3106
3107        let config = Discv4Config::builder().add_boot_node(service_1.local_node_record).build();
3108        service_1.spawn();
3109
3110        let (_, mut service_2) = create_discv4_with_config(config).await;
3111
3112        let mut updates = service_2.update_stream();
3113
3114        service_2.spawn();
3115
3116        // Poll for events for a reasonable time
3117        let mut bootnode_appeared = false;
3118        let timeout = tokio::time::sleep(Duration::from_secs(1));
3119        tokio::pin!(timeout);
3120
3121        loop {
3122            tokio::select! {
3123                Some(update) = updates.next() => {
3124                    if let DiscoveryUpdate::Added(record) = update
3125                        && record.id == peerid_1 {
3126                            bootnode_appeared = true;
3127                            break;
3128                        }
3129                }
3130                _ = &mut timeout => break,
3131            }
3132        }
3133
3134        // Assert bootnode did not appear in update stream
3135        assert!(bootnode_appeared, "Bootnode should appear in update stream");
3136    }
3137
3138    fn insert_proven_node(service: &mut Discv4Service, record: NodeRecord) {
3139        let key = kad_key(record.id);
3140        let _ = service.kbuckets.insert_or_update(
3141            &key,
3142            NodeEntry::new_proven(record),
3143            NodeStatus {
3144                direction: ConnectionDirection::Incoming,
3145                state: ConnectionState::Connected,
3146            },
3147        );
3148    }
3149
3150    fn insert_initial_ping(service: &mut Discv4Service, record: NodeRecord) -> B256 {
3151        let echo_hash = B256::random();
3152        service.pending_pings.insert(
3153            record.id,
3154            PingRequest {
3155                sent_at: Instant::now(),
3156                node: record,
3157                echo_hash,
3158                reason: PingReason::InitialInsert,
3159            },
3160        );
3161        echo_hash
3162    }
3163
3164    fn make_pong(service: &Discv4Service, echo_hash: B256) -> Pong {
3165        Pong {
3166            to: rng_endpoint(&mut rand_08::thread_rng()),
3167            echo: echo_hash,
3168            expire: service.ping_expiration(),
3169            enr_sq: None,
3170        }
3171    }
3172
3173    #[tokio::test]
3174    async fn test_lookup_reset_on_first_bootnode_pong() {
3175        let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), PeerId::random());
3176        let config = Discv4Config::builder().add_boot_node(record).build();
3177        let (_discv4, mut service) = create_discv4_with_config(config).await;
3178
3179        // 1. initial state
3180        assert!(service.pending_lookup_reset);
3181
3182        // 2. setup: proven bootnode + pending InitialInsert ping
3183        insert_proven_node(&mut service, record);
3184        let echo_hash = insert_initial_ping(&mut service, record);
3185
3186        // 3. input: pong arrives
3187        service.on_pong(make_pong(&service, echo_hash), record.udp_addr(), record.id);
3188
3189        // 4. flag should be consumed — interval was reset
3190        assert!(!service.pending_lookup_reset, "flag should be consumed");
3191    }
3192
3193    #[tokio::test]
3194    async fn test_lookup_reset_fires_only_once() {
3195        let records: Vec<_> = (0..2)
3196            .map(|_| NodeRecord::new("0.0.0.0:0".parse().unwrap(), PeerId::random()))
3197            .collect();
3198        let config = Discv4Config::builder().add_boot_nodes(records.clone()).build();
3199        let (_discv4, mut service) = create_discv4_with_config(config).await;
3200
3201        // 1. setup: two proven bootnodes with pending InitialInsert pings
3202        for &r in &records {
3203            insert_proven_node(&mut service, r);
3204        }
3205        let hashes: Vec<_> =
3206            records.iter().map(|r| insert_initial_ping(&mut service, *r)).collect();
3207
3208        // 2. first pong -> consumes the flag (resets the interval)
3209        service.on_pong(make_pong(&service, hashes[0]), records[0].udp_addr(), records[0].id);
3210        assert!(!service.pending_lookup_reset);
3211
3212        // 3. second pong -> flag already consumed, no second reset
3213        service.on_pong(make_pong(&service, hashes[1]), records[1].udp_addr(), records[1].id);
3214        assert!(!service.pending_lookup_reset);
3215    }
3216
3217    #[tokio::test]
3218    async fn test_lookup_reset_not_triggered_by_non_bootnode() {
3219        let bootnode = NodeRecord::new("0.0.0.0:0".parse().unwrap(), PeerId::random());
3220        let config = Discv4Config::builder().add_boot_node(bootnode).build();
3221        let (_discv4, mut service) = create_discv4_with_config(config).await;
3222
3223        assert!(service.pending_lookup_reset);
3224
3225        // a non-bootnode pong should not consume the flag
3226        let stranger = NodeRecord::new("0.0.0.0:0".parse().unwrap(), PeerId::random());
3227        insert_proven_node(&mut service, stranger);
3228        let echo_hash = insert_initial_ping(&mut service, stranger);
3229        service.on_pong(make_pong(&service, echo_hash), stranger.udp_addr(), stranger.id);
3230
3231        assert!(service.pending_lookup_reset, "flag should not be consumed by non-bootnode");
3232    }
3233
3234    #[tokio::test]
3235    async fn test_lookup_reset_disabled_when_lookup_disabled() {
3236        let config = Discv4Config::builder().enable_lookup(false).build();
3237        let (_discv4, service) = create_discv4_with_config(config).await;
3238
3239        // flag should be false when lookups are disabled
3240        assert!(!service.pending_lookup_reset);
3241    }
3242}