Skip to main content

reth_discv4/
lib.rs

1//! Discovery v4 implementation: <https://github.com/ethereum/devp2p/blob/master/discv4.md>
2//!
3//! Discv4 employs a kademlia-like routing table to store and manage discovered peers and topics.
4//! The protocol allows for external IP discovery in NAT environments through regular PING/PONG's
5//! with discovered nodes. Nodes return the external IP address that they have received and a simple
6//! majority is chosen as our external IP address. If an external IP address is updated, this is
7//! produced as an event to notify the swarm (if one is used for this behaviour).
8//!
9//! This implementation consists of a [`Discv4`] and [`Discv4Service`] pair. The service manages the
10//! state and drives the UDP socket. The (optional) [`Discv4`] serves as the frontend to interact
//! with the service via a channel. Whenever the underlying table changes, the service produces a
12//! [`DiscoveryUpdate`] that listeners will receive.
13//!
14//! ## Feature Flags
15//!
16//! - `serde` (default): Enable serde support
17//! - `test-utils`: Export utilities for testing
18
19#![doc(
20    html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
21    html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
22    issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/"
23)]
24#![cfg_attr(not(test), warn(unused_crate_dependencies))]
25#![cfg_attr(docsrs, feature(doc_cfg))]
26
27use crate::{
28    error::{DecodePacketError, Discv4Error},
29    proto::{FindNode, Message, Neighbours, Packet, Ping, Pong},
30};
31use alloy_primitives::{bytes::Bytes, hex, B256};
32use discv5::{
33    kbucket,
34    kbucket::{
35        BucketInsertResult, Distance, Entry as BucketEntry, InsertResult, KBucketsTable,
36        NodeStatus, MAX_NODES_PER_BUCKET,
37    },
38    ConnectionDirection, ConnectionState,
39};
40use enr::Enr;
41use itertools::Itertools;
42use parking_lot::Mutex;
43use proto::{EnrRequest, EnrResponse};
44use reth_ethereum_forks::ForkId;
45use reth_network_peers::{pk2id, PeerId};
46use secp256k1::SecretKey;
47use std::{
48    cell::RefCell,
49    collections::{btree_map, hash_map::Entry, BTreeMap, HashMap, VecDeque},
50    fmt,
51    future::poll_fn,
52    io,
53    net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4},
54    pin::Pin,
55    rc::Rc,
56    sync::Arc,
57    task::{ready, Context, Poll},
58    time::{Duration, Instant, SystemTime, UNIX_EPOCH},
59};
60use tokio::{
61    net::UdpSocket,
62    sync::{mpsc, mpsc::error::TrySendError, oneshot, oneshot::Sender as OneshotSender},
63    task::{JoinHandle, JoinSet},
64    time::Interval,
65};
66use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt};
67use tracing::{debug, trace};
68
69pub mod error;
70pub mod proto;
71
72mod config;
73pub use config::{Discv4Config, Discv4ConfigBuilder};
74
75mod node;
76use node::{kad_key, NodeKey};
77
78mod table;
79
80// reexport NodeRecord primitive
81pub use reth_network_peers::NodeRecord;
82
83#[cfg(any(test, feature = "test-utils"))]
84pub mod test_utils;
85
86use crate::table::PongTable;
87use reth_net_nat::ResolveNatInterval;
88/// reexport to get public ip.
89pub use reth_net_nat::{external_ip, NatResolver};
90
/// The default address for discv4 via UDP
///
/// Note: the default TCP address is the same.
pub const DEFAULT_DISCOVERY_ADDR: IpAddr = IpAddr::V4(Ipv4Addr::UNSPECIFIED);

/// The default port for discv4 via UDP
///
/// Note: the default TCP port is the same.
pub const DEFAULT_DISCOVERY_PORT: u16 = 30303;

/// The default address for discv4 via UDP: "0.0.0.0:30303"
///
/// Note: The default TCP address is the same.
pub const DEFAULT_DISCOVERY_ADDRESS: SocketAddr =
    SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, DEFAULT_DISCOVERY_PORT));

/// The maximum size of any packet is 1280 bytes.
const MAX_PACKET_SIZE: usize = 1280;

/// Length of the UDP datagram packet-header: Hash(32b) + Signature(65b) + Packet Type(1b)
const MIN_PACKET_SIZE: usize = 32 + 65 + 1;

/// Concurrency factor for `FindNode` requests to pick `ALPHA` closest nodes, <https://github.com/ethereum/devp2p/blob/master/discv4.md#recursive-lookup>
const ALPHA: usize = 3;

/// Maximum number of nodes to ping concurrently.
///
/// This corresponds to 2 full `Neighbours` responses with 16 _new_ nodes. This will apply some
/// backpressure in recursive lookups.
const MAX_NODES_PING: usize = 2 * MAX_NODES_PER_BUCKET;

/// Maximum number of pings to keep queued.
///
/// If we are currently sending too many pings, any new pings will be queued. To prevent unbounded
/// growth of the queue, the queue has a maximum capacity, after which any additional pings will be
/// discarded.
///
/// This corresponds to 2 full `Neighbours` responses with 16 new nodes.
const MAX_QUEUED_PINGS: usize = 2 * MAX_NODES_PER_BUCKET;

/// The size of the datagram is limited to [`MAX_PACKET_SIZE`]; the 16 nodes the discv4 spec
/// mentions don't fit in one datagram. The safe number of nodes that always fit in a datagram is
/// 12, with worst case all of them being IPv6 nodes. This is calculated by `(MAX_PACKET_SIZE -
/// (header + expire + rlp overhead)) / size(rlp(Node_IPv6))`
/// Even in the best case where all nodes are IPv4, only 14 nodes fit into one packet.
const SAFE_MAX_DATAGRAM_NEIGHBOUR_RECORDS: usize = (MAX_PACKET_SIZE - 109) / 91;

/// The timeout used to identify expired nodes, 24h
///
/// Mirrors geth's `bondExpiration` of 24h
const ENDPOINT_PROOF_EXPIRATION: Duration = Duration::from_secs(24 * 60 * 60);

/// Duration used to expire nodes from the routing table 1hr
const EXPIRE_DURATION: Duration = Duration::from_secs(60 * 60);

// Restricts how many udp messages can be processed in a single [Discv4Service::poll] call.
//
// This will act as a manual yield point when draining the socket messages where the most CPU
// expensive part is handling outgoing messages: encoding and hashing the packet
const UDP_MESSAGE_POLL_LOOP_BUDGET: i32 = 4;

/// Sender half of the channel that forwards outgoing `(packet, destination)` pairs to the UDP
/// send task.
type EgressSender = mpsc::Sender<(Bytes, SocketAddr)>;
/// Receiver half consumed by the UDP send task.
type EgressReceiver = mpsc::Receiver<(Bytes, SocketAddr)>;

/// Sender half used by the UDP receive task to forward incoming events to the service.
pub(crate) type IngressSender = mpsc::Sender<IngressEvent>;
/// Receiver half consumed by the service to drain incoming UDP events.
pub(crate) type IngressReceiver = mpsc::Receiver<IngressEvent>;

/// Oneshot sender used to deliver the closest discovered records once a lookup completes.
type NodeRecordSender = OneshotSender<Vec<NodeRecord>>;
159
/// The Discv4 frontend.
///
/// This is a cloneable type that communicates with the [`Discv4Service`] by sending commands over a
/// shared channel.
///
/// See also [`Discv4::spawn`]
#[derive(Debug, Clone)]
pub struct Discv4 {
    /// The address of the UDP socket.
    local_addr: SocketAddr,
    /// Channel used to send commands over to the service.
    to_service: mpsc::UnboundedSender<Discv4Command>,
    /// Tracks the local node record.
    ///
    /// This includes the currently tracked external IP address of the node.
    node_record: Arc<Mutex<NodeRecord>>,
}
177
impl Discv4 {
    /// Same as [`Self::bind`] but also spawns the service onto a new task.
    ///
    /// See also: [`Discv4Service::spawn()`]
    pub async fn spawn(
        local_address: SocketAddr,
        local_enr: NodeRecord,
        secret_key: SecretKey,
        config: Discv4Config,
    ) -> io::Result<Self> {
        let (discv4, service) = Self::bind(local_address, local_enr, secret_key, config).await?;

        // Detach the service onto its own task; the returned frontend keeps the command channel.
        service.spawn();

        Ok(discv4)
    }

    /// Returns a new instance with the given channel directly
    ///
    /// NOTE: this is only intended for test setups.
    #[cfg(feature = "test-utils")]
    pub fn noop() -> Self {
        // The receiver half is dropped immediately, so every command sent through this
        // handle is silently discarded.
        let (to_service, _rx) = mpsc::unbounded_channel();
        let local_addr =
            (IpAddr::from(std::net::Ipv4Addr::UNSPECIFIED), DEFAULT_DISCOVERY_PORT).into();
        Self {
            local_addr,
            to_service,
            node_record: Arc::new(Mutex::new(NodeRecord::new(
                "127.0.0.1:3030".parse().unwrap(),
                PeerId::random(),
            ))),
        }
    }

    /// Binds a new `UdpSocket` and creates the service
    ///
    /// ```
    /// use reth_discv4::{Discv4, Discv4Config};
    /// use reth_network_peers::{pk2id, NodeRecord, PeerId};
    /// use secp256k1::SECP256K1;
    /// use std::{net::SocketAddr, str::FromStr};
    /// # async fn t() -> std::io::Result<()> {
    ///
    /// // generate a (random) keypair
    /// let (secret_key, pk) = SECP256K1.generate_keypair(&mut rand_08::thread_rng());
    /// let id = pk2id(&pk);
    ///
    /// let socket = SocketAddr::from_str("0.0.0.0:0").unwrap();
    /// let local_enr =
    ///     NodeRecord { address: socket.ip(), tcp_port: socket.port(), udp_port: socket.port(), id };
    /// let config = Discv4Config::default();
    ///
    /// let (discv4, mut service) = Discv4::bind(socket, local_enr, secret_key, config).await.unwrap();
    ///
    /// // get an update stream
    /// let updates = service.update_stream();
    ///
    /// let _handle = service.spawn();
    ///
    /// // lookup the local node in the DHT
    /// let _discovered = discv4.lookup_self().await.unwrap();
    ///
    /// # Ok(())
    /// # }
    /// ```
    pub async fn bind(
        local_address: SocketAddr,
        mut local_node_record: NodeRecord,
        secret_key: SecretKey,
        config: Discv4Config,
    ) -> io::Result<(Self, Discv4Service)> {
        let socket = UdpSocket::bind(local_address).await?;
        // Re-read the actual bound address: binding to port 0 assigns an ephemeral port.
        let local_addr = socket.local_addr()?;
        local_node_record.udp_port = local_addr.port();
        trace!(target: "discv4", ?local_addr,"opened UDP socket");

        let mut service =
            Discv4Service::new(socket, local_addr, local_node_record, secret_key, config);

        // resolve the external address immediately
        service.resolve_external_ip();

        let discv4 = service.handle();
        Ok((discv4, service))
    }

    /// Returns the address of the UDP socket.
    pub const fn local_addr(&self) -> SocketAddr {
        self.local_addr
    }

    /// Returns the [`NodeRecord`] of the local node.
    ///
    /// This includes the currently tracked external IP address of the node.
    pub fn node_record(&self) -> NodeRecord {
        *self.node_record.lock()
    }

    /// Returns the currently tracked external IP of the node.
    pub fn external_ip(&self) -> IpAddr {
        self.node_record.lock().address
    }

    /// Sets the [Interval] used for periodically looking up targets over the network
    pub fn set_lookup_interval(&self, duration: Duration) {
        self.send_to_service(Discv4Command::SetLookupInterval(duration))
    }

    /// Starts a `FindNode` recursive lookup that locates the closest nodes to the given node id. See also: <https://github.com/ethereum/devp2p/blob/master/discv4.md#recursive-lookup>
    ///
    /// The lookup initiator starts by picking α closest nodes to the target it knows of. The
    /// initiator then sends concurrent `FindNode` packets to those nodes. α is a system-wide
    /// concurrency parameter, such as 3. In the recursive step, the initiator resends `FindNode` to
    /// nodes it has learned about from previous queries. Of the k nodes the initiator has heard of
    /// closest to the target, it picks α that it has not yet queried and resends `FindNode` to
    /// them. Nodes that fail to respond quickly are removed from consideration until and unless
    /// they do respond.
    //
    // If a round of FindNode queries fails to return a node any closer than the closest already
    // seen, the initiator resends the find node to all of the k closest nodes it has not already
    // queried. The lookup terminates when the initiator has queried and gotten responses from the k
    // closest nodes it has seen.
    pub async fn lookup_self(&self) -> Result<Vec<NodeRecord>, Discv4Error> {
        self.lookup_node(None).await
    }

    /// Looks up the given node id.
    ///
    /// Returning the closest nodes to the given node id.
    pub async fn lookup(&self, node_id: PeerId) -> Result<Vec<NodeRecord>, Discv4Error> {
        self.lookup_node(Some(node_id)).await
    }

    /// Performs a random lookup for node records.
    pub async fn lookup_random(&self) -> Result<Vec<NodeRecord>, Discv4Error> {
        let target = PeerId::random();
        self.lookup_node(Some(target)).await
    }

    /// Sends a message to the service to lookup the closest nodes
    pub fn send_lookup(&self, node_id: PeerId) {
        // Fire-and-forget: no response channel is attached to the command.
        let cmd = Discv4Command::Lookup { node_id: Some(node_id), tx: None };
        self.send_to_service(cmd);
    }

    /// Sends a lookup command for the given target (or the local node when `None`) and awaits
    /// the discovered records on a oneshot channel.
    async fn lookup_node(&self, node_id: Option<PeerId>) -> Result<Vec<NodeRecord>, Discv4Error> {
        let (tx, rx) = oneshot::channel();
        let cmd = Discv4Command::Lookup { node_id, tx: Some(tx) };
        self.to_service.send(cmd)?;
        Ok(rx.await?)
    }

    /// Triggers a new self lookup without expecting a response
    pub fn send_lookup_self(&self) {
        let cmd = Discv4Command::Lookup { node_id: None, tx: None };
        self.send_to_service(cmd);
    }

    /// Removes the peer from the table, if it exists.
    pub fn remove_peer(&self, node_id: PeerId) {
        let cmd = Discv4Command::Remove(node_id);
        self.send_to_service(cmd);
    }

    /// Adds the node to the table, if it is not already present.
    pub fn add_node(&self, node_record: NodeRecord) {
        let cmd = Discv4Command::Add(node_record);
        self.send_to_service(cmd);
    }

    /// Adds the peer and id to the ban list.
    ///
    /// This will prevent any future inclusion in the table
    pub fn ban(&self, node_id: PeerId, ip: IpAddr) {
        let cmd = Discv4Command::Ban(node_id, ip);
        self.send_to_service(cmd);
    }

    /// Adds the ip to the ban list.
    ///
    /// This will prevent any future inclusion in the table
    pub fn ban_ip(&self, ip: IpAddr) {
        let cmd = Discv4Command::BanIp(ip);
        self.send_to_service(cmd);
    }

    /// Adds the peer to the ban list.
    ///
    /// This will prevent any future inclusion in the table
    pub fn ban_node(&self, node_id: PeerId) {
        let cmd = Discv4Command::BanPeer(node_id);
        self.send_to_service(cmd);
    }

    /// Sets the tcp port
    ///
    /// This will update our [`NodeRecord`]'s tcp port.
    pub fn set_tcp_port(&self, port: u16) {
        let cmd = Discv4Command::SetTcpPort(port);
        self.send_to_service(cmd);
    }

    /// Sets the pair in the EIP-868 [`Enr`] of the node.
    ///
    /// If the key already exists, this will update it.
    ///
    /// CAUTION: The value **must** be rlp encoded
    pub fn set_eip868_rlp_pair(&self, key: Vec<u8>, rlp: Bytes) {
        let cmd = Discv4Command::SetEIP868RLPPair { key, rlp };
        self.send_to_service(cmd);
    }

    /// Sets the pair in the EIP-868 [`Enr`] of the node.
    ///
    /// If the key already exists, this will update it.
    pub fn set_eip868_rlp(&self, key: Vec<u8>, value: impl alloy_rlp::Encodable) {
        self.set_eip868_rlp_pair(key, Bytes::from(alloy_rlp::encode(&value)))
    }

    /// Sends a command to the service, logging (instead of propagating) the error.
    ///
    /// The channel is unbounded, so a send only fails once the service has terminated and
    /// dropped the receiver; the command is then dropped.
    #[inline]
    fn send_to_service(&self, cmd: Discv4Command) {
        let _ = self.to_service.send(cmd).map_err(|err| {
            debug!(
                target: "discv4",
                %err,
                "channel capacity reached, dropping command",
            )
        });
    }

    /// Returns the receiver half of new listener channel that streams [`DiscoveryUpdate`]s.
    pub async fn update_stream(&self) -> Result<ReceiverStream<DiscoveryUpdate>, Discv4Error> {
        let (tx, rx) = oneshot::channel();
        let cmd = Discv4Command::Updates(tx);
        self.to_service.send(cmd)?;
        Ok(rx.await?)
    }

    /// Terminates the spawned [`Discv4Service`].
    pub fn terminate(&self) {
        self.send_to_service(Discv4Command::Terminated);
    }
}
422
/// Manages discv4 peer discovery over UDP.
///
/// This is a [Stream] that handles incoming and outgoing discv4 messages and emits updates via:
/// [`Discv4Service::update_stream`].
///
/// This type maintains the discv4 Kademlia routing table and is responsible for performing lookups.
///
/// ## Lookups
///
/// See also [Recursive Lookups](https://github.com/ethereum/devp2p/blob/master/discv4.md#recursive-lookup).
/// Lookups are either triggered periodically or performed on demand: [`Discv4::lookup`]
/// Newly discovered nodes are emitted as [`DiscoveryUpdate::Added`] event to all subscribers:
/// [`Discv4Service::update_stream`].
#[must_use = "Stream does nothing unless polled"]
pub struct Discv4Service {
    /// Local address of the UDP socket.
    local_address: SocketAddr,
    /// The local ENR for EIP-868 <https://eips.ethereum.org/EIPS/eip-868>
    local_eip_868_enr: Enr<SecretKey>,
    /// Local ENR of the server.
    local_node_record: NodeRecord,
    /// Keeps track of the node record of the local node.
    shared_node_record: Arc<Mutex<NodeRecord>>,
    /// The secret key used to sign payloads
    secret_key: SecretKey,
    /// The UDP socket for sending and receiving messages.
    _socket: Arc<UdpSocket>,
    /// The spawned UDP tasks.
    ///
    /// Note: If dropped, the spawned send+receive tasks are aborted.
    _tasks: JoinSet<()>,
    /// The routing table.
    kbuckets: KBucketsTable<NodeKey, NodeEntry>,
    /// Receiver for incoming messages
    ///
    /// Receives incoming messages from the UDP task.
    ingress: IngressReceiver,
    /// Sender for sending outgoing messages
    ///
    /// Sends outgoing messages to the UDP task.
    egress: EgressSender,
    /// Buffered pending pings to apply backpressure.
    ///
    /// Lookups behave like bursts of requests: Endpoint proof followed by `FindNode` request. [Recursive lookups](https://github.com/ethereum/devp2p/blob/master/discv4.md#recursive-lookup) can trigger multiple followup Pings+FindNode requests.
    /// A cap on concurrent `Ping` prevents escalation where: A large number of new nodes
    /// discovered via `FindNode` in a recursive lookup triggers a large number of `Ping`s, and
    /// followup `FindNode` requests.... Buffering them effectively prevents high `Ping` peaks.
    queued_pings: VecDeque<(NodeRecord, PingReason)>,
    /// Currently active pings to specific nodes.
    pending_pings: HashMap<PeerId, PingRequest>,
    /// Currently active endpoint proof verification lookups to specific nodes.
    ///
    /// Entries here means we've proven the peer's endpoint but haven't completed our end of the
    /// endpoint proof
    pending_lookup: HashMap<PeerId, (Instant, LookupContext)>,
    /// Currently active `FindNode` requests
    pending_find_nodes: HashMap<PeerId, FindNodeRequest>,
    /// Currently active ENR requests
    pending_enr_requests: HashMap<PeerId, EnrRequestState>,
    /// Copy of the sender half of the commands channel for [Discv4]
    to_service: mpsc::UnboundedSender<Discv4Command>,
    /// Receiver half of the commands channel for [Discv4]
    commands_rx: mpsc::UnboundedReceiver<Discv4Command>,
    /// All subscribers for table updates
    update_listeners: Vec<mpsc::Sender<DiscoveryUpdate>>,
    /// The interval when to trigger random lookups
    lookup_interval: Interval,
    /// Used to rotate targets to lookup
    lookup_rotator: LookupTargetRotator,
    /// Whether we still need to reset the lookup interval on the first bootnode pong.
    pending_lookup_reset: bool,
    /// Interval when to recheck active requests
    evict_expired_requests_interval: Interval,
    /// Interval when to resend pings.
    ping_interval: Interval,
    /// The interval at which to attempt resolving external IP again.
    resolve_external_ip_interval: Option<ResolveNatInterval>,
    /// How this services is configured
    config: Discv4Config,
    /// Buffered events populated during poll.
    queued_events: VecDeque<Discv4Event>,
    /// Keeps track of nodes from which we have received a `Pong` message.
    received_pongs: PongTable,
    /// Interval used to expire additionally tracked nodes
    expire_interval: Interval,
    /// Cached signed `FindNode` packet to avoid redundant ECDSA signing during lookups.
    cached_find_node: Option<CachedFindNode>,
}
511
512impl Discv4Service {
    /// Create a new instance for a bound [`UdpSocket`].
    pub(crate) fn new(
        socket: UdpSocket,
        local_address: SocketAddr,
        local_node_record: NodeRecord,
        secret_key: SecretKey,
        config: Discv4Config,
    ) -> Self {
        let socket = Arc::new(socket);
        // Bounded channels between the service and the two UDP I/O tasks; their capacity
        // applies backpressure in both directions of the socket.
        let (ingress_tx, ingress_rx) = mpsc::channel(config.udp_ingress_message_buffer);
        let (egress_tx, egress_rx) = mpsc::channel(config.udp_egress_message_buffer);
        let mut tasks = JoinSet::<()>::new();

        // Task that reads datagrams off the socket and forwards them to the service.
        let udp = Arc::clone(&socket);
        tasks.spawn(receive_loop(udp, ingress_tx, local_node_record.id));

        // Task that writes outgoing datagrams to the socket.
        let udp = Arc::clone(&socket);
        tasks.spawn(send_loop(udp, egress_rx));

        let kbuckets = KBucketsTable::new(
            NodeKey::from(&local_node_record).into(),
            Duration::from_secs(60),
            MAX_NODES_PER_BUCKET,
            None,
            None,
        );

        let self_lookup_interval = tokio::time::interval(config.lookup_interval);

        // Delay the first tick by `ping_interval` and then ping every `ping_interval`,
        // so the service doesn't start re-pinging immediately on startup.
        let ping_interval = tokio::time::interval_at(
            tokio::time::Instant::now() + config.ping_interval,
            config.ping_interval,
        );

        let evict_expired_requests_interval = tokio::time::interval_at(
            tokio::time::Instant::now() + config.request_timeout,
            config.request_timeout,
        );

        let lookup_rotator = if config.enable_dht_random_walk {
            LookupTargetRotator::default()
        } else {
            LookupTargetRotator::local_only()
        };

        // for EIP-868 construct an ENR
        let local_eip_868_enr = {
            let mut builder = Enr::builder();
            builder.ip(local_node_record.address);
            if local_node_record.address.is_ipv4() {
                builder.udp4(local_node_record.udp_port);
                builder.tcp4(local_node_record.tcp_port);
            } else {
                builder.udp6(local_node_record.udp_port);
                builder.tcp6(local_node_record.tcp_port);
            }

            for (key, val) in &config.additional_eip868_rlp_pairs {
                builder.add_value_rlp(key, val.clone());
            }
            builder.build(&secret_key).expect("v4 is set")
        };

        let (to_service, commands_rx) = mpsc::unbounded_channel();

        let shared_node_record = Arc::new(Mutex::new(local_node_record));

        Self {
            local_address,
            local_eip_868_enr,
            local_node_record,
            shared_node_record,
            _socket: socket,
            kbuckets,
            secret_key,
            _tasks: tasks,
            ingress: ingress_rx,
            egress: egress_tx,
            queued_pings: VecDeque::with_capacity(MAX_QUEUED_PINGS),
            pending_pings: Default::default(),
            pending_lookup: Default::default(),
            pending_find_nodes: Default::default(),
            pending_enr_requests: Default::default(),
            commands_rx,
            to_service,
            update_listeners: Vec::with_capacity(1),
            lookup_interval: self_lookup_interval,
            ping_interval,
            evict_expired_requests_interval,
            lookup_rotator,
            pending_lookup_reset: config.enable_lookup,
            resolve_external_ip_interval: config.resolve_external_ip_interval(),
            config,
            queued_events: Default::default(),
            received_pongs: Default::default(),
            expire_interval: tokio::time::interval(EXPIRE_DURATION),
            cached_find_node: None,
        }
    }
614
615    /// Returns the frontend handle that can communicate with the service via commands.
616    pub fn handle(&self) -> Discv4 {
617        Discv4 {
618            local_addr: self.local_address,
619            to_service: self.to_service.clone(),
620            node_record: self.shared_node_record.clone(),
621        }
622    }
623
624    /// Returns the current enr sequence of the local record.
625    fn enr_seq(&self) -> Option<u64> {
626        self.config.enable_eip868.then(|| self.local_eip_868_enr.seq())
627    }
628
629    /// Sets the [Interval] used for periodically looking up targets over the network
630    pub fn set_lookup_interval(&mut self, duration: Duration) {
631        self.lookup_interval = tokio::time::interval(duration);
632    }
633
634    /// Sets the external Ip to the configured external IP if [`NatResolver::ExternalIp`] or
635    /// [`NatResolver::ExternalAddr`]. In the case of [`NatResolver::ExternalAddr`], it will return
636    /// the first IP address found for the domain associated with the discv4 UDP port.
637    fn resolve_external_ip(&mut self) {
638        if let Some(r) = &self.resolve_external_ip_interval &&
639            let Some(external_ip) =
640                r.resolver().clone().as_external_ip(self.local_node_record.udp_port)
641        {
642            self.set_external_ip_addr(external_ip);
643        }
644    }
645
646    /// Sets the given ip address as the node's external IP in the node record announced in
647    /// discovery
648    pub fn set_external_ip_addr(&mut self, external_ip: IpAddr) {
649        if self.local_node_record.address != external_ip {
650            debug!(target: "discv4", ?external_ip, "Updating external ip");
651            self.local_node_record.address = external_ip;
652            let _ = self.local_eip_868_enr.set_ip(external_ip, &self.secret_key);
653            let mut lock = self.shared_node_record.lock();
654            *lock = self.local_node_record;
655            debug!(target: "discv4", enr=?self.local_eip_868_enr, "Updated local ENR");
656        }
657    }
658
    /// Returns the [`PeerId`] that identifies this node.
    pub const fn local_peer_id(&self) -> &PeerId {
        &self.local_node_record.id
    }

    /// Returns the address of the UDP socket.
    pub const fn local_addr(&self) -> SocketAddr {
        self.local_address
    }

    /// Returns the ENR of this service.
    ///
    /// Note: this will include the external address if resolved.
    pub const fn local_enr(&self) -> NodeRecord {
        self.local_node_record
    }

    /// Returns mutable reference to ENR for testing.
    #[cfg(test)]
    pub const fn local_enr_mut(&mut self) -> &mut NodeRecord {
        &mut self.local_node_record
    }
681
682    /// Returns true if the given `PeerId` is currently in the bucket
683    pub fn contains_node(&self, id: PeerId) -> bool {
684        let key = kad_key(id);
685        self.kbuckets.get_index(&key).is_some()
686    }
687
688    /// Bootstraps the local node to join the DHT.
689    ///
690    /// Bootstrapping is a multi-step operation that starts with a lookup of the local node's
691    /// own ID in the DHT. This introduces the local node to the other nodes
692    /// in the DHT and populates its routing table with the closest proven neighbours.
693    ///
694    /// This inserts the configured bootnodes into the routing table and pings them. Once the
695    /// endpoint proof succeeds (pong received), a [`DiscoveryUpdate::Added`] event is emitted,
696    /// same as with [`Self::add_node`].
697    ///
698    /// **Note:** This is a noop if there are no bootnodes.
699    pub fn bootstrap(&mut self) {
700        for record in self.config.bootstrap_nodes.clone() {
701            debug!(target: "discv4", ?record, "pinging boot node");
702            let key = kad_key(record.id);
703            let entry = NodeEntry::new(record);
704
705            // insert the boot node in the table
706            match self.kbuckets.insert_or_update(
707                &key,
708                entry,
709                NodeStatus {
710                    state: ConnectionState::Disconnected,
711                    direction: ConnectionDirection::Outgoing,
712                },
713            ) {
714                InsertResult::Failed(_) => {}
715                _ => {
716                    self.try_ping(record, PingReason::InitialInsert);
717                }
718            }
719        }
720    }
721
722    /// Spawns this services onto a new task
723    ///
724    /// Note: requires a running tokio runtime
725    pub fn spawn(mut self) -> JoinHandle<()> {
726        tokio::task::spawn(async move {
727            self.bootstrap();
728
729            while let Some(event) = self.next().await {
730                trace!(target: "discv4", ?event, "processed");
731            }
732            trace!(target: "discv4", "service terminated");
733        })
734    }
735
736    /// Creates a new bounded channel for [`DiscoveryUpdate`]s.
737    pub fn update_stream(&mut self) -> ReceiverStream<DiscoveryUpdate> {
738        let (tx, rx) = mpsc::channel(512);
739        self.update_listeners.push(tx);
740        ReceiverStream::new(rx)
741    }
742
    /// Looks up the local node in the DHT.
    pub fn lookup_self(&mut self) {
        self.lookup(self.local_node_record.id)
    }

    /// Looks up the given node in the DHT
    ///
    /// A `FindNode` packet requests information about nodes close to target. The target is a
    /// 64-byte secp256k1 public key. When `FindNode` is received, the recipient should reply
    /// with Neighbors packets containing the closest 16 nodes to target found in its local
    /// table.
    //
    // To guard against traffic amplification attacks, Neighbors replies should only be sent if the
    // sender of FindNode has been verified by the endpoint proof procedure.
    pub fn lookup(&mut self, target: PeerId) {
        self.lookup_with(target, None)
    }
760
    /// Starts the recursive lookup process for the given target, <https://github.com/ethereum/devp2p/blob/master/discv4.md#recursive-lookup>.
    ///
    /// At first the `ALPHA` (==3, defined concurrency factor) nodes that are closest to the target
    /// in the underlying DHT are selected to seed the lookup via `FindNode` requests. In the
    /// recursive step, the initiator resends `FindNode` to nodes it has learned about from previous
    /// queries.
    ///
    /// This takes an optional Sender through which all successfully discovered nodes are sent once
    /// the request has finished.
    fn lookup_with(&mut self, target: PeerId, tx: Option<NodeRecordSender>) {
        trace!(target: "discv4", ?target, "Starting lookup");
        let target_key = kad_key(target);

        // Start a lookup context with the 16 (MAX_NODES_PER_BUCKET) closest nodes to which we have
        // a valid endpoint proof
        let ctx = LookupContext::new(
            target_key.clone(),
            self.kbuckets
                .closest_values(&target_key)
                .filter(|node| {
                    // skip nodes that already have an in-flight FindNode request
                    node.value.has_endpoint_proof &&
                        !self.pending_find_nodes.contains_key(&node.key.preimage().0)
                })
                .take(MAX_NODES_PER_BUCKET)
                .map(|n| (target_key.distance(&n.key), n.value.record)),
            tx,
        );

        // From those 16, pick the 3 closest to start the concurrent lookup.
        let closest = ctx.closest(ALPHA);

        if closest.is_empty() && self.pending_find_nodes.is_empty() {
            // no closest nodes, and no lookup in progress: table is empty.
            // This could happen if all records were deleted from the table due to missed pongs
            // (e.g. connectivity problems over a long period of time, or issues during initial
            // bootstrapping) so we attempt to bootstrap again
            self.bootstrap();
            return
        }

        trace!(target: "discv4", ?target, num = closest.len(), "Start lookup closest nodes");

        for node in closest {
            // here we still want to check against previous request failures and if necessary
            // re-establish a new endpoint proof because it can be the case that the other node lost
            // our entry and no longer has an endpoint proof on their end
            self.find_node_checked(&node, ctx.clone());
        }
    }
810
811    /// Sends a new `FindNode` packet to the node with `target` as the lookup target.
812    ///
813    /// CAUTION: This expects there's a valid Endpoint proof to the given `node`.
814    fn find_node(&mut self, node: &NodeRecord, ctx: LookupContext) {
815        trace!(target: "discv4", ?node, lookup=?ctx.target(), "Sending FindNode");
816        ctx.mark_queried(node.id);
817        let (payload, hash) = self.find_node_packet(ctx.target());
818        let to = node.udp_addr();
819        trace!(target: "discv4", ?to, ?hash, "sending FindNode packet");
820        let _ = self.egress.try_send((payload, to)).map_err(|err| {
821            debug!(target: "discv4", %err, "dropped outgoing packet");
822        });
823        self.pending_find_nodes.insert(node.id, FindNodeRequest::new(ctx));
824    }
825
826    /// Sends a new `FindNode` packet to the node with `target` as the lookup target but checks
827    /// whether we should send a new ping first to renew the endpoint proof by checking the
828    /// previously failed findNode requests. It could be that the node is no longer reachable or
829    /// lost our entry.
830    fn find_node_checked(&mut self, node: &NodeRecord, ctx: LookupContext) {
831        let max_failures = self.config.max_find_node_failures;
832        let needs_ping = self
833            .on_entry(node.id, |entry| entry.exceeds_find_node_failures(max_failures))
834            .unwrap_or(true);
835        if needs_ping {
836            self.try_ping(*node, PingReason::Lookup(*node, ctx))
837        } else {
838            self.find_node(node, ctx)
839        }
840    }
841
842    /// Notifies all listeners.
843    ///
844    /// Removes all listeners that are closed.
845    fn notify(&mut self, update: DiscoveryUpdate) {
846        self.update_listeners.retain_mut(|listener| match listener.try_send(update.clone()) {
847            Ok(()) => true,
848            Err(err) => match err {
849                TrySendError::Full(_) => true,
850                TrySendError::Closed(_) => false,
851            },
852        });
853    }
854
855    /// Adds the ip to the ban list indefinitely
856    pub fn ban_ip(&mut self, ip: IpAddr) {
857        self.config.ban_list.ban_ip(ip);
858    }
859
860    /// Adds the peer to the ban list indefinitely.
861    pub fn ban_node(&mut self, node_id: PeerId) {
862        self.remove_node(node_id);
863        self.config.ban_list.ban_peer(node_id);
864    }
865
866    /// Adds the ip to the ban list until the given timestamp.
867    pub fn ban_ip_until(&mut self, ip: IpAddr, until: Instant) {
868        self.config.ban_list.ban_ip_until(ip, until);
869    }
870
871    /// Adds the peer to the ban list and bans it until the given timestamp
872    pub fn ban_node_until(&mut self, node_id: PeerId, until: Instant) {
873        self.remove_node(node_id);
874        self.config.ban_list.ban_peer_until(node_id, until);
875    }
876
877    /// Removes a `node_id` from the routing table.
878    ///
879    /// This allows applications, for whatever reason, to remove nodes from the local routing
880    /// table. Returns `true` if the node was in the table and `false` otherwise.
881    pub fn remove_node(&mut self, node_id: PeerId) -> bool {
882        let key = kad_key(node_id);
883        self.remove_key(node_id, key)
884    }
885
886    /// Removes a `node_id` from the routing table but only if there are enough other nodes in the
887    /// bucket (bucket must be at least half full)
888    ///
889    /// Returns `true` if the node was removed
890    pub fn soft_remove_node(&mut self, node_id: PeerId) -> bool {
891        let key = kad_key(node_id);
892        let Some(bucket) = self.kbuckets.get_bucket(&key) else { return false };
893        if bucket.num_entries() < MAX_NODES_PER_BUCKET / 2 {
894            // skip half empty bucket
895            return false
896        }
897        self.remove_key(node_id, key)
898    }
899
900    fn remove_key(&mut self, node_id: PeerId, key: discv5::Key<NodeKey>) -> bool {
901        let removed = self.kbuckets.remove(&key);
902        if removed {
903            trace!(target: "discv4", ?node_id, "removed node");
904            self.notify(DiscoveryUpdate::Removed(node_id));
905        }
906        removed
907    }
908
909    /// Gets the number of entries that are considered connected.
910    pub fn num_connected(&self) -> usize {
911        self.kbuckets.buckets_iter().fold(0, |count, bucket| count + bucket.num_connected())
912    }
913
914    /// Check if the peer has an active bond.
915    fn has_bond(&self, remote_id: PeerId, remote_ip: IpAddr) -> bool {
916        if let Some(timestamp) = self.received_pongs.last_pong(remote_id, remote_ip) &&
917            timestamp.elapsed() < self.config.bond_expiration
918        {
919            return true
920        }
921        false
922    }
923
924    /// Applies a closure on the pending or present [`NodeEntry`].
925    fn on_entry<F, R>(&mut self, peer_id: PeerId, f: F) -> Option<R>
926    where
927        F: FnOnce(&NodeEntry) -> R,
928    {
929        let key = kad_key(peer_id);
930        match self.kbuckets.entry(&key) {
931            BucketEntry::Present(entry, _) => Some(f(entry.value())),
932            BucketEntry::Pending(mut entry, _) => Some(f(entry.value())),
933            _ => None,
934        }
935    }
936
    /// Update the entry on RE-ping.
    ///
    /// Invoked when we received the Pong to our [`PingReason::RePing`] ping.
    ///
    /// On re-ping we check for a changed `enr_seq` if eip868 is enabled and when it changed we sent
    /// a followup request to retrieve the updated ENR
    fn update_on_reping(&mut self, record: NodeRecord, mut last_enr_seq: Option<u64>) {
        // never track ourselves
        if record.id == self.local_node_record.id {
            return
        }

        // If EIP868 extension is disabled then we want to ignore this
        if !self.config.enable_eip868 {
            last_enr_seq = None;
        }

        // store the advertised ENR sequence and keep the previously known one for comparison
        let key = kad_key(record.id);
        let old_enr = match self.kbuckets.entry(&key) {
            kbucket::Entry::Present(mut entry, _) => {
                entry.value_mut().update_with_enr(last_enr_seq)
            }
            kbucket::Entry::Pending(mut entry, _) => entry.value().update_with_enr(last_enr_seq),
            // entry vanished (or is ourselves): nothing to update
            _ => return,
        };

        // Check if ENR was updated
        match (last_enr_seq, old_enr) {
            // the peer advertises a newer sequence than we have stored
            (Some(new), Some(old)) if new > old => {
                self.send_enr_request(record);
            }
            (Some(_), None) => {
                // got an ENR
                self.send_enr_request(record);
            }
            _ => {}
        };
    }
974
    /// Callback invoked when we receive a pong from the peer.
    ///
    /// Marks the endpoint proof for the peer as established and, if the entry was not yet
    /// connected, promotes it to connected (emitting [`DiscoveryUpdate::Added`]) and requests
    /// the peer's ENR when the pong advertised an ENR sequence number (EIP-868).
    fn update_on_pong(&mut self, record: NodeRecord, mut last_enr_seq: Option<u64>) {
        if record.id == *self.local_peer_id() {
            return
        }

        // If EIP868 extension is disabled then we want to ignore this
        if !self.config.enable_eip868 {
            last_enr_seq = None;
        }

        // if the peer included a enr seq in the pong then we can try to request the ENR of that
        // node
        let has_enr_seq = last_enr_seq.is_some();

        let key = kad_key(record.id);
        match self.kbuckets.entry(&key) {
            kbucket::Entry::Present(mut entry, old_status) => {
                // endpoint is now proven
                entry.value_mut().establish_proof();
                entry.value_mut().update_with_enr(last_enr_seq);

                if !old_status.is_connected() {
                    let _ = entry.update(ConnectionState::Connected, Some(old_status.direction));
                    trace!(target: "discv4", ?record, "added after successful endpoint proof");
                    self.notify(DiscoveryUpdate::Added(record));

                    if has_enr_seq {
                        // request the ENR of the node
                        self.send_enr_request(record);
                    }
                }
            }
            // mirrors the `Present` branch above for entries still pending insertion
            kbucket::Entry::Pending(mut entry, mut status) => {
                // endpoint is now proven
                entry.value().establish_proof();
                entry.value().update_with_enr(last_enr_seq);

                if !status.is_connected() {
                    status.state = ConnectionState::Connected;
                    let _ = entry.update(status);
                    trace!(target: "discv4", ?record, "added after successful endpoint proof");
                    self.notify(DiscoveryUpdate::Added(record));

                    if has_enr_seq {
                        // request the ENR of the node
                        self.send_enr_request(record);
                    }
                }
            }
            _ => {}
        };
    }
1028
1029    /// Adds all nodes
1030    ///
1031    /// See [`Self::add_node`]
1032    pub fn add_all_nodes(&mut self, records: impl IntoIterator<Item = NodeRecord>) {
1033        for record in records {
1034            self.add_node(record);
1035        }
1036    }
1037
    /// If the node's not in the table yet, this will add it to the table and start the endpoint
    /// proof by sending a ping to the node.
    ///
    /// Returns `true` if the record was added successfully, and `false` if the node is either
    /// already in the table or the record's bucket is full.
    pub fn add_node(&mut self, record: NodeRecord) -> bool {
        let key = kad_key(record.id);
        match self.kbuckets.entry(&key) {
            kbucket::Entry::Absent(entry) => {
                let node = NodeEntry::new(record);
                match entry.insert(
                    node,
                    NodeStatus {
                        direction: ConnectionDirection::Outgoing,
                        // marked disconnected until the endpoint proof is established on pong
                        state: ConnectionState::Disconnected,
                    },
                ) {
                    BucketInsertResult::Inserted | BucketInsertResult::Pending { .. } => {
                        trace!(target: "discv4", ?record, "inserted new record");
                    }
                    // bucket full / filter failed / too many incoming: nothing was inserted
                    _ => return false,
                }
            }
            // already present, pending, or ourselves: nothing to do
            _ => return false,
        }

        // send the initial ping to the _new_ node
        self.try_ping(record, PingReason::InitialInsert);
        true
    }
1068
1069    /// Encodes the packet, sends it and returns the hash.
1070    pub(crate) fn send_packet(&self, msg: Message, to: SocketAddr) -> B256 {
1071        let (payload, hash) = msg.encode(&self.secret_key);
1072        trace!(target: "discv4", r#type=?msg.msg_type(), ?to, ?hash, "sending packet");
1073        let _ = self.egress.try_send((payload, to)).map_err(|err| {
1074            debug!(
1075                target: "discv4",
1076                %err,
1077                "dropped outgoing packet",
1078            );
1079        });
1080        hash
1081    }
1082
    /// Returns a signed `FindNode` packet for `target`, reusing a cached payload when possible.
    ///
    /// Caching avoids re-signing identical `FindNode` payloads when several requests for the
    /// same target are sent in quick succession (e.g. during a lookup round).
    fn find_node_packet(&mut self, target: PeerId) -> (Bytes, B256) {
        let expire = self.find_node_expiration();
        // cache lifetime is a fraction of the request timeout so a reused payload's
        // expiration timestamp is still in the future when it is sent
        let cache_ttl = self.config.request_timeout / 4;
        CachedFindNode::get_or_sign(
            &mut self.cached_find_node,
            target,
            cache_ttl,
            &self.secret_key,
            expire,
        )
    }
1095
    /// Message handler for an incoming `Ping`
    ///
    /// Always answers an unexpired ping with a `Pong`. Depending on the sender's state in the
    /// routing table this may additionally send a `Ping` of our own (to (re-)establish the
    /// endpoint proof), continue a pending lookup, or request the sender's ENR.
    fn on_ping(&mut self, ping: Ping, remote_addr: SocketAddr, remote_id: PeerId, hash: B256) {
        if self.is_expired(ping.expire) {
            // ping's expiration timestamp is in the past
            return
        }

        // create the record
        // NOTE: built from the *observed* UDP address; only the advertised TCP port is
        // taken from the ping payload
        let record = NodeRecord {
            address: remote_addr.ip(),
            udp_port: remote_addr.port(),
            tcp_port: ping.from.tcp_port,
            id: remote_id,
        }
        .into_ipv4_mapped();

        let key = kad_key(record.id);

        // See also <https://github.com/ethereum/devp2p/blob/master/discv4.md#ping-packet-0x01>:
        // > If no communication with the sender of this ping has occurred within the last 12h, a
        // > ping should be sent in addition to pong in order to receive an endpoint proof.
        //
        // Note: we only mark if the node is absent because the `last 12h` condition is handled by
        // the ping interval
        let mut is_new_insert = false;
        let mut needs_bond = false;
        let mut is_proven = false;

        let old_enr = match self.kbuckets.entry(&key) {
            kbucket::Entry::Present(mut entry, _) => {
                if entry.value().is_expired() {
                    // If no communication with the sender has occurred within the last 12h, a ping
                    // should be sent in addition to pong in order to receive an endpoint proof.
                    needs_bond = true;
                } else {
                    is_proven = entry.value().has_endpoint_proof;
                }
                entry.value_mut().update_with_enr(ping.enr_sq)
            }
            kbucket::Entry::Pending(mut entry, _) => {
                if entry.value().is_expired() {
                    // If no communication with the sender has occurred within the last 12h, a ping
                    // should be sent in addition to pong in order to receive an endpoint proof.
                    needs_bond = true;
                } else {
                    is_proven = entry.value().has_endpoint_proof;
                }
                entry.value().update_with_enr(ping.enr_sq)
            }
            kbucket::Entry::Absent(entry) => {
                let mut node = NodeEntry::new(record);
                node.last_enr_seq = ping.enr_sq;

                match entry.insert(
                    node,
                    NodeStatus {
                        direction: ConnectionDirection::Incoming,
                        // mark as disconnected until endpoint proof established on pong
                        state: ConnectionState::Disconnected,
                    },
                ) {
                    BucketInsertResult::Inserted | BucketInsertResult::Pending { .. } => {
                        // mark as new insert if insert was successful
                        is_new_insert = true;
                    }
                    BucketInsertResult::Full => {
                        // we received a ping but the corresponding bucket for the peer is already
                        // full, we can't add any additional peers to that bucket, but we still want
                        // to emit an event that we discovered the node
                        trace!(target: "discv4", ?record, "discovered new record but bucket is full");
                        self.notify(DiscoveryUpdate::DiscoveredAtCapacity(record));
                        needs_bond = true;
                    }
                    BucketInsertResult::TooManyIncoming | BucketInsertResult::NodeExists => {
                        needs_bond = true;
                        // insert unsuccessful but we still want to send the pong
                    }
                    BucketInsertResult::FailedFilter => return,
                }

                None
            }
            kbucket::Entry::SelfEntry => return,
        };

        // send the pong first, but the PONG and optionally PING don't need to be sent in a
        // particular order
        let pong = Message::Pong(Pong {
            // we use the actual address of the peer
            to: record.into(),
            // echo the hash of the received ping so the sender can match it
            echo: hash,
            expire: ping.expire,
            enr_sq: self.enr_seq(),
        });
        self.send_packet(pong, remote_addr);

        // if node was absent also send a ping to establish the endpoint proof from our end
        if is_new_insert {
            self.try_ping(record, PingReason::InitialInsert);
        } else if needs_bond {
            self.try_ping(record, PingReason::EstablishBond);
        } else if is_proven {
            // if node has been proven, this means we've received a pong and verified its endpoint
            // proof. We've also sent a pong above to verify our endpoint proof, so we can now
            // send our find_nodes request if PingReason::Lookup
            if let Some((_, ctx)) = self.pending_lookup.remove(&record.id) {
                if self.pending_find_nodes.contains_key(&record.id) {
                    // there's already another pending request, unmark it so the next round can
                    // try to send it
                    ctx.unmark_queried(record.id);
                } else {
                    // we just received a ping from that peer so we can send a find node request
                    // directly
                    self.find_node(&record, ctx);
                }
            }
        } else {
            // Request ENR if included in the ping
            match (ping.enr_sq, old_enr) {
                (Some(new), Some(old)) if new > old => {
                    self.send_enr_request(record);
                }
                (Some(_), None) => {
                    self.send_enr_request(record);
                }
                _ => {}
            };
        }
    }
1225
1226    // Guarding function for [`Self::send_ping`] that applies pre-checks
1227    fn try_ping(&mut self, node: NodeRecord, reason: PingReason) {
1228        if node.id == *self.local_peer_id() {
1229            // don't ping ourselves
1230            return
1231        }
1232
1233        if self.pending_pings.contains_key(&node.id) ||
1234            self.pending_find_nodes.contains_key(&node.id)
1235        {
1236            return
1237        }
1238
1239        if self.queued_pings.iter().any(|(n, _)| n.id == node.id) {
1240            return
1241        }
1242
1243        if self.pending_pings.len() < MAX_NODES_PING {
1244            self.send_ping(node, reason);
1245        } else if self.queued_pings.len() < MAX_QUEUED_PINGS {
1246            self.queued_pings.push_back((node, reason));
1247        }
1248    }
1249
1250    /// Sends a ping message to the node's UDP address.
1251    ///
1252    /// Returns the echo hash of the ping message.
1253    pub(crate) fn send_ping(&mut self, node: NodeRecord, reason: PingReason) -> B256 {
1254        let remote_addr = node.udp_addr();
1255        let id = node.id;
1256        let ping = Ping {
1257            from: self.local_node_record.into(),
1258            to: node.into(),
1259            expire: self.ping_expiration(),
1260            enr_sq: self.enr_seq(),
1261        };
1262        trace!(target: "discv4", ?ping, "sending ping");
1263        let echo_hash = self.send_packet(Message::Ping(ping), remote_addr);
1264
1265        self.pending_pings
1266            .insert(id, PingRequest { sent_at: Instant::now(), node, echo_hash, reason });
1267        echo_hash
1268    }
1269
1270    /// Sends an enr request message to the node's UDP address.
1271    ///
1272    /// Returns the echo hash of the ping message.
1273    pub(crate) fn send_enr_request(&mut self, node: NodeRecord) {
1274        if !self.config.enable_eip868 {
1275            return
1276        }
1277        let remote_addr = node.udp_addr();
1278        let enr_request = EnrRequest { expire: self.enr_request_expiration() };
1279
1280        trace!(target: "discv4", ?enr_request, "sending enr request");
1281        let echo_hash = self.send_packet(Message::EnrRequest(enr_request), remote_addr);
1282
1283        self.pending_enr_requests
1284            .insert(node.id, EnrRequestState { sent_at: Instant::now(), echo_hash });
1285    }
1286
    /// Message handler for an incoming `Pong`.
    ///
    /// Matches the pong against the pending ping (by echo hash), records the bond, and then
    /// dispatches on the reason the original ping was sent.
    fn on_pong(&mut self, pong: Pong, remote_addr: SocketAddr, remote_id: PeerId) {
        if self.is_expired(pong.expire) {
            return
        }

        // only accept the pong if it answers a ping we actually sent (matching echo hash);
        // the request is removed from the pending set on success
        let PingRequest { node, reason, .. } = match self.pending_pings.entry(remote_id) {
            Entry::Occupied(entry) => {
                {
                    let request = entry.get();
                    if request.echo_hash != pong.echo {
                        trace!(target: "discv4", from=?remote_addr, expected=?request.echo_hash, echo_hash=?pong.echo,"Got unexpected Pong");
                        return
                    }
                }
                entry.remove()
            }
            Entry::Vacant(_) => return,
        };

        // keep track of the pong
        self.received_pongs.on_pong(remote_id, remote_addr.ip());

        match reason {
            PingReason::InitialInsert => {
                self.update_on_pong(node, pong.enr_sq);
                // Reset the lookup interval so the next poll_tick fires immediately,
                // rather than waiting the full ~20s for the first lookup.
                if self.pending_lookup_reset && self.config.bootstrap_nodes.contains(&node) {
                    self.pending_lookup_reset = false;
                    self.lookup_interval.reset();
                }
            }
            PingReason::EstablishBond => {
                // no initial lookup needed here since the node was already in the table.
                self.update_on_pong(node, pong.enr_sq);
            }
            PingReason::RePing => {
                self.update_on_reping(node, pong.enr_sq);
            }
            PingReason::Lookup(node, ctx) => {
                self.update_on_pong(node, pong.enr_sq);
                // insert node and assoc. lookup_context into the pending_lookup table to complete
                // our side of the endpoint proof verification.
                // Start the lookup timer here - and evict accordingly. Note that this is a separate
                // timer than the ping_request timer.
                self.pending_lookup.insert(node.id, (Instant::now(), ctx));
            }
        }
    }
1337
1338    /// Handler for an incoming `FindNode` message
1339    fn on_find_node(&mut self, msg: FindNode, remote_addr: SocketAddr, node_id: PeerId) {
1340        if self.is_expired(msg.expire) {
1341            // expiration timestamp is in the past
1342            return
1343        }
1344        if node_id == *self.local_peer_id() {
1345            // ignore find node requests to ourselves
1346            return
1347        }
1348
1349        if self.has_bond(node_id, remote_addr.ip()) {
1350            self.respond_closest(msg.id, remote_addr)
1351        }
1352    }
1353
    /// Handler for incoming `EnrResponse` message
    ///
    /// Resolves a pending ENR request for the peer: validates that the ENR's public key matches
    /// the peer id and that the response echoes our request hash, then updates the stored fork
    /// id and emits [`DiscoveryUpdate::EnrForkId`] if it changed or was learned for the first
    /// time.
    fn on_enr_response(&mut self, msg: EnrResponse, remote_addr: SocketAddr, id: PeerId) {
        trace!(target: "discv4", ?remote_addr, ?msg, "received ENR response");
        if let Some(resp) = self.pending_enr_requests.remove(&id) {
            // ensure the ENR's public key matches the expected node id
            let enr_id = pk2id(&msg.enr.public_key());
            if id != enr_id {
                return
            }

            // only accept the response if it echoes the hash of our request packet
            if resp.echo_hash == msg.request_hash {
                let key = kad_key(id);
                let fork_id = msg.eth_fork_id();
                let (record, old_fork_id) = match self.kbuckets.entry(&key) {
                    kbucket::Entry::Present(mut entry, _) => {
                        let id = entry.value_mut().update_with_fork_id(fork_id);
                        (entry.value().record, id)
                    }
                    kbucket::Entry::Pending(mut entry, _) => {
                        let id = entry.value().update_with_fork_id(fork_id);
                        (entry.value().record, id)
                    }
                    // peer no longer in the table: nothing to update
                    _ => return,
                };
                // notify listeners when the fork id changed or was newly learned
                match (fork_id, old_fork_id) {
                    (Some(new), Some(old)) if new != old => {
                        self.notify(DiscoveryUpdate::EnrForkId(record, new))
                    }
                    (Some(new), None) => self.notify(DiscoveryUpdate::EnrForkId(record, new)),
                    _ => {}
                }
            }
        }
    }
1388
1389    /// Handler for incoming `EnrRequest` message
1390    fn on_enr_request(
1391        &self,
1392        msg: EnrRequest,
1393        remote_addr: SocketAddr,
1394        id: PeerId,
1395        request_hash: B256,
1396    ) {
1397        if !self.config.enable_eip868 || self.is_expired(msg.expire) {
1398            return
1399        }
1400
1401        if self.has_bond(id, remote_addr.ip()) {
1402            self.send_packet(
1403                Message::EnrResponse(EnrResponse {
1404                    request_hash,
1405                    enr: self.local_eip_868_enr.clone(),
1406                }),
1407                remote_addr,
1408            );
1409        }
1410    }
1411
1412    /// Handler for incoming `Neighbours` messages that are handled if they're responses to
1413    /// `FindNode` requests.
1414    fn on_neighbours(&mut self, msg: Neighbours, remote_addr: SocketAddr, node_id: PeerId) {
1415        if self.is_expired(msg.expire) {
1416            // response is expired
1417            return
1418        }
1419        // check if this request was expected
1420        let ctx = match self.pending_find_nodes.entry(node_id) {
1421            Entry::Occupied(mut entry) => {
1422                {
1423                    let request = entry.get_mut();
1424                    // Mark the request as answered
1425                    request.answered = true;
1426                    let total = request.response_count + msg.nodes.len();
1427
1428                    // Neighbours response is exactly 1 bucket (16 entries).
1429                    if total <= MAX_NODES_PER_BUCKET {
1430                        request.response_count = total;
1431                    } else {
1432                        trace!(target: "discv4", total, from=?remote_addr, "Received neighbors packet entries exceeds max nodes per bucket");
1433                        return
1434                    }
1435                };
1436
1437                if entry.get().response_count == MAX_NODES_PER_BUCKET {
1438                    // node responding with a full bucket of records
1439                    let ctx = entry.remove().lookup_context;
1440                    ctx.mark_responded(node_id);
1441                    ctx
1442                } else {
1443                    entry.get().lookup_context.clone()
1444                }
1445            }
1446            Entry::Vacant(_) => {
1447                // received neighbours response without requesting it
1448                trace!(target: "discv4", from=?remote_addr, "Received unsolicited Neighbours");
1449                return
1450            }
1451        };
1452
1453        // log the peers we discovered
1454        trace!(target: "discv4",
1455            target=format!("{:#?}", node_id),
1456            peers_count=msg.nodes.len(),
1457            peers=format!("[{:#}]", msg.nodes.iter()
1458                .map(|node_rec| node_rec.id
1459            ).format(", ")),
1460            "Received peers from Neighbours packet"
1461        );
1462
1463        // This is the recursive lookup step where we initiate new FindNode requests for new nodes
1464        // that were discovered.
1465        for node in msg.nodes.into_iter().map(NodeRecord::into_ipv4_mapped) {
1466            // prevent banned peers from being added to the context
1467            if self.config.ban_list.is_banned(&node.id, &node.address) {
1468                trace!(target: "discv4", peer_id=?node.id, ip=?node.address, "ignoring banned record");
1469                continue
1470            }
1471
1472            ctx.add_node(node);
1473        }
1474
1475        // get the next closest nodes, not yet queried nodes and start over.
1476        let closest =
1477            ctx.filter_closest(ALPHA, |node| !self.pending_find_nodes.contains_key(&node.id));
1478
1479        for closest in closest {
1480            let key = kad_key(closest.id);
1481            match self.kbuckets.entry(&key) {
1482                BucketEntry::Absent(entry) => {
1483                    // the node's endpoint is not proven yet, so we need to ping it first, on
1484                    // success, we will add the node to the pending_lookup table, and wait to send
1485                    // back a Pong before initiating a FindNode request.
1486                    // In order to prevent that this node is selected again on subsequent responses,
1487                    // while the ping is still active, we always mark it as queried.
1488                    ctx.mark_queried(closest.id);
1489                    let node = NodeEntry::new(closest);
1490                    match entry.insert(
1491                        node,
1492                        NodeStatus {
1493                            direction: ConnectionDirection::Outgoing,
1494                            state: ConnectionState::Disconnected,
1495                        },
1496                    ) {
1497                        BucketInsertResult::Inserted | BucketInsertResult::Pending { .. } => {
1498                            // only ping if the node was added to the table
1499                            self.try_ping(closest, PingReason::Lookup(closest, ctx.clone()))
1500                        }
1501                        BucketInsertResult::Full => {
1502                            // new node but the node's bucket is already full
1503                            self.notify(DiscoveryUpdate::DiscoveredAtCapacity(closest))
1504                        }
1505                        _ => {}
1506                    }
1507                }
1508                BucketEntry::SelfEntry => {
1509                    // we received our own node entry
1510                }
1511                BucketEntry::Present(entry, _) => {
1512                    if entry.value().has_endpoint_proof {
1513                        if entry
1514                            .value()
1515                            .exceeds_find_node_failures(self.config.max_find_node_failures)
1516                        {
1517                            self.try_ping(closest, PingReason::Lookup(closest, ctx.clone()))
1518                        } else {
1519                            self.find_node(&closest, ctx.clone());
1520                        }
1521                    }
1522                }
1523                BucketEntry::Pending(mut entry, _) => {
1524                    if entry.value().has_endpoint_proof {
1525                        if entry
1526                            .value()
1527                            .exceeds_find_node_failures(self.config.max_find_node_failures)
1528                        {
1529                            self.try_ping(closest, PingReason::Lookup(closest, ctx.clone()))
1530                        } else {
1531                            self.find_node(&closest, ctx.clone());
1532                        }
1533                    }
1534                }
1535            }
1536        }
1537    }
1538
1539    /// Sends a Neighbours packet for `target` to the given addr
1540    fn respond_closest(&mut self, target: PeerId, to: SocketAddr) {
1541        let key = kad_key(target);
1542        let expire = self.send_neighbours_expiration();
1543
1544        // get the MAX_NODES_PER_BUCKET closest nodes to the target
1545        let closest_nodes =
1546            self.kbuckets.closest_values(&key).take(MAX_NODES_PER_BUCKET).collect::<Vec<_>>();
1547
1548        for nodes in closest_nodes.chunks(SAFE_MAX_DATAGRAM_NEIGHBOUR_RECORDS) {
1549            let nodes = nodes.iter().map(|node| node.value.record).collect::<Vec<NodeRecord>>();
1550            trace!(target: "discv4", len = nodes.len(), to=?to,"Sent neighbours packet");
1551            let msg = Message::Neighbours(Neighbours { nodes, expire });
1552            self.send_packet(msg, to);
1553        }
1554    }
1555
1556    fn evict_expired_requests(&mut self, now: Instant) {
1557        self.pending_enr_requests.retain(|_node_id, enr_request| {
1558            now.duration_since(enr_request.sent_at) < self.config.enr_expiration
1559        });
1560
1561        let mut failed_pings = Vec::new();
1562        self.pending_pings.retain(|node_id, ping_request| {
1563            if now.duration_since(ping_request.sent_at) > self.config.ping_expiration {
1564                failed_pings.push(*node_id);
1565                return false
1566            }
1567            true
1568        });
1569
1570        if !failed_pings.is_empty() {
1571            // remove nodes that failed to pong
1572            trace!(target: "discv4", num=%failed_pings.len(), "evicting nodes due to failed pong");
1573            for node_id in failed_pings {
1574                self.remove_node(node_id);
1575            }
1576        }
1577
1578        let mut failed_lookups = Vec::new();
1579        self.pending_lookup.retain(|node_id, (lookup_sent_at, _)| {
1580            if now.duration_since(*lookup_sent_at) > self.config.request_timeout {
1581                failed_lookups.push(*node_id);
1582                return false
1583            }
1584            true
1585        });
1586
1587        if !failed_lookups.is_empty() {
1588            // remove nodes that failed the e2e lookup process, so we can restart it
1589            trace!(target: "discv4", num=%failed_lookups.len(), "evicting nodes due to failed lookup");
1590            for node_id in failed_lookups {
1591                self.remove_node(node_id);
1592            }
1593        }
1594
1595        self.evict_failed_find_nodes(now);
1596    }
1597
    /// Handles failed responses to `FindNode`
    ///
    /// Evicts all `FindNode` requests that timed out. Peers that never answered at all are
    /// charged a failure; peers that answered (even with fewer entries than expected) are not.
    /// Peers that accumulated more than `max_find_node_failures` failures are soft-removed from
    /// the table.
    fn evict_failed_find_nodes(&mut self, now: Instant) {
        let mut failed_find_nodes = Vec::new();
        self.pending_find_nodes.retain(|node_id, find_node_request| {
            if now.duration_since(find_node_request.sent_at) > self.config.neighbours_expiration {
                if !find_node_request.answered {
                    // the peer never sent any Neighbours response before the deadline, so count
                    // this as a failed request; if it responded but with fewer entries than
                    // expected we don't treat this as a hard error since it responded.
                    failed_find_nodes.push(*node_id);
                }
                // evict the timed-out request either way
                return false
            }
            true
        });

        if failed_find_nodes.is_empty() {
            return
        }

        trace!(target: "discv4", num=%failed_find_nodes.len(), "processing failed find nodes");

        for node_id in failed_find_nodes {
            let key = kad_key(node_id);
            // bump the failure counter and read the new total, if the node is still tracked
            let failures = match self.kbuckets.entry(&key) {
                kbucket::Entry::Present(mut entry, _) => {
                    entry.value_mut().inc_failed_request();
                    entry.value().find_node_failures
                }
                kbucket::Entry::Pending(mut entry, _) => {
                    // NOTE: a pending entry's `value()` yields mutable access in this API
                    entry.value().inc_failed_request();
                    entry.value().find_node_failures
                }
                _ => continue,
            };

            // if the node failed to respond anything useful multiple times, remove the node from
            // the table, but only if there are enough other nodes in the bucket (bucket must be at
            // least half full)
            if failures > self.config.max_find_node_failures {
                self.soft_remove_node(node_id);
            }
        }
    }
1641
1642    /// Re-pings all nodes which endpoint proofs are considered expired: [`NodeEntry::is_expired`]
1643    ///
1644    /// This will send a `Ping` to the nodes, if a node fails to respond with a `Pong` to renew the
1645    /// endpoint proof it will be removed from the table.
1646    fn re_ping_oldest(&mut self) {
1647        let mut nodes = self
1648            .kbuckets
1649            .iter_ref()
1650            .filter(|entry| entry.node.value.is_expired())
1651            .map(|n| n.node.value)
1652            .collect::<Vec<_>>();
1653        nodes.sort_by_key(|a| a.last_seen);
1654        let to_ping = nodes.into_iter().map(|n| n.record).take(MAX_NODES_PING).collect::<Vec<_>>();
1655        for node in to_ping {
1656            self.try_ping(node, PingReason::RePing)
1657        }
1658    }
1659
1660    /// Returns true if the expiration timestamp is in the past.
1661    fn is_expired(&self, expiration: u64) -> bool {
1662        self.ensure_not_expired(expiration).is_err()
1663    }
1664
1665    /// Validate that given timestamp is not expired.
1666    ///
1667    /// Note: this accepts the timestamp as u64 because this is used by the wire protocol, but the
1668    /// UNIX timestamp (number of non-leap seconds since January 1, 1970 0:00:00 UTC) is supposed to
1669    /// be an i64.
1670    ///
1671    /// Returns an error if:
1672    ///  - invalid UNIX timestamp (larger than `i64::MAX`)
1673    ///  - timestamp is expired (lower than current local UNIX timestamp)
1674    fn ensure_not_expired(&self, timestamp: u64) -> Result<(), ()> {
1675        // ensure the timestamp is a valid UNIX timestamp
1676        let _ = i64::try_from(timestamp).map_err(drop)?;
1677
1678        let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_secs();
1679        if self.config.enforce_expiration_timestamps && timestamp < now {
1680            trace!(target: "discv4", "Expired packet");
1681            return Err(())
1682        }
1683        Ok(())
1684    }
1685
1686    /// Pops buffered ping requests and sends them.
1687    fn ping_buffered(&mut self) {
1688        while self.pending_pings.len() < MAX_NODES_PING {
1689            match self.queued_pings.pop_front() {
1690                Some((next, reason)) => self.try_ping(next, reason),
1691                None => break,
1692            }
1693        }
1694    }
1695
1696    fn ping_expiration(&self) -> u64 {
1697        (SystemTime::now().duration_since(UNIX_EPOCH).unwrap() + self.config.ping_expiration)
1698            .as_secs()
1699    }
1700
1701    fn find_node_expiration(&self) -> u64 {
1702        (SystemTime::now().duration_since(UNIX_EPOCH).unwrap() + self.config.request_timeout)
1703            .as_secs()
1704    }
1705
1706    fn enr_request_expiration(&self) -> u64 {
1707        (SystemTime::now().duration_since(UNIX_EPOCH).unwrap() + self.config.enr_expiration)
1708            .as_secs()
1709    }
1710
1711    fn send_neighbours_expiration(&self) -> u64 {
1712        (SystemTime::now().duration_since(UNIX_EPOCH).unwrap() + self.config.neighbours_expiration)
1713            .as_secs()
1714    }
1715
    /// Polls the socket and advances the state.
    ///
    /// To prevent traffic amplification attacks, implementations must verify that the sender of a
    /// query participates in the discovery protocol. The sender of a packet is considered verified
    /// if it has sent a valid Pong response with matching ping hash within the last 12 hours.
    ///
    /// Returns `Poll::Ready` with the next buffered [`Discv4Event`]; returns `Poll::Pending` only
    /// once all timers, pending commands and incoming datagrams have been processed without
    /// producing an event.
    pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<Discv4Event> {
        loop {
            // drain buffered events first
            if let Some(event) = self.queued_events.pop_front() {
                return Poll::Ready(event)
            }

            // trigger self lookup
            if self.config.enable_lookup {
                while self.lookup_interval.poll_tick(cx).is_ready() {
                    // the rotator varies the target so different buckets get refreshed over time
                    let target = self.lookup_rotator.next(&self.local_node_record.id);
                    self.lookup_with(target, None);
                }
            }

            // re-ping some peers
            while self.ping_interval.poll_tick(cx).is_ready() {
                self.re_ping_oldest();
            }

            // re-resolve the external IP address, if an interval is configured
            if let Some(Poll::Ready(Some(ip))) =
                self.resolve_external_ip_interval.as_mut().map(|r| r.poll_tick(cx))
            {
                self.set_external_ip_addr(ip);
            }

            // drain all incoming `Discv4` commands, this channel can never close
            while let Poll::Ready(Some(cmd)) = self.commands_rx.poll_recv(cx) {
                match cmd {
                    Discv4Command::Add(enr) => {
                        self.add_node(enr);
                    }
                    Discv4Command::Lookup { node_id, tx } => {
                        // default to a self-lookup when no target was supplied
                        let node_id = node_id.unwrap_or(self.local_node_record.id);
                        self.lookup_with(node_id, tx);
                    }
                    Discv4Command::SetLookupInterval(duration) => {
                        self.set_lookup_interval(duration);
                    }
                    Discv4Command::Updates(tx) => {
                        let rx = self.update_stream();
                        let _ = tx.send(rx);
                    }
                    Discv4Command::BanPeer(node_id) => self.ban_node(node_id),
                    Discv4Command::Remove(node_id) => {
                        self.remove_node(node_id);
                    }
                    Discv4Command::Ban(node_id, ip) => {
                        self.ban_node(node_id);
                        self.ban_ip(ip);
                    }
                    Discv4Command::BanIp(ip) => {
                        self.ban_ip(ip);
                    }
                    Discv4Command::SetEIP868RLPPair { key, rlp } => {
                        debug!(target: "discv4", key=%String::from_utf8_lossy(&key), "Update EIP-868 extension pair");

                        let _ = self.local_eip_868_enr.insert_raw_rlp(key, rlp, &self.secret_key);
                    }
                    Discv4Command::SetTcpPort(port) => {
                        debug!(target: "discv4", %port, "Update tcp port");
                        self.local_node_record.tcp_port = port;
                        // keep the ENR in sync with the updated node record
                        if self.local_node_record.address.is_ipv4() {
                            let _ = self.local_eip_868_enr.set_tcp4(port, &self.secret_key);
                        } else {
                            let _ = self.local_eip_868_enr.set_tcp6(port, &self.secret_key);
                        }
                    }

                    Discv4Command::Terminated => {
                        // terminate the service
                        self.queued_events.push_back(Discv4Event::Terminated);
                    }
                }
            }

            // restricts how many messages we process in a single poll before yielding back control
            let mut udp_message_budget = UDP_MESSAGE_POLL_LOOP_BUDGET;

            // process all incoming datagrams
            while let Poll::Ready(Some(event)) = self.ingress.poll_recv(cx) {
                match event {
                    IngressEvent::RecvError(err) => {
                        debug!(target: "discv4", %err, "failed to read datagram");
                    }
                    IngressEvent::BadPacket(from, err, data) => {
                        trace!(target: "discv4", ?from, %err, packet=?hex::encode(&data), "bad packet");
                    }
                    IngressEvent::Packet(remote_addr, Packet { msg, node_id, hash }) => {
                        trace!(target: "discv4", r#type=?msg.msg_type(), from=?remote_addr,"received packet");
                        // dispatch to the matching handler, recording which kind was processed
                        let event = match msg {
                            Message::Ping(ping) => {
                                self.on_ping(ping, remote_addr, node_id, hash);
                                Discv4Event::Ping
                            }
                            Message::Pong(pong) => {
                                self.on_pong(pong, remote_addr, node_id);
                                Discv4Event::Pong
                            }
                            Message::FindNode(msg) => {
                                self.on_find_node(msg, remote_addr, node_id);
                                Discv4Event::FindNode
                            }
                            Message::Neighbours(msg) => {
                                self.on_neighbours(msg, remote_addr, node_id);
                                Discv4Event::Neighbours
                            }
                            Message::EnrRequest(msg) => {
                                self.on_enr_request(msg, remote_addr, node_id, hash);
                                Discv4Event::EnrRequest
                            }
                            Message::EnrResponse(msg) => {
                                self.on_enr_response(msg, remote_addr, node_id);
                                Discv4Event::EnrResponse
                            }
                        };

                        self.queued_events.push_back(event);
                    }
                }

                udp_message_budget -= 1;
                if udp_message_budget < 0 {
                    trace!(target: "discv4", budget=UDP_MESSAGE_POLL_LOOP_BUDGET, "exhausted message poll budget");
                    if self.queued_events.is_empty() {
                        // we've exceeded the message budget and have no events to process
                        // this will make sure we're woken up again
                        cx.waker().wake_by_ref();
                    }
                    break
                }
            }

            // try resending buffered pings
            self.ping_buffered();

            // evict expired requests
            while self.evict_expired_requests_interval.poll_tick(cx).is_ready() {
                self.evict_expired_requests(Instant::now());
            }

            // evict expired nodes
            while self.expire_interval.poll_tick(cx).is_ready() {
                self.received_pongs.evict_expired(Instant::now(), EXPIRE_DURATION);
            }

            // nothing was produced this iteration: park until a timer or channel wakes us
            if self.queued_events.is_empty() {
                return Poll::Pending
            }
        }
    }
1872}
1873
1874/// Endless future impl
1875impl Stream for Discv4Service {
1876    type Item = Discv4Event;
1877
1878    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1879        // Poll the internal poll method
1880        match ready!(self.get_mut().poll(cx)) {
1881            // if the service is terminated, return None to terminate the stream
1882            Discv4Event::Terminated => Poll::Ready(None),
1883            // For any other event, return Poll::Ready(Some(event))
1884            ev => Poll::Ready(Some(ev)),
1885        }
1886    }
1887}
1888
1889impl fmt::Debug for Discv4Service {
1890    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1891        f.debug_struct("Discv4Service")
1892            .field("local_address", &self.local_address)
1893            .field("local_peer_id", &self.local_peer_id())
1894            .field("local_node_record", &self.local_node_record)
1895            .field("queued_pings", &self.queued_pings)
1896            .field("pending_lookup", &self.pending_lookup)
1897            .field("pending_find_nodes", &self.pending_find_nodes)
1898            .field("lookup_interval", &self.lookup_interval)
1899            .finish_non_exhaustive()
1900    }
1901}
1902
/// The Event type the Service stream produces.
///
/// This is mainly used for testing purposes and represents messages the service processed
#[derive(Debug, Eq, PartialEq)]
pub enum Discv4Event {
    /// A `Ping` message was handled.
    Ping,
    /// A `Pong` message was handled.
    Pong,
    /// A `FindNode` message was handled.
    FindNode,
    /// A `Neighbours` message was handled.
    Neighbours,
    /// An `EnrRequest` message was handled.
    EnrRequest,
    /// An `EnrResponse` message was handled.
    EnrResponse,
    /// Service is being terminated
    Terminated,
}
1923
1924/// Continuously reads new messages from the channel and writes them to the socket
1925pub(crate) async fn send_loop(udp: Arc<UdpSocket>, rx: EgressReceiver) {
1926    let mut stream = ReceiverStream::new(rx);
1927    while let Some((payload, to)) = stream.next().await {
1928        match udp.send_to(&payload, to).await {
1929            Ok(size) => {
1930                trace!(target: "discv4", ?to, ?size,"sent payload");
1931            }
1932            Err(err) => {
1933                debug!(target: "discv4", ?to, %err,"Failed to send datagram.");
1934            }
1935        }
1936    }
1937}
1938
/// Rate limits the number of incoming packets from individual IPs to 1 packet/second on average;
/// per-IP counters are decayed periodically in [`receive_loop`].
const MAX_INCOMING_PACKETS_PER_MINUTE_BY_IP: usize = 60usize;
1941
/// Continuously awaits new incoming messages and sends them back through the channel.
///
/// The receive loop enforces primitive rate limiting for ips to prevent message spams from
/// individual IPs
pub(crate) async fn receive_loop(udp: Arc<UdpSocket>, tx: IngressSender, local_id: PeerId) {
    // helper that forwards an event to the service; send errors are only logged
    let send = |event: IngressEvent| async {
        let _ = tx.send(event).await.map_err(|err| {
            debug!(
                target: "discv4",
                 %err,
                "failed send incoming packet",
            )
        });
    };

    let mut cache = ReceiveCache::default();

    // tick at half the rate of the limit
    let tick = MAX_INCOMING_PACKETS_PER_MINUTE_BY_IP / 2;
    let mut interval = tokio::time::interval(Duration::from_secs(tick as u64));

    let mut buf = [0; MAX_PACKET_SIZE];
    loop {
        let res = udp.recv_from(&mut buf).await;
        match res {
            Err(err) => {
                debug!(target: "discv4", %err, "Failed to read datagram.");
                send(IngressEvent::RecvError(err)).await;
            }
            Ok((read, remote_addr)) => {
                // rate limit incoming packets by IP
                if cache.inc_ip(remote_addr.ip()) > MAX_INCOMING_PACKETS_PER_MINUTE_BY_IP {
                    trace!(target: "discv4", ?remote_addr, "Too many incoming packets from IP.");
                    continue
                }

                let packet = &buf[..read];
                match Message::decode(packet) {
                    Ok(packet) => {
                        if packet.node_id == local_id {
                            // received our own message
                            debug!(target: "discv4", ?remote_addr, "Received own packet.");
                            continue
                        }

                        // skip if we've already received the same packet
                        if cache.contains_packet(packet.hash) {
                            debug!(target: "discv4", ?remote_addr, "Received duplicate packet.");
                            continue
                        }

                        send(IngressEvent::Packet(remote_addr, packet)).await;
                    }
                    Err(err) => {
                        trace!(target: "discv4", %err,"Failed to decode packet");
                        send(IngressEvent::BadPacket(remote_addr, err, packet.to_vec())).await
                    }
                }
            }
        }

        // reset the tracked ips if the interval has passed
        // (the tick is checked non-blockingly, so a quiet socket never stalls the decay here)
        if poll_fn(|cx| match interval.poll_tick(cx) {
            Poll::Ready(_) => Poll::Ready(true),
            Poll::Pending => Poll::Ready(false),
        })
        .await
        {
            cache.tick_ips(tick);
        }
    }
}
2014
/// A cache for received packets and their source address.
///
/// This is used to discard duplicated packets and rate limit messages from the same source.
struct ReceiveCache {
    /// keeps track of how many messages we've received from a given IP address since the last
    /// tick.
    ///
    /// This is used to count the number of messages received from a given IP address within an
    /// interval.
    ip_messages: HashMap<IpAddr, usize>,
    /// keeps track of unique packet hashes (bounded LRU, capacity set in the `Default` impl)
    unique_packets: schnellru::LruMap<B256, ()>,
}
2028
2029impl ReceiveCache {
2030    /// Updates the counter for each IP address and removes IPs that have exceeded the limit.
2031    ///
2032    /// This will decrement the counter for each IP address and remove IPs that have reached 0.
2033    fn tick_ips(&mut self, tick: usize) {
2034        self.ip_messages.retain(|_, count| {
2035            if let Some(reset) = count.checked_sub(tick) {
2036                *count = reset;
2037                true
2038            } else {
2039                false
2040            }
2041        });
2042    }
2043
2044    /// Increases the counter for the given IP address and returns the new count.
2045    fn inc_ip(&mut self, ip: IpAddr) -> usize {
2046        let ctn = self.ip_messages.entry(ip).or_default();
2047        *ctn = ctn.saturating_add(1);
2048        *ctn
2049    }
2050
2051    /// Returns true if we previously received the packet
2052    fn contains_packet(&mut self, hash: B256) -> bool {
2053        !self.unique_packets.insert(hash, ())
2054    }
2055}
2056
2057impl Default for ReceiveCache {
2058    fn default() -> Self {
2059        Self {
2060            ip_messages: Default::default(),
2061            unique_packets: schnellru::LruMap::new(schnellru::ByLength::new(32)),
2062        }
2063    }
2064}
2065
/// The commands sent from the frontend [Discv4] to the service [`Discv4Service`].
enum Discv4Command {
    /// Add the given node record to the table.
    Add(NodeRecord),
    /// Update the advertised TCP port in the local node record and local ENR.
    SetTcpPort(u16),
    /// Insert a raw key/value RLP pair into the local EIP-868 ENR.
    SetEIP868RLPPair { key: Vec<u8>, rlp: Bytes },
    /// Ban the peer id and its IP address.
    Ban(PeerId, IpAddr),
    /// Ban only the peer id.
    BanPeer(PeerId),
    /// Ban only the IP address.
    BanIp(IpAddr),
    /// Remove the node from the table.
    Remove(PeerId),
    /// Start a lookup; `node_id` defaults to the local id, results go to `tx` if present.
    Lookup { node_id: Option<PeerId>, tx: Option<NodeRecordSender> },
    /// Change the interval of the periodic (self) lookup.
    SetLookupInterval(Duration),
    /// Subscribe to table updates via the returned receiver stream.
    Updates(OneshotSender<ReceiverStream<DiscoveryUpdate>>),
    /// Terminate the service.
    Terminated,
}
2080
/// Event type receiver produces
#[derive(Debug)]
pub(crate) enum IngressEvent {
    /// Encountered an error when reading a datagram message.
    RecvError(io::Error),
    /// Received a bad message: source address, decode error and the raw packet bytes.
    BadPacket(SocketAddr, DecodePacketError, Vec<u8>),
    /// Received a successfully decoded datagram from an address.
    Packet(SocketAddr, Packet),
}
2091
/// Tracks a sent ping
#[derive(Debug)]
struct PingRequest {
    /// Timestamp when the request was sent.
    sent_at: Instant,
    /// Node to which the request was sent.
    node: NodeRecord,
    /// Hash sent in the Ping request
    echo_hash: B256,
    /// Why this ping was sent.
    reason: PingReason,
}
2104
/// Rotates the `PeerId` that is periodically looked up.
///
/// By selecting different targets, the lookups will be seeded with different ALPHA seed nodes.
#[derive(Debug)]
struct LookupTargetRotator {
    /// Every `interval`-th call to `next` targets the local node id; all others pick a random id.
    interval: usize,
    /// Number of lookups performed so far, kept modulo `interval`.
    counter: usize,
}
2113
2114// === impl LookupTargetRotator ===
2115
impl LookupTargetRotator {
    /// Returns a rotator that always returns the local target.
    ///
    /// With an interval of `1` the counter wraps to `0` on every call to `next`.
    const fn local_only() -> Self {
        Self { interval: 1, counter: 0 }
    }
}
2122
impl Default for LookupTargetRotator {
    fn default() -> Self {
        Self {
            // every 4th lookup is our own node
            interval: 4,
            // start at 3 so the very first call to `next` targets the local node
            counter: 3,
        }
    }
}
2132
2133impl LookupTargetRotator {
2134    /// This will return the next node id to lookup
2135    fn next(&mut self, local: &PeerId) -> PeerId {
2136        self.counter += 1;
2137        self.counter %= self.interval;
2138        if self.counter == 0 {
2139            return *local
2140        }
2141        PeerId::random()
2142    }
2143}
2144
/// Tracks lookups across multiple `FindNode` requests.
///
/// If this type is dropped by all Clones, it will send all the discovered nodes to the listener, if
/// one is present.
#[derive(Clone, Debug)]
struct LookupContext {
    /// Shared, single-threaded lookup state; see the `Send` safety note on this type.
    inner: Rc<LookupContextInner>,
}
2153
2154impl LookupContext {
2155    /// Create new context for a recursive lookup
2156    fn new(
2157        target: discv5::Key<NodeKey>,
2158        nearest_nodes: impl IntoIterator<Item = (Distance, NodeRecord)>,
2159        listener: Option<NodeRecordSender>,
2160    ) -> Self {
2161        let closest_nodes = nearest_nodes
2162            .into_iter()
2163            .map(|(distance, record)| {
2164                (distance, QueryNode { record, queried: false, responded: false })
2165            })
2166            .collect();
2167
2168        let inner = Rc::new(LookupContextInner {
2169            target,
2170            closest_nodes: RefCell::new(closest_nodes),
2171            listener,
2172        });
2173        Self { inner }
2174    }
2175
2176    /// Returns the target of this lookup
2177    fn target(&self) -> PeerId {
2178        self.inner.target.preimage().0
2179    }
2180
2181    fn closest(&self, num: usize) -> Vec<NodeRecord> {
2182        self.inner
2183            .closest_nodes
2184            .borrow()
2185            .iter()
2186            .filter(|(_, node)| !node.queried)
2187            .map(|(_, n)| n.record)
2188            .take(num)
2189            .collect()
2190    }
2191
2192    /// Returns the closest nodes that have not been queried yet.
2193    fn filter_closest<P>(&self, num: usize, filter: P) -> Vec<NodeRecord>
2194    where
2195        P: FnMut(&NodeRecord) -> bool,
2196    {
2197        self.inner
2198            .closest_nodes
2199            .borrow()
2200            .iter()
2201            .filter(|(_, node)| !node.queried)
2202            .map(|(_, n)| n.record)
2203            .filter(filter)
2204            .take(num)
2205            .collect()
2206    }
2207
2208    /// Inserts the node if it's missing
2209    fn add_node(&self, record: NodeRecord) {
2210        let distance = self.inner.target.distance(&kad_key(record.id));
2211        let mut closest = self.inner.closest_nodes.borrow_mut();
2212        if let btree_map::Entry::Vacant(entry) = closest.entry(distance) {
2213            entry.insert(QueryNode { record, queried: false, responded: false });
2214        }
2215    }
2216
2217    fn set_queried(&self, id: PeerId, val: bool) {
2218        if let Some((_, node)) =
2219            self.inner.closest_nodes.borrow_mut().iter_mut().find(|(_, node)| node.record.id == id)
2220        {
2221            node.queried = val;
2222        }
2223    }
2224
2225    /// Marks the node as queried
2226    fn mark_queried(&self, id: PeerId) {
2227        self.set_queried(id, true)
2228    }
2229
2230    /// Marks the node as not queried
2231    fn unmark_queried(&self, id: PeerId) {
2232        self.set_queried(id, false)
2233    }
2234
2235    /// Marks the node as responded
2236    fn mark_responded(&self, id: PeerId) {
2237        if let Some((_, node)) =
2238            self.inner.closest_nodes.borrow_mut().iter_mut().find(|(_, node)| node.record.id == id)
2239        {
2240            node.responded = true;
2241        }
2242    }
2243}
2244
// SAFETY: The [`Discv4Service`] is intended to be spawned as a task, which requires `Send`.
// The `LookupContext` is shared by all active `FindNode` requests that are part of the lookup step,
// which can modify the context. The shared context is only ever accessed mutably when a
// `Neighbours` response is processed, and all Clones are stored inside [`Discv4Service`]. In other
// words, it is guaranteed that there's only 1 owner ([`Discv4Service`]) of all possible [`Rc`]
// clones of [`LookupContext`].
unsafe impl Send for LookupContext {}
#[derive(Debug)]
struct LookupContextInner {
    /// The target to lookup.
    target: discv5::Key<NodeKey>,
    /// The closest nodes found so far, ordered by distance to the target.
    closest_nodes: RefCell<BTreeMap<Distance, QueryNode>>,
    /// A listener for all the nodes retrieved in this lookup
    ///
    /// This is present if the lookup was triggered manually via [Discv4] and we want to return all
    /// the nodes once the lookup finishes.
    listener: Option<NodeRecordSender>,
}
2264
2265impl Drop for LookupContextInner {
2266    fn drop(&mut self) {
2267        if let Some(tx) = self.listener.take() {
2268            // there's only 1 instance shared across `FindNode` requests, if this is dropped then
2269            // all requests finished, and we can send all results back
2270            let nodes = self
2271                .closest_nodes
2272                .take()
2273                .into_values()
2274                .filter(|node| node.responded)
2275                .map(|node| node.record)
2276                .collect();
2277            let _ = tx.send(nodes);
2278        }
2279    }
2280}
2281
/// Tracks the state of a recursive lookup step
#[derive(Debug, Clone, Copy)]
struct QueryNode {
    /// The node's record.
    record: NodeRecord,
    /// Whether this node has been queried as part of the lookup.
    queried: bool,
    /// Whether this node has delivered a response.
    responded: bool,
}
2289
/// Tracks an in-flight `FindNode` request.
#[derive(Debug)]
struct FindNodeRequest {
    /// Timestamp when the request was sent.
    sent_at: Instant,
    /// Number of items sent by the node
    response_count: usize,
    /// Whether the request has been answered yet.
    answered: bool,
    /// The lookup this request belongs to.
    lookup_context: LookupContext,
}
2301
2302// === impl FindNodeRequest ===
2303
2304impl FindNodeRequest {
2305    fn new(resp: LookupContext) -> Self {
2306        Self { sent_at: Instant::now(), response_count: 0, answered: false, lookup_context: resp }
2307    }
2308}
2309
/// Cached signed `FindNode` packet to avoid redundant ECDSA signing during Kademlia lookups.
#[derive(Debug)]
struct CachedFindNode {
    /// The lookup target the cached packet was signed for.
    target: PeerId,
    /// The encoded, signed packet bytes.
    payload: Bytes,
    /// Hash of the signed packet.
    hash: B256,
    /// When the packet was signed and cached.
    cached_at: Instant,
}
2318
2319impl CachedFindNode {
2320    /// Returns the cached `(payload, hash)` if the target matches and the cache is still fresh,
2321    /// or signs a new packet, updates the cache, and returns it.
2322    fn get_or_sign(
2323        cache: &mut Option<Self>,
2324        target: PeerId,
2325        ttl: Duration,
2326        secret_key: &secp256k1::SecretKey,
2327        expire: u64,
2328    ) -> (Bytes, B256) {
2329        if let Some(c) = cache.as_ref() &&
2330            c.target == target &&
2331            c.cached_at.elapsed() < ttl
2332        {
2333            return (c.payload.clone(), c.hash);
2334        }
2335
2336        let msg = Message::FindNode(FindNode { id: target, expire });
2337        let (payload, hash) = msg.encode(secret_key);
2338
2339        *cache = Some(Self { target, payload: payload.clone(), hash, cached_at: Instant::now() });
2340
2341        (payload, hash)
2342    }
2343}
2344
/// Tracks an in-flight ENR request.
#[derive(Debug)]
struct EnrRequestState {
    /// Timestamp when the request was sent.
    sent_at: Instant,
    /// Hash of the sent packet, used to match the response.
    /// NOTE(review): the original comment said "Hash sent in the Ping request", but this struct
    /// tracks ENR requests — presumably this is the `EnrRequest` packet hash; verify at the caller.
    echo_hash: B256,
}
2352
/// Stored node info.
#[derive(Debug, Clone, Eq, PartialEq)]
struct NodeEntry {
    /// Node record info.
    record: NodeRecord,
    /// Timestamp of last pong.
    last_seen: Instant,
    /// Last enr seq we retrieved via an ENR request.
    last_enr_seq: Option<u64>,
    /// `ForkId` if retrieved via ENR requests.
    fork_id: Option<ForkId>,
    /// Counter for failed _consecutive_ findNode requests.
    find_node_failures: u8,
    /// Whether the endpoint of the peer is proven.
    has_endpoint_proof: bool,
}
2369
2370// === impl NodeEntry ===
2371
2372impl NodeEntry {
2373    /// Creates a new, unpopulated entry
2374    fn new(record: NodeRecord) -> Self {
2375        Self {
2376            record,
2377            last_seen: Instant::now(),
2378            last_enr_seq: None,
2379            fork_id: None,
2380            find_node_failures: 0,
2381            has_endpoint_proof: false,
2382        }
2383    }
2384
2385    #[cfg(test)]
2386    fn new_proven(record: NodeRecord) -> Self {
2387        let mut node = Self::new(record);
2388        node.has_endpoint_proof = true;
2389        node
2390    }
2391
2392    /// Marks the entry with an established proof and resets the consecutive failure counter.
2393    const fn establish_proof(&mut self) {
2394        self.has_endpoint_proof = true;
2395        self.find_node_failures = 0;
2396    }
2397
2398    /// Returns true if the tracked find node failures exceed the max amount
2399    const fn exceeds_find_node_failures(&self, max_failures: u8) -> bool {
2400        self.find_node_failures >= max_failures
2401    }
2402
2403    /// Updates the last timestamp and sets the enr seq
2404    fn update_with_enr(&mut self, last_enr_seq: Option<u64>) -> Option<u64> {
2405        self.update_now(|s| std::mem::replace(&mut s.last_enr_seq, last_enr_seq))
2406    }
2407
2408    /// Increases the failed request counter
2409    const fn inc_failed_request(&mut self) {
2410        self.find_node_failures += 1;
2411    }
2412
2413    /// Updates the last timestamp and sets the enr seq
2414    fn update_with_fork_id(&mut self, fork_id: Option<ForkId>) -> Option<ForkId> {
2415        self.update_now(|s| std::mem::replace(&mut s.fork_id, fork_id))
2416    }
2417
2418    /// Updates the `last_seen` timestamp and calls the closure
2419    fn update_now<F, R>(&mut self, f: F) -> R
2420    where
2421        F: FnOnce(&mut Self) -> R,
2422    {
2423        self.last_seen = Instant::now();
2424        f(self)
2425    }
2426}
2427
2428// === impl NodeEntry ===
2429
2430impl NodeEntry {
2431    /// Returns true if the node should be re-pinged.
2432    fn is_expired(&self) -> bool {
2433        self.last_seen.elapsed() > (ENDPOINT_PROOF_EXPIRATION / 2)
2434    }
2435}
2436
/// Represents why a ping is issued
#[derive(Debug)]
enum PingReason {
    /// Initial ping to a previously unknown peer that was inserted into the table.
    InitialInsert,
    /// A ping to a peer to establish a bond (endpoint proof).
    EstablishBond,
    /// Re-ping a peer.
    RePing,
    /// Part of a lookup to ensure endpoint is proven before we can send a `FindNode` request.
    ///
    /// Carries the pinged node's record and the lookup it belongs to.
    Lookup(NodeRecord, LookupContext),
}
2449
/// Represents node related updates state changes in the underlying node table
#[derive(Debug, Clone)]
pub enum DiscoveryUpdate {
    /// A new node was discovered _and_ added to the table.
    Added(NodeRecord),
    /// A new node was discovered but _not_ added to the table because it is currently full.
    DiscoveredAtCapacity(NodeRecord),
    /// Received a [`ForkId`] via EIP-868 for the given [`NodeRecord`].
    EnrForkId(NodeRecord, ForkId),
    /// Node that was removed from the table
    Removed(PeerId),
    /// A series of updates emitted together.
    Batch(Vec<Self>),
}
2464
2465#[cfg(test)]
2466mod tests {
2467    use super::*;
2468    use crate::test_utils::{create_discv4, create_discv4_with_config, rng_endpoint, rng_record};
2469    use alloy_primitives::hex;
2470    use alloy_rlp::{Decodable, Encodable};
2471    use rand_08::Rng;
2472    use reth_ethereum_forks::{EnrForkIdEntry, ForkHash};
2473    use reth_network_peers::mainnet_nodes;
2474    use std::future::poll_fn;
2475
2476    #[tokio::test]
2477    async fn test_configured_enr_forkid_entry() {
2478        let fork: ForkId = ForkId { hash: ForkHash([220, 233, 108, 45]), next: 0u64 };
2479        let mut disc_conf = Discv4Config::default();
2480        disc_conf.add_eip868_pair("eth", EnrForkIdEntry::from(fork));
2481        let (_discv4, service) = create_discv4_with_config(disc_conf).await;
2482        let eth = service.local_eip_868_enr.get_raw_rlp(b"eth").unwrap();
2483        let fork_entry_id = EnrForkIdEntry::decode(&mut &eth[..]).unwrap();
2484
2485        let raw: [u8; 8] = [0xc7, 0xc6, 0x84, 0xdc, 0xe9, 0x6c, 0x2d, 0x80];
2486        let decoded = EnrForkIdEntry::decode(&mut &raw[..]).unwrap();
2487        let expected = EnrForkIdEntry {
2488            fork_id: ForkId { hash: ForkHash([0xdc, 0xe9, 0x6c, 0x2d]), next: 0 },
2489        };
2490        assert_eq!(expected, fork_entry_id);
2491        assert_eq!(expected, decoded);
2492    }
2493
2494    #[test]
2495    fn test_enr_forkid_entry_decode() {
2496        let raw: [u8; 8] = [0xc7, 0xc6, 0x84, 0xdc, 0xe9, 0x6c, 0x2d, 0x80];
2497        let decoded = EnrForkIdEntry::decode(&mut &raw[..]).unwrap();
2498        let expected = EnrForkIdEntry {
2499            fork_id: ForkId { hash: ForkHash([0xdc, 0xe9, 0x6c, 0x2d]), next: 0 },
2500        };
2501        assert_eq!(expected, decoded);
2502    }
2503
2504    #[test]
2505    fn test_enr_forkid_entry_encode() {
2506        let original = EnrForkIdEntry {
2507            fork_id: ForkId { hash: ForkHash([0xdc, 0xe9, 0x6c, 0x2d]), next: 0 },
2508        };
2509        let expected: [u8; 8] = [0xc7, 0xc6, 0x84, 0xdc, 0xe9, 0x6c, 0x2d, 0x80];
2510        let mut encoded = Vec::with_capacity(expected.len());
2511        original.encode(&mut encoded);
2512        assert_eq!(&expected[..], encoded.as_slice());
2513    }
2514
2515    #[test]
2516    fn test_local_rotator() {
2517        let id = PeerId::random();
2518        let mut rotator = LookupTargetRotator::local_only();
2519        assert_eq!(rotator.next(&id), id);
2520        assert_eq!(rotator.next(&id), id);
2521    }
2522
2523    #[test]
2524    fn test_rotator() {
2525        let id = PeerId::random();
2526        let mut rotator = LookupTargetRotator::default();
2527        assert_eq!(rotator.next(&id), id);
2528        assert_ne!(rotator.next(&id), id);
2529        assert_ne!(rotator.next(&id), id);
2530        assert_ne!(rotator.next(&id), id);
2531        assert_eq!(rotator.next(&id), id);
2532    }
2533
    #[tokio::test]
    async fn test_pending_ping() {
        let (_, mut service) = create_discv4().await;

        let local_addr = service.local_addr();

        // fill `pending_pings` to capacity: every accepted node triggers an immediate ping
        let mut num_inserted = 0;
        loop {
            let node = NodeRecord::new(local_addr, PeerId::random());
            if service.add_node(node) {
                num_inserted += 1;
                assert!(service.pending_pings.contains_key(&node.id));
                assert_eq!(service.pending_pings.len(), num_inserted);
                if num_inserted == MAX_NODES_PING {
                    break
                }
            }
        }

        // `pending_pings` is full, insert into `queued_pings`.
        num_inserted = 0;
        for _ in 0..MAX_NODES_PING {
            let node = NodeRecord::new(local_addr, PeerId::random());
            if service.add_node(node) {
                num_inserted += 1;
                // overflow pings are queued instead of sent immediately
                assert!(!service.pending_pings.contains_key(&node.id));
                assert_eq!(service.pending_pings.len(), MAX_NODES_PING);
                assert_eq!(service.queued_pings.len(), num_inserted);
            }
        }
    }
2565
    // Bootstraps with mainnet boot nodes
    // (requires live network access, hence #[ignore]; run manually)
    #[tokio::test(flavor = "multi_thread")]
    #[ignore]
    async fn test_mainnet_lookup() {
        reth_tracing::init_test_tracing();
        let fork_id = ForkId { hash: ForkHash(hex!("743f3d89")), next: 16191202 };

        let all_nodes = mainnet_nodes();
        let config = Discv4Config::builder()
            .add_boot_nodes(all_nodes)
            .lookup_interval(Duration::from_secs(1))
            .add_eip868_pair("eth", fork_id)
            .build();
        let (_discv4, mut service) = create_discv4_with_config(config).await;

        let mut updates = service.update_stream();

        let _handle = service.spawn();

        // mirror the service's table from the update stream and report its size
        let mut table = HashMap::new();
        while let Some(update) = updates.next().await {
            match update {
                DiscoveryUpdate::EnrForkId(record, fork_id) => {
                    println!("{record:?}, {fork_id:?}");
                }
                DiscoveryUpdate::Added(record) => {
                    table.insert(record.id, record);
                }
                DiscoveryUpdate::Removed(id) => {
                    table.remove(&id);
                }
                _ => {}
            }
            println!("total peers {}", table.len());
        }
    }
2602
2603    #[tokio::test]
2604    async fn test_mapped_ipv4() {
2605        reth_tracing::init_test_tracing();
2606        let mut rng = rand_08::thread_rng();
2607        let config = Discv4Config::builder().build();
2608        let (_discv4, mut service) = create_discv4_with_config(config).await;
2609
2610        let v4: Ipv4Addr = "0.0.0.0".parse().unwrap();
2611        let v6 = v4.to_ipv6_mapped();
2612        let addr: SocketAddr = (v6, DEFAULT_DISCOVERY_PORT).into();
2613
2614        let ping = Ping {
2615            from: rng_endpoint(&mut rng),
2616            to: rng_endpoint(&mut rng),
2617            expire: service.ping_expiration(),
2618            enr_sq: Some(rng.r#gen()),
2619        };
2620
2621        let id = PeerId::random();
2622        service.on_ping(ping, addr, id, B256::random());
2623
2624        let key = kad_key(id);
2625        match service.kbuckets.entry(&key) {
2626            kbucket::Entry::Present(entry, _) => {
2627                let node_addr = entry.value().record.address;
2628                assert!(node_addr.is_ipv4());
2629                assert_eq!(node_addr, IpAddr::from(v4));
2630            }
2631            _ => unreachable!(),
2632        };
2633    }
2634
2635    #[tokio::test]
2636    async fn test_respect_ping_expiration() {
2637        reth_tracing::init_test_tracing();
2638        let mut rng = rand_08::thread_rng();
2639        let config = Discv4Config::builder().build();
2640        let (_discv4, mut service) = create_discv4_with_config(config).await;
2641
2642        let v4: Ipv4Addr = "0.0.0.0".parse().unwrap();
2643        let v6 = v4.to_ipv6_mapped();
2644        let addr: SocketAddr = (v6, DEFAULT_DISCOVERY_PORT).into();
2645
2646        let ping = Ping {
2647            from: rng_endpoint(&mut rng),
2648            to: rng_endpoint(&mut rng),
2649            expire: SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs() - 1,
2650            enr_sq: Some(rng.r#gen()),
2651        };
2652
2653        let id = PeerId::random();
2654        service.on_ping(ping, addr, id, B256::random());
2655
2656        let key = kad_key(id);
2657        match service.kbuckets.entry(&key) {
2658            kbucket::Entry::Absent(_) => {}
2659            _ => unreachable!(),
2660        };
2661    }
2662
2663    #[tokio::test]
2664    async fn test_single_lookups() {
2665        reth_tracing::init_test_tracing();
2666
2667        let config = Discv4Config::builder().build();
2668        let (_discv4, mut service) = create_discv4_with_config(config.clone()).await;
2669
2670        let id = PeerId::random();
2671        let key = kad_key(id);
2672        let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), id);
2673
2674        let _ = service.kbuckets.insert_or_update(
2675            &key,
2676            NodeEntry::new_proven(record),
2677            NodeStatus {
2678                direction: ConnectionDirection::Incoming,
2679                state: ConnectionState::Connected,
2680            },
2681        );
2682
2683        service.lookup_self();
2684        assert_eq!(service.pending_find_nodes.len(), 1);
2685
2686        poll_fn(|cx| {
2687            let _ = service.poll(cx);
2688            assert_eq!(service.pending_find_nodes.len(), 1);
2689
2690            Poll::Ready(())
2691        })
2692        .await;
2693    }
2694
2695    #[tokio::test]
2696    async fn test_on_neighbours_recursive_lookup() {
2697        reth_tracing::init_test_tracing();
2698
2699        let config = Discv4Config::builder().build();
2700        let (_discv4, mut service) = create_discv4_with_config(config.clone()).await;
2701        let (_discv4, mut service2) = create_discv4_with_config(config).await;
2702
2703        let id = PeerId::random();
2704        let key = kad_key(id);
2705        let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), id);
2706
2707        let _ = service.kbuckets.insert_or_update(
2708            &key,
2709            NodeEntry::new_proven(record),
2710            NodeStatus {
2711                direction: ConnectionDirection::Incoming,
2712                state: ConnectionState::Connected,
2713            },
2714        );
2715        // Needed in this test to populate self.pending_find_nodes for as a prereq to a valid
2716        // on_neighbours request
2717        service.lookup_self();
2718        assert_eq!(service.pending_find_nodes.len(), 1);
2719
2720        poll_fn(|cx| {
2721            let _ = service.poll(cx);
2722            assert_eq!(service.pending_find_nodes.len(), 1);
2723
2724            Poll::Ready(())
2725        })
2726        .await;
2727
2728        let expiry = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_secs() +
2729            10000000000000;
2730        let msg = Neighbours { nodes: vec![service2.local_node_record], expire: expiry };
2731        service.on_neighbours(msg, record.tcp_addr(), id);
2732        // wait for the processed ping
2733        let event = poll_fn(|cx| service2.poll(cx)).await;
2734        assert_eq!(event, Discv4Event::Ping);
2735        // assert that no find_node req has been added here on top of the initial one, since both
2736        // sides of the endpoint proof is not completed here
2737        assert_eq!(service.pending_find_nodes.len(), 1);
2738        // we now wait for PONG
2739        let event = poll_fn(|cx| service.poll(cx)).await;
2740        assert_eq!(event, Discv4Event::Pong);
2741        // Ideally we want to assert against service.pending_lookup.len() here - but because the
2742        // service2 sends Pong and Ping consecutivley on_ping(), the pending_lookup table gets
2743        // drained almost immediately - and no way to grab the handle to its intermediary state here
2744        // :(
2745        let event = poll_fn(|cx| service.poll(cx)).await;
2746        assert_eq!(event, Discv4Event::Ping);
2747        // assert that we've added the find_node req here after both sides of the endpoint proof is
2748        // done
2749        assert_eq!(service.pending_find_nodes.len(), 2);
2750    }
2751
2752    #[tokio::test]
2753    async fn test_no_local_in_closest() {
2754        reth_tracing::init_test_tracing();
2755
2756        let config = Discv4Config::builder().build();
2757        let (_discv4, mut service) = create_discv4_with_config(config).await;
2758
2759        let target_key = kad_key(PeerId::random());
2760
2761        let id = PeerId::random();
2762        let key = kad_key(id);
2763        let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), id);
2764
2765        let _ = service.kbuckets.insert_or_update(
2766            &key,
2767            NodeEntry::new(record),
2768            NodeStatus {
2769                direction: ConnectionDirection::Incoming,
2770                state: ConnectionState::Connected,
2771            },
2772        );
2773
2774        let closest = service
2775            .kbuckets
2776            .closest_values(&target_key)
2777            .map(|n| n.value.record)
2778            .take(MAX_NODES_PER_BUCKET)
2779            .collect::<Vec<_>>();
2780
2781        assert_eq!(closest.len(), 1);
2782        assert!(!closest.iter().any(|r| r.id == *service.local_peer_id()));
2783    }
2784
2785    #[tokio::test]
2786    async fn test_random_lookup() {
2787        reth_tracing::init_test_tracing();
2788
2789        let config = Discv4Config::builder().build();
2790        let (_discv4, mut service) = create_discv4_with_config(config).await;
2791
2792        let target = PeerId::random();
2793
2794        let id = PeerId::random();
2795        let key = kad_key(id);
2796        let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), id);
2797
2798        let _ = service.kbuckets.insert_or_update(
2799            &key,
2800            NodeEntry::new_proven(record),
2801            NodeStatus {
2802                direction: ConnectionDirection::Incoming,
2803                state: ConnectionState::Connected,
2804            },
2805        );
2806
2807        service.lookup(target);
2808        assert_eq!(service.pending_find_nodes.len(), 1);
2809
2810        let ctx = service.pending_find_nodes.values().next().unwrap().lookup_context.clone();
2811
2812        assert_eq!(ctx.target(), target);
2813        assert_eq!(ctx.inner.closest_nodes.borrow().len(), 1);
2814
2815        ctx.add_node(record);
2816        assert_eq!(ctx.inner.closest_nodes.borrow().len(), 1);
2817    }
2818
    #[tokio::test]
    async fn test_reping_on_find_node_failures() {
        reth_tracing::init_test_tracing();

        let config = Discv4Config::builder().build();
        let (_discv4, mut service) = create_discv4_with_config(config).await;

        let target = PeerId::random();

        let id = PeerId::random();
        let key = kad_key(id);
        let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), id);

        // insert a proven node whose consecutive failure counter is maxed out
        let mut entry = NodeEntry::new_proven(record);
        entry.find_node_failures = u8::MAX;
        let _ = service.kbuckets.insert_or_update(
            &key,
            entry,
            NodeStatus {
                direction: ConnectionDirection::Incoming,
                state: ConnectionState::Connected,
            },
        );

        // with too many failures the node is re-pinged rather than sent a FindNode
        service.lookup(target);
        assert_eq!(service.pending_find_nodes.len(), 0);
        assert_eq!(service.pending_pings.len(), 1);

        service.update_on_pong(record, None);

        service
            .on_entry(record.id, |entry| {
                // reset on pong
                assert_eq!(entry.find_node_failures, 0);
                assert!(entry.has_endpoint_proof);
            })
            .unwrap();
    }
2857
    #[tokio::test]
    async fn test_service_commands() {
        reth_tracing::init_test_tracing();

        let config = Discv4Config::builder().build();
        let (discv4, mut service) = create_discv4_with_config(config).await;

        service.lookup_self();

        // smoke test: the frontend handle can issue lookups once the service task runs
        let _handle = service.spawn();
        discv4.send_lookup_self();
        let _ = discv4.lookup_self().await;
    }
2871
2872    #[tokio::test]
2873    async fn test_requests_timeout() {
2874        reth_tracing::init_test_tracing();
2875        let fork_id = ForkId { hash: ForkHash(hex!("743f3d89")), next: 16191202 };
2876
2877        let config = Discv4Config::builder()
2878            .request_timeout(Duration::from_millis(200))
2879            .ping_expiration(Duration::from_millis(200))
2880            .lookup_neighbours_expiration(Duration::from_millis(200))
2881            .add_eip868_pair("eth", fork_id)
2882            .build();
2883        let (_disv4, mut service) = create_discv4_with_config(config).await;
2884
2885        let id = PeerId::random();
2886        let key = kad_key(id);
2887        let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), id);
2888
2889        let _ = service.kbuckets.insert_or_update(
2890            &key,
2891            NodeEntry::new_proven(record),
2892            NodeStatus {
2893                direction: ConnectionDirection::Incoming,
2894                state: ConnectionState::Connected,
2895            },
2896        );
2897
2898        service.lookup_self();
2899        assert_eq!(service.pending_find_nodes.len(), 1);
2900
2901        let ctx = service.pending_find_nodes.values().next().unwrap().lookup_context.clone();
2902
2903        service.pending_lookup.insert(record.id, (Instant::now(), ctx));
2904
2905        assert_eq!(service.pending_lookup.len(), 1);
2906
2907        let ping = Ping {
2908            from: service.local_node_record.into(),
2909            to: record.into(),
2910            expire: service.ping_expiration(),
2911            enr_sq: service.enr_seq(),
2912        };
2913        let echo_hash = service.send_packet(Message::Ping(ping), record.udp_addr());
2914        let ping_request = PingRequest {
2915            sent_at: Instant::now(),
2916            node: record,
2917            echo_hash,
2918            reason: PingReason::InitialInsert,
2919        };
2920        service.pending_pings.insert(record.id, ping_request);
2921
2922        assert_eq!(service.pending_pings.len(), 1);
2923
2924        tokio::time::sleep(Duration::from_secs(1)).await;
2925
2926        poll_fn(|cx| {
2927            let _ = service.poll(cx);
2928
2929            assert_eq!(service.pending_find_nodes.len(), 0);
2930            assert_eq!(service.pending_lookup.len(), 0);
2931            assert_eq!(service.pending_pings.len(), 0);
2932
2933            Poll::Ready(())
2934        })
2935        .await;
2936    }
2937
    // sends a PING packet with wrong 'to' field and expects a PONG response.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_check_wrong_to() {
        reth_tracing::init_test_tracing();

        let config = Discv4Config::builder().external_ip_resolver(None).build();
        let (_discv4, mut service_1) = create_discv4_with_config(config.clone()).await;
        let (_discv4, mut service_2) = create_discv4_with_config(config).await;

        // ping node 2 with wrong to field
        let mut ping = Ping {
            from: service_1.local_node_record.into(),
            to: service_2.local_node_record.into(),
            expire: service_1.ping_expiration(),
            enr_sq: service_1.enr_seq(),
        };
        // 192.0.2.0 is a TEST-NET address, deliberately not node 2's endpoint
        ping.to.address = "192.0.2.0".parse().unwrap();

        let echo_hash = service_1.send_packet(Message::Ping(ping), service_2.local_addr());
        let ping_request = PingRequest {
            sent_at: Instant::now(),
            node: service_2.local_node_record,
            echo_hash,
            reason: PingReason::InitialInsert,
        };
        service_1.pending_pings.insert(*service_2.local_peer_id(), ping_request);

        // wait for the processed ping
        let event = poll_fn(|cx| service_2.poll(cx)).await;
        assert_eq!(event, Discv4Event::Ping);

        // we now wait for PONG: node 2 answers despite the bogus `to` endpoint
        let event = poll_fn(|cx| service_1.poll(cx)).await;
        assert_eq!(event, Discv4Event::Pong);
        // followed by a ping
        let event = poll_fn(|cx| service_1.poll(cx)).await;
        assert_eq!(event, Discv4Event::Ping);
    }
2976
    #[tokio::test(flavor = "multi_thread")]
    async fn test_check_ping_pong() {
        reth_tracing::init_test_tracing();

        let config = Discv4Config::builder().external_ip_resolver(None).build();
        let (_discv4, mut service_1) = create_discv4_with_config(config.clone()).await;
        let (_discv4, mut service_2) = create_discv4_with_config(config).await;

        // send ping from 1 -> 2
        service_1.add_node(service_2.local_node_record);

        // wait for the processed ping
        let event = poll_fn(|cx| service_2.poll(cx)).await;
        assert_eq!(event, Discv4Event::Ping);

        // node is now in the table but not connected yet
        let key1 = kad_key(*service_1.local_peer_id());
        match service_2.kbuckets.entry(&key1) {
            kbucket::Entry::Present(_entry, status) => {
                assert!(!status.is_connected());
            }
            _ => unreachable!(),
        }

        // we now wait for PONG
        let event = poll_fn(|cx| service_1.poll(cx)).await;
        assert_eq!(event, Discv4Event::Pong);

        // endpoint is proven on side 1
        let key2 = kad_key(*service_2.local_peer_id());
        match service_1.kbuckets.entry(&key2) {
            kbucket::Entry::Present(_entry, status) => {
                assert!(status.is_connected());
            }
            _ => unreachable!(),
        }

        // we now wait for the PING initiated by 2
        let event = poll_fn(|cx| service_1.poll(cx)).await;
        assert_eq!(event, Discv4Event::Ping);

        // Drain events from service_2 until we see the Pong. Intermediate EnrRequest and
        // FindNode events are expected: ENR requests come from the ping handshake, and FindNode
        // arrives because service_1 resets its lookup interval on the first bootnode pong.
        tokio::time::timeout(Duration::from_secs(5), async {
            loop {
                let event = poll_fn(|cx| service_2.poll(cx)).await;
                match event {
                    Discv4Event::Pong => break,
                    Discv4Event::EnrRequest | Discv4Event::FindNode => {}
                    ev => unreachable!("{ev:?}"),
                }
            }
        })
        .await
        .expect("timed out waiting for Pong from service_2");

        // endpoint is proven on side 2 as well
        match service_2.kbuckets.entry(&key1) {
            kbucket::Entry::Present(_entry, status) => {
                assert!(status.is_connected());
            }
            ev => unreachable!("{ev:?}"),
        }
    }
3042
3043    #[test]
3044    fn test_insert() {
3045        let local_node_record = rng_record(&mut rand_08::thread_rng());
3046        let mut kbuckets: KBucketsTable<NodeKey, NodeEntry> = KBucketsTable::new(
3047            NodeKey::from(&local_node_record).into(),
3048            Duration::from_secs(60),
3049            MAX_NODES_PER_BUCKET,
3050            None,
3051            None,
3052        );
3053
3054        let new_record = rng_record(&mut rand_08::thread_rng());
3055        let key = kad_key(new_record.id);
3056        match kbuckets.entry(&key) {
3057            kbucket::Entry::Absent(entry) => {
3058                let node = NodeEntry::new(new_record);
3059                let _ = entry.insert(
3060                    node,
3061                    NodeStatus {
3062                        direction: ConnectionDirection::Outgoing,
3063                        state: ConnectionState::Disconnected,
3064                    },
3065                );
3066            }
3067            _ => {
3068                unreachable!()
3069            }
3070        };
3071        match kbuckets.entry(&key) {
3072            kbucket::Entry::Present(_, _) => {}
3073            _ => {
3074                unreachable!()
3075            }
3076        }
3077    }
3078
    #[tokio::test]
    async fn test_bootnode_not_in_update_stream() {
        reth_tracing::init_test_tracing();
        let (_, service_1) = create_discv4().await;
        let peerid_1 = *service_1.local_peer_id();

        // service_1 acts as the bootnode for service_2
        let config = Discv4Config::builder().add_boot_node(service_1.local_node_record).build();
        service_1.spawn();

        let (_, mut service_2) = create_discv4_with_config(config).await;

        let mut updates = service_2.update_stream();

        service_2.spawn();

        // Poll for events for a reasonable time
        let mut bootnode_appeared = false;
        let timeout = tokio::time::sleep(Duration::from_secs(1));
        tokio::pin!(timeout);

        loop {
            tokio::select! {
                Some(update) = updates.next() => {
                    if let DiscoveryUpdate::Added(record) = update
                        && record.id == peerid_1 {
                            bootnode_appeared = true;
                            break;
                        }
                }
                _ = &mut timeout => break,
            }
        }

        // Assert the bootnode DID appear in the update stream.
        // NOTE(review): the test name (`..._not_in_update_stream`) and the previous comment
        // contradicted this assertion; the assertion is what is enforced — consider renaming.
        assert!(bootnode_appeared, "Bootnode should appear in update stream");
    }
3115
3116    fn insert_proven_node(service: &mut Discv4Service, record: NodeRecord) {
3117        let key = kad_key(record.id);
3118        let _ = service.kbuckets.insert_or_update(
3119            &key,
3120            NodeEntry::new_proven(record),
3121            NodeStatus {
3122                direction: ConnectionDirection::Incoming,
3123                state: ConnectionState::Connected,
3124            },
3125        );
3126    }
3127
3128    fn insert_initial_ping(service: &mut Discv4Service, record: NodeRecord) -> B256 {
3129        let echo_hash = B256::random();
3130        service.pending_pings.insert(
3131            record.id,
3132            PingRequest {
3133                sent_at: Instant::now(),
3134                node: record,
3135                echo_hash,
3136                reason: PingReason::InitialInsert,
3137            },
3138        );
3139        echo_hash
3140    }
3141
3142    fn make_pong(service: &Discv4Service, echo_hash: B256) -> Pong {
3143        Pong {
3144            to: rng_endpoint(&mut rand_08::thread_rng()),
3145            echo: echo_hash,
3146            expire: service.ping_expiration(),
3147            enr_sq: None,
3148        }
3149    }
3150
3151    #[tokio::test]
3152    async fn test_lookup_reset_on_first_bootnode_pong() {
3153        let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), PeerId::random());
3154        let config = Discv4Config::builder().add_boot_node(record).build();
3155        let (_discv4, mut service) = create_discv4_with_config(config).await;
3156
3157        // 1. initial state
3158        assert!(service.pending_lookup_reset);
3159
3160        // 2. setup: proven bootnode + pending InitialInsert ping
3161        insert_proven_node(&mut service, record);
3162        let echo_hash = insert_initial_ping(&mut service, record);
3163
3164        // 3. input: pong arrives
3165        service.on_pong(make_pong(&service, echo_hash), record.udp_addr(), record.id);
3166
3167        // 4. flag should be consumed — interval was reset
3168        assert!(!service.pending_lookup_reset, "flag should be consumed");
3169    }
3170
3171    #[tokio::test]
3172    async fn test_lookup_reset_fires_only_once() {
3173        let records: Vec<_> = (0..2)
3174            .map(|_| NodeRecord::new("0.0.0.0:0".parse().unwrap(), PeerId::random()))
3175            .collect();
3176        let config = Discv4Config::builder().add_boot_nodes(records.clone()).build();
3177        let (_discv4, mut service) = create_discv4_with_config(config).await;
3178
3179        // 1. setup: two proven bootnodes with pending InitialInsert pings
3180        for &r in &records {
3181            insert_proven_node(&mut service, r);
3182        }
3183        let hashes: Vec<_> =
3184            records.iter().map(|r| insert_initial_ping(&mut service, *r)).collect();
3185
3186        // 2. first pong -> consumes the flag (resets the interval)
3187        service.on_pong(make_pong(&service, hashes[0]), records[0].udp_addr(), records[0].id);
3188        assert!(!service.pending_lookup_reset);
3189
3190        // 3. second pong -> flag already consumed, no second reset
3191        service.on_pong(make_pong(&service, hashes[1]), records[1].udp_addr(), records[1].id);
3192        assert!(!service.pending_lookup_reset);
3193    }
3194
3195    #[tokio::test]
3196    async fn test_lookup_reset_not_triggered_by_non_bootnode() {
3197        let bootnode = NodeRecord::new("0.0.0.0:0".parse().unwrap(), PeerId::random());
3198        let config = Discv4Config::builder().add_boot_node(bootnode).build();
3199        let (_discv4, mut service) = create_discv4_with_config(config).await;
3200
3201        assert!(service.pending_lookup_reset);
3202
3203        // a non-bootnode pong should not consume the flag
3204        let stranger = NodeRecord::new("0.0.0.0:0".parse().unwrap(), PeerId::random());
3205        insert_proven_node(&mut service, stranger);
3206        let echo_hash = insert_initial_ping(&mut service, stranger);
3207        service.on_pong(make_pong(&service, echo_hash), stranger.udp_addr(), stranger.id);
3208
3209        assert!(service.pending_lookup_reset, "flag should not be consumed by non-bootnode");
3210    }
3211
3212    #[tokio::test]
3213    async fn test_lookup_reset_disabled_when_lookup_disabled() {
3214        let config = Discv4Config::builder().enable_lookup(false).build();
3215        let (_discv4, service) = create_discv4_with_config(config).await;
3216
3217        // flag should be false when lookups are disabled
3218        assert!(!service.pending_lookup_reset);
3219    }
3220}