reth_discv4/lib.rs

1//! Discovery v4 implementation: <https://github.com/ethereum/devp2p/blob/master/discv4.md>
2//!
3//! Discv4 employs a kademlia-like routing table to store and manage discovered peers and topics.
4//! The protocol allows for external IP discovery in NAT environments through regular PING/PONG's
5//! with discovered nodes. Nodes return the external IP address that they have received and a simple
6//! majority is chosen as our external IP address. If an external IP address is updated, this is
7//! produced as an event to notify the swarm (if one is used for this behaviour).
8//!
9//! This implementation consists of a [`Discv4`] and [`Discv4Service`] pair. The service manages the
10//! state and drives the UDP socket. The (optional) [`Discv4`] serves as the frontend to interact
11//! with the service via a channel. Whenever the underlying table changes, the service produces a
12//! [`DiscoveryUpdate`] that listeners will receive.
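//!
//! A minimal sketch of that flow (error handling elided), assuming a reachable UDP address and a
//! locally generated secret key:
//!
//! ```ignore
//! let discv4 = Discv4::spawn(local_address, local_enr, secret_key, Discv4Config::default()).await.unwrap();
//! let mut updates = discv4.update_stream().await.unwrap();
//! while let Some(update) = updates.next().await {
//!     // react to DiscoveryUpdate::Added / Removed / ...
//! }
//! ```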
13//!
14//! ## Feature Flags
15//!
16//! - `serde` (default): Enable serde support
17//! - `test-utils`: Export utilities for testing
18
19#![doc(
20    html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
21    html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
22    issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/"
23)]
24#![cfg_attr(not(test), warn(unused_crate_dependencies))]
25#![cfg_attr(docsrs, feature(doc_cfg))]
26
27use crate::{
28    error::{DecodePacketError, Discv4Error},
29    proto::{FindNode, Message, Neighbours, Packet, Ping, Pong},
30};
31use alloy_primitives::{bytes::Bytes, hex, B256};
32use discv5::{
33    kbucket,
34    kbucket::{
35        BucketInsertResult, Distance, Entry as BucketEntry, InsertResult, KBucketsTable,
36        NodeStatus, MAX_NODES_PER_BUCKET,
37    },
38    ConnectionDirection, ConnectionState,
39};
40use enr::Enr;
41use itertools::Itertools;
42use parking_lot::Mutex;
43use proto::{EnrRequest, EnrResponse};
44use reth_ethereum_forks::ForkId;
45use reth_network_peers::{pk2id, PeerId};
46use secp256k1::SecretKey;
47use std::{
48    cell::RefCell,
49    collections::{btree_map, hash_map::Entry, BTreeMap, HashMap, VecDeque},
50    fmt, io,
51    net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4},
52    pin::Pin,
53    rc::Rc,
54    sync::Arc,
55    task::{ready, Context, Poll},
56    time::{Duration, Instant, SystemTime, UNIX_EPOCH},
57};
58use tokio::{
59    net::UdpSocket,
60    sync::{mpsc, mpsc::error::TrySendError, oneshot, oneshot::Sender as OneshotSender},
61    task::{JoinHandle, JoinSet},
62    time::Interval,
63};
64use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt};
65use tracing::{debug, trace};
66
67pub mod error;
68pub mod proto;
69
70mod config;
71pub use config::{Discv4Config, Discv4ConfigBuilder};
72
73mod node;
74use node::{kad_key, NodeKey};
75
76mod table;
77
78// reexport NodeRecord primitive
79pub use reth_network_peers::NodeRecord;
80
81#[cfg(any(test, feature = "test-utils"))]
82pub mod test_utils;
83
84use crate::table::PongTable;
85use reth_net_nat::ResolveNatInterval;
86/// reexport to get public ip.
87pub use reth_net_nat::{external_ip, NatResolver};
88
89/// The default address for discv4 via UDP
90///
91/// Note: the default TCP address is the same.
92pub const DEFAULT_DISCOVERY_ADDR: IpAddr = IpAddr::V4(Ipv4Addr::UNSPECIFIED);
93
94/// The default port for discv4 via UDP
95///
96/// Note: the default TCP port is the same.
97pub const DEFAULT_DISCOVERY_PORT: u16 = 30303;
98
99/// The default address for discv4 via UDP: "0.0.0.0:30303"
100///
101/// Note: The default TCP address is the same.
102pub const DEFAULT_DISCOVERY_ADDRESS: SocketAddr =
103    SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, DEFAULT_DISCOVERY_PORT));
104
105/// The maximum size of any packet is 1280 bytes.
106const MAX_PACKET_SIZE: usize = 1280;
107
108/// Length of the UDP datagram packet-header: Hash(32b) + Signature(65b) + Packet Type(1b)
109const MIN_PACKET_SIZE: usize = 32 + 65 + 1;
110
111/// Concurrency factor for `FindNode` requests to pick `ALPHA` closest nodes, <https://github.com/ethereum/devp2p/blob/master/discv4.md#recursive-lookup>
112const ALPHA: usize = 3;
113
114/// Maximum number of nodes to ping concurrently.
115///
116/// This corresponds to 2 full `Neighbours` responses with 16 _new_ nodes. This will apply some
117/// backpressure in recursive lookups.
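///
/// With `MAX_NODES_PER_BUCKET = 16` this evaluates to `2 * 16 = 32` concurrent pings at most.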
118const MAX_NODES_PING: usize = 2 * MAX_NODES_PER_BUCKET;
119
120/// Maximum number of pings to keep queued.
121///
122/// If we are currently sending too many pings, any new pings will be queued. To prevent unbounded
123/// growth of the queue, the queue has a maximum capacity, after which any additional pings will be
124/// discarded.
125///
126/// This corresponds to 2 full `Neighbours` responses with 16 new nodes.
127const MAX_QUEUED_PINGS: usize = 2 * MAX_NODES_PER_BUCKET;
128
129/// The size of the datagram is limited to [`MAX_PACKET_SIZE`]; the 16 nodes that the discv4 spec
130/// specifies don't fit into one datagram. The safe number of nodes that always fits in a datagram
131/// is 12, assuming the worst case of all of them being IPv6 nodes. This is calculated by
132/// `(MAX_PACKET_SIZE - (header + expire + rlp overhead)) / size(rlp(Node_IPv6))`.
133/// Even in the best case where all nodes are IPv4, only 14 nodes fit into one packet.
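///
/// With the constants in the expression below this evaluates to `(1280 - 109) / 91 = 12`.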
134const SAFE_MAX_DATAGRAM_NEIGHBOUR_RECORDS: usize = (MAX_PACKET_SIZE - 109) / 91;
135
136/// The timeout used to identify expired nodes, 24h
137///
138/// Mirrors geth's `bondExpiration` of 24h
139const ENDPOINT_PROOF_EXPIRATION: Duration = Duration::from_secs(24 * 60 * 60);
140
141/// Duration used to expire nodes from the routing table, 1h
142const EXPIRE_DURATION: Duration = Duration::from_secs(60 * 60);
143
144// Restricts how many udp messages can be processed in a single [Discv4Service::poll] call.
145//
146// This will act as a manual yield point when draining the socket messages where the most CPU
147// expensive part is handling outgoing messages: encoding and hashing the packet
148const UDP_MESSAGE_POLL_LOOP_BUDGET: i32 = 4;
149
150type EgressSender = mpsc::Sender<(Bytes, SocketAddr)>;
151type EgressReceiver = mpsc::Receiver<(Bytes, SocketAddr)>;
152
153pub(crate) type IngressSender = mpsc::Sender<IngressEvent>;
154pub(crate) type IngressReceiver = mpsc::Receiver<IngressEvent>;
155
156type NodeRecordSender = OneshotSender<Vec<NodeRecord>>;
157
158/// The Discv4 frontend.
159///
160/// This is a cloneable type that communicates with the [`Discv4Service`] by sending commands over a
161/// shared channel.
162///
163/// See also [`Discv4::spawn`]
164#[derive(Debug, Clone)]
165pub struct Discv4 {
166    /// The address of the udp socket
167    local_addr: SocketAddr,
168    /// channel to send commands over to the service
169    to_service: mpsc::UnboundedSender<Discv4Command>,
170    /// Tracks the local node record.
171    ///
172    /// This includes the currently tracked external IP address of the node.
173    node_record: Arc<Mutex<NodeRecord>>,
174}
175
176impl Discv4 {
177    /// Same as [`Self::bind`] but also spawns the service onto a new task.
178    ///
179    /// See also: [`Discv4Service::spawn()`]
180    pub async fn spawn(
181        local_address: SocketAddr,
182        local_enr: NodeRecord,
183        secret_key: SecretKey,
184        config: Discv4Config,
185    ) -> io::Result<Self> {
186        let (discv4, service) = Self::bind(local_address, local_enr, secret_key, config).await?;
187
188        service.spawn();
189
190        Ok(discv4)
191    }
192
193    /// Returns a new noop instance that is not connected to a running service
194    ///
195    /// NOTE: this is only intended for test setups.
196    #[cfg(feature = "test-utils")]
197    pub fn noop() -> Self {
198        let (to_service, _rx) = mpsc::unbounded_channel();
199        let local_addr =
200            (IpAddr::from(std::net::Ipv4Addr::UNSPECIFIED), DEFAULT_DISCOVERY_PORT).into();
201        Self {
202            local_addr,
203            to_service,
204            node_record: Arc::new(Mutex::new(NodeRecord::new(
205                "127.0.0.1:3030".parse().unwrap(),
206                PeerId::random(),
207            ))),
208        }
209    }
210
211    /// Binds a new `UdpSocket` and creates the service
212    ///
213    /// ```
214    /// use reth_discv4::{Discv4, Discv4Config};
215    /// use reth_network_peers::{pk2id, NodeRecord, PeerId};
216    /// use secp256k1::SECP256K1;
217    /// use std::{net::SocketAddr, str::FromStr};
218    /// # async fn t() -> std::io::Result<()> {
219    ///
220    /// // generate a (random) keypair
221    /// let (secret_key, pk) = SECP256K1.generate_keypair(&mut rand_08::thread_rng());
222    /// let id = pk2id(&pk);
223    ///
224    /// let socket = SocketAddr::from_str("0.0.0.0:0").unwrap();
225    /// let local_enr =
226    ///     NodeRecord { address: socket.ip(), tcp_port: socket.port(), udp_port: socket.port(), id };
227    /// let config = Discv4Config::default();
228    ///
229    /// let (discv4, mut service) = Discv4::bind(socket, local_enr, secret_key, config).await.unwrap();
230    ///
231    /// // get an update stream
232    /// let updates = service.update_stream();
233    ///
234    /// let _handle = service.spawn();
235    ///
236    /// // lookup the local node in the DHT
237    /// let _discovered = discv4.lookup_self().await.unwrap();
238    ///
239    /// # Ok(())
240    /// # }
241    /// ```
242    pub async fn bind(
243        local_address: SocketAddr,
244        local_node_record: NodeRecord,
245        secret_key: SecretKey,
246        config: Discv4Config,
247    ) -> io::Result<(Self, Discv4Service)> {
248        let socket = Arc::new(UdpSocket::bind(local_address).await?);
249        trace!(target: "discv4", local_addr=?socket.local_addr(), "opened UDP socket");
250        let (tx, rx) = mpsc::channel(config.udp_ingress_message_buffer);
251
252        Self::bind_with_socket(socket, Some(tx), rx, local_node_record, secret_key, config)
253    }
254
255    /// Creates a new `Discv4` instance using a pre-bound shared socket. No receive loop is
256    /// spawned; instead, this returns an [`IngressHandler`] that should be used to forward raw packets
257    /// received by the socket owner (e.g. discv5 unrecognized frames).
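    ///
    /// A minimal sketch of the shared-socket mode, assuming an already bound `Arc<UdpSocket>`; the
    /// call used to feed datagrams into the handler is only illustrative here:
    ///
    /// ```ignore
    /// let (discv4, service, handler) =
    ///     Discv4::bind_shared(socket, local_node_record, secret_key, config).unwrap();
    /// let _join = service.spawn();
    /// // forward datagrams read by the socket owner into discv4 (method name illustrative)
    /// // handler.on_packet(&buf[..len], remote_addr);
    /// ```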
258    pub fn bind_shared(
259        socket: Arc<UdpSocket>,
260        local_node_record: NodeRecord,
261        secret_key: SecretKey,
262        config: Discv4Config,
263    ) -> io::Result<(Self, Discv4Service, IngressHandler)> {
264        let (tx, rx) = mpsc::channel(config.udp_ingress_message_buffer);
265        let local_id = local_node_record.id;
266        let (discv4, service) =
267            Self::bind_with_socket(socket, None, rx, local_node_record, secret_key, config)?;
268
269        let handler = IngressHandler::new(tx, local_id);
270
271        Ok((discv4, service, handler))
272    }
273
274    fn bind_with_socket(
275        socket: Arc<UdpSocket>,
276        ingress_tx: Option<IngressSender>,
277        ingress_rx: IngressReceiver,
278        mut local_node_record: NodeRecord,
279        secret_key: SecretKey,
280        config: Discv4Config,
281    ) -> io::Result<(Self, Discv4Service)> {
282        let local_addr = socket.local_addr()?;
283        local_node_record.udp_port = local_addr.port();
284
285        let mut service = Discv4Service::new(
286            socket,
287            ingress_tx,
288            ingress_rx,
289            local_addr,
290            local_node_record,
291            secret_key,
292            config,
293        );
294
295        // resolve the external address immediately
296        service.resolve_external_ip();
297
298        let discv4 = service.handle();
299        Ok((discv4, service))
300    }
301
302    /// Returns the address of the UDP socket.
303    pub const fn local_addr(&self) -> SocketAddr {
304        self.local_addr
305    }
306
307    /// Returns the [`NodeRecord`] of the local node.
308    ///
309    /// This includes the currently tracked external IP address of the node.
310    pub fn node_record(&self) -> NodeRecord {
311        *self.node_record.lock()
312    }
313
314    /// Returns the currently tracked external IP of the node.
315    pub fn external_ip(&self) -> IpAddr {
316        self.node_record.lock().address
317    }
318
319    /// Sets the [Interval] used for periodically looking up targets over the network
320    pub fn set_lookup_interval(&self, duration: Duration) {
321        self.send_to_service(Discv4Command::SetLookupInterval(duration))
322    }
323
324    /// Starts a `FindNode` recursive lookup that locates the closest nodes to the given node id. See also: <https://github.com/ethereum/devp2p/blob/master/discv4.md#recursive-lookup>
325    ///
326    /// The lookup initiator starts by picking α closest nodes to the target it knows of. The
327    /// initiator then sends concurrent `FindNode` packets to those nodes. α is a system-wide
328    /// concurrency parameter, such as 3. In the recursive step, the initiator resends `FindNode` to
329    /// nodes it has learned about from previous queries. Of the k nodes the initiator has heard of
330    /// closest to the target, it picks α that it has not yet queried and resends `FindNode` to
331    /// them. Nodes that fail to respond quickly are removed from consideration until and unless
332    /// they do respond.
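    ///
    /// A minimal usage sketch, assuming a spawned service and its `discv4` handle:
    ///
    /// ```ignore
    /// let closest = discv4.lookup_self().await.unwrap();
    /// // `closest` now holds the nodes nearest to our own id
    /// ```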
333    //
334    // If a round of FindNode queries fails to return a node any closer than the closest already
335    // seen, the initiator resends the find node to all of the k closest nodes it has not already
336    // queried. The lookup terminates when the initiator has queried and gotten responses from the k
337    // closest nodes it has seen.
338    pub async fn lookup_self(&self) -> Result<Vec<NodeRecord>, Discv4Error> {
339        self.lookup_node(None).await
340    }
341
342    /// Looks up the given node id.
343    ///
344    /// Returns the closest nodes to the given node id.
345    pub async fn lookup(&self, node_id: PeerId) -> Result<Vec<NodeRecord>, Discv4Error> {
346        self.lookup_node(Some(node_id)).await
347    }
348
349    /// Performs a random lookup for node records.
350    pub async fn lookup_random(&self) -> Result<Vec<NodeRecord>, Discv4Error> {
351        let target = PeerId::random();
352        self.lookup_node(Some(target)).await
353    }
354
355    /// Sends a message to the service to lookup the closest nodes
356    pub fn send_lookup(&self, node_id: PeerId) {
357        let cmd = Discv4Command::Lookup { node_id: Some(node_id), tx: None };
358        self.send_to_service(cmd);
359    }
360
361    async fn lookup_node(&self, node_id: Option<PeerId>) -> Result<Vec<NodeRecord>, Discv4Error> {
362        let (tx, rx) = oneshot::channel();
363        let cmd = Discv4Command::Lookup { node_id, tx: Some(tx) };
364        self.to_service.send(cmd)?;
365        Ok(rx.await?)
366    }
367
368    /// Triggers a new self lookup without expecting a response
369    pub fn send_lookup_self(&self) {
370        let cmd = Discv4Command::Lookup { node_id: None, tx: None };
371        self.send_to_service(cmd);
372    }
373
374    /// Removes the peer from the table, if it exists.
375    pub fn remove_peer(&self, node_id: PeerId) {
376        let cmd = Discv4Command::Remove(node_id);
377        self.send_to_service(cmd);
378    }
379
380    /// Adds the node to the table, if it is not already present.
381    pub fn add_node(&self, node_record: NodeRecord) {
382        let cmd = Discv4Command::Add(node_record);
383        self.send_to_service(cmd);
384    }
385
386    /// Adds the node as a bootnode.
387    ///
388    /// This registers the node in the configured bootstrap set and inserts it into the routing
389    /// table, pinging it to establish the endpoint proof, same as the nodes provided at startup.
390    pub fn add_boot_node(&self, node_record: NodeRecord) {
391        let cmd = Discv4Command::AddBootNode(node_record);
392        self.send_to_service(cmd);
393    }
394
395    /// Adds the peer id and ip address to the ban list.
396    ///
397    /// This will prevent any future inclusion in the table
398    pub fn ban(&self, node_id: PeerId, ip: IpAddr) {
399        let cmd = Discv4Command::Ban(node_id, ip);
400        self.send_to_service(cmd);
401    }
402
403    /// Adds the ip to the ban list.
404    ///
405    /// This will prevent any future inclusion in the table
406    pub fn ban_ip(&self, ip: IpAddr) {
407        let cmd = Discv4Command::BanIp(ip);
408        self.send_to_service(cmd);
409    }
410
411    /// Adds the peer to the ban list.
412    ///
413    /// This will prevent any future inclusion in the table
414    pub fn ban_node(&self, node_id: PeerId) {
415        let cmd = Discv4Command::BanPeer(node_id);
416        self.send_to_service(cmd);
417    }
418
419    /// Sets the tcp port
420    ///
421    /// This will update our [`NodeRecord`]'s tcp port.
422    pub fn set_tcp_port(&self, port: u16) {
423        let cmd = Discv4Command::SetTcpPort(port);
424        self.send_to_service(cmd);
425    }
426
427    /// Sets the pair in the EIP-868 [`Enr`] of the node.
428    ///
429    /// If the key already exists, this will update it.
430    ///
431    /// CAUTION: The value **must** be rlp encoded
432    pub fn set_eip868_rlp_pair(&self, key: Vec<u8>, rlp: Bytes) {
433        let cmd = Discv4Command::SetEIP868RLPPair { key, rlp };
434        self.send_to_service(cmd);
435    }
436
437    /// Sets the pair in the EIP-868 [`Enr`] of the node.
438    ///
439    /// If the key already exists, this will update it.
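    ///
    /// A minimal sketch, assuming a `discv4` handle and some RLP-encodable `fork_id` value; the
    /// `b"eth"` key is only illustrative:
    ///
    /// ```ignore
    /// discv4.set_eip868_rlp(b"eth".to_vec(), fork_id);
    /// ```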
440    pub fn set_eip868_rlp(&self, key: Vec<u8>, value: impl alloy_rlp::Encodable) {
441        self.set_eip868_rlp_pair(key, Bytes::from(alloy_rlp::encode(&value)))
442    }
443
444    #[inline]
445    fn send_to_service(&self, cmd: Discv4Command) {
446        let _ = self.to_service.send(cmd).map_err(|err| {
447            debug!(
448                target: "discv4",
449                %err,
450                "channel capacity reached, dropping command",
451            )
452        });
453    }
454
455    /// Returns the receiver half of a new listener channel that streams [`DiscoveryUpdate`]s.
456    pub async fn update_stream(&self) -> Result<ReceiverStream<DiscoveryUpdate>, Discv4Error> {
457        let (tx, rx) = oneshot::channel();
458        let cmd = Discv4Command::Updates(tx);
459        self.to_service.send(cmd)?;
460        Ok(rx.await?)
461    }
462
463    /// Terminates the spawned [`Discv4Service`].
464    pub fn terminate(&self) {
465        self.send_to_service(Discv4Command::Terminated);
466    }
467}
468
469/// Manages discv4 peer discovery over UDP.
470///
471/// This is a [Stream] that handles incoming and outgoing discv4 messages and emits updates via
472/// [`Discv4Service::update_stream`].
473///
474/// This type maintains the discv4 Kademlia routing table and is responsible for performing lookups.
475///
476/// ## Lookups
477///
478/// See also [Recursive Lookups](https://github.com/ethereum/devp2p/blob/master/discv4.md#recursive-lookup).
479/// Lookups are either triggered periodically or performed on demand: [`Discv4::lookup`].
480/// Newly discovered nodes are emitted as a [`DiscoveryUpdate::Added`] event to all subscribers:
481/// [`Discv4Service::update_stream`].
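///
/// A minimal sketch of driving the service manually instead of spawning it, assuming the pair was
/// created via [`Discv4::bind`]:
///
/// ```ignore
/// let (_discv4, mut service) = Discv4::bind(socket, local_enr, secret_key, config).await.unwrap();
/// while let Some(event) = service.next().await {
///     // one Discv4Event per processed message
/// }
/// ```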
482#[must_use = "Stream does nothing unless polled"]
483pub struct Discv4Service {
484    /// Local address of the UDP socket.
485    local_address: SocketAddr,
486    /// The local ENR for EIP-868 <https://eips.ethereum.org/EIPS/eip-868>
487    local_eip_868_enr: Enr<SecretKey>,
488    /// Local node record of the server.
489    local_node_record: NodeRecord,
490    /// Keeps track of the node record of the local node.
491    shared_node_record: Arc<Mutex<NodeRecord>>,
492    /// The secret key used to sign payloads
493    secret_key: SecretKey,
494    /// The UDP socket for sending and receiving messages.
495    _socket: Arc<UdpSocket>,
496    /// The spawned UDP tasks.
497    ///
498    /// Note: If dropped, the spawned send+receive tasks are aborted.
499    _tasks: JoinSet<()>,
500    /// The routing table.
501    kbuckets: KBucketsTable<NodeKey, NodeEntry>,
502    /// Receiver for incoming messages
503    ///
504    /// Receives incoming messages from the UDP task.
505    ingress: IngressReceiver,
506    /// Sender for sending outgoing messages
507    ///
508    /// Sends outgoing messages to the UDP task.
509    egress: EgressSender,
510    /// Buffered pending pings to apply backpressure.
511    ///
512    /// Lookups behave like bursts of requests: Endpoint proof followed by `FindNode` request. [Recursive lookups](https://github.com/ethereum/devp2p/blob/master/discv4.md#recursive-lookup) can trigger multiple followup Pings+FindNode requests.
513    /// A cap on concurrent `Ping`s prevents escalation where a large number of new nodes
514    /// discovered via `FindNode` in a recursive lookup triggers a large number of `Ping`s and
515    /// follow-up `FindNode` requests. Buffering them effectively prevents high `Ping` peaks.
516    queued_pings: VecDeque<(NodeRecord, PingReason)>,
517    /// Currently active pings to specific nodes.
518    pending_pings: HashMap<PeerId, PingRequest>,
519    /// Currently active endpoint proof verification lookups to specific nodes.
520    ///
521    /// Entries here mean we've proven the peer's endpoint but haven't completed our end of the
522    /// endpoint proof.
523    pending_lookup: HashMap<PeerId, (Instant, LookupContext)>,
524    /// Currently active `FindNode` requests
525    pending_find_nodes: HashMap<PeerId, FindNodeRequest>,
526    /// Currently active ENR requests
527    pending_enr_requests: HashMap<PeerId, EnrRequestState>,
528    /// Copy of the sender half of the commands channel for [Discv4]
529    to_service: mpsc::UnboundedSender<Discv4Command>,
530    /// Receiver half of the commands channel for [Discv4]
531    commands_rx: mpsc::UnboundedReceiver<Discv4Command>,
532    /// All subscribers for table updates
533    update_listeners: Vec<mpsc::Sender<DiscoveryUpdate>>,
534    /// The interval at which to trigger random lookups
535    lookup_interval: Interval,
536    /// Used to rotate targets to lookup
537    lookup_rotator: LookupTargetRotator,
538    /// Whether we still need to reset the lookup interval on the first bootnode pong.
539    pending_lookup_reset: bool,
540    /// Interval at which to recheck active requests
541    evict_expired_requests_interval: Interval,
542    /// Interval at which to resend pings.
543    ping_interval: Interval,
544    /// The interval at which to attempt resolving external IP again.
545    resolve_external_ip_interval: Option<ResolveNatInterval>,
546    /// How this service is configured
547    config: Discv4Config,
548    /// Buffered events populated during poll.
549    queued_events: VecDeque<Discv4Event>,
550    /// Keeps track of nodes from which we have received a `Pong` message.
551    received_pongs: PongTable,
552    /// Interval used to expire additionally tracked nodes
553    expire_interval: Interval,
554    /// Cached signed `FindNode` packet to avoid redundant ECDSA signing during lookups.
555    cached_find_node: Option<CachedFindNode>,
556}
557
558impl Discv4Service {
559    /// Create a new instance for a bound [`UdpSocket`].
560    ///
561    /// If `ingress_tx` is `Some`, the receive loop is spawned to read from the socket. If `None`,
562    /// the caller feeds packets into `ingress_rx` externally (shared socket mode).
563    pub(crate) fn new(
564        socket: Arc<UdpSocket>,
565        ingress_tx: Option<IngressSender>,
566        ingress_rx: IngressReceiver,
567        local_address: SocketAddr,
568        local_node_record: NodeRecord,
569        secret_key: SecretKey,
570        config: Discv4Config,
571    ) -> Self {
572        let (egress_tx, egress_rx) = mpsc::channel(config.udp_egress_message_buffer);
573        let mut tasks = JoinSet::<()>::new();
574
575        if let Some(ingress_tx) = ingress_tx {
576            let udp = Arc::clone(&socket);
577            tasks.spawn(receive_loop(udp, ingress_tx, local_node_record.id));
578        }
579
580        let udp = Arc::clone(&socket);
581        tasks.spawn(send_loop(udp, egress_rx));
582
583        let kbuckets = KBucketsTable::new(
584            NodeKey::from(&local_node_record).into(),
585            Duration::from_secs(60),
586            MAX_NODES_PER_BUCKET,
587            None,
588            None,
589        );
590
591        let self_lookup_interval = tokio::time::interval(config.lookup_interval);
592
593        // Wait `ping_interval` before the first tick and then ping every `ping_interval`, so that
594        // the first pings are not sent immediately on startup
595        let ping_interval = tokio::time::interval_at(
596            tokio::time::Instant::now() + config.ping_interval,
597            config.ping_interval,
598        );
599
600        let evict_expired_requests_interval = tokio::time::interval_at(
601            tokio::time::Instant::now() + config.request_timeout,
602            config.request_timeout,
603        );
604
605        let lookup_rotator = if config.enable_dht_random_walk {
606            LookupTargetRotator::default()
607        } else {
608            LookupTargetRotator::local_only()
609        };
610
611        // for EIP-868 construct an ENR
612        let local_eip_868_enr = {
613            let mut builder = Enr::builder();
614            builder.ip(local_node_record.address);
615            if local_node_record.address.is_ipv4() {
616                builder.udp4(local_node_record.udp_port);
617                builder.tcp4(local_node_record.tcp_port);
618            } else {
619                builder.udp6(local_node_record.udp_port);
620                builder.tcp6(local_node_record.tcp_port);
621            }
622
623            for (key, val) in &config.additional_eip868_rlp_pairs {
624                builder.add_value_rlp(key, val.clone());
625            }
626            builder.build(&secret_key).expect("v4 is set")
627        };
628
629        let (to_service, commands_rx) = mpsc::unbounded_channel();
630
631        let shared_node_record = Arc::new(Mutex::new(local_node_record));
632
633        Self {
634            local_address,
635            local_eip_868_enr,
636            local_node_record,
637            shared_node_record,
638            _socket: socket,
639            kbuckets,
640            secret_key,
641            _tasks: tasks,
642            ingress: ingress_rx,
643            egress: egress_tx,
644            queued_pings: VecDeque::with_capacity(MAX_QUEUED_PINGS),
645            pending_pings: Default::default(),
646            pending_lookup: Default::default(),
647            pending_find_nodes: Default::default(),
648            pending_enr_requests: Default::default(),
649            commands_rx,
650            to_service,
651            update_listeners: Vec::with_capacity(1),
652            lookup_interval: self_lookup_interval,
653            ping_interval,
654            evict_expired_requests_interval,
655            lookup_rotator,
656            pending_lookup_reset: config.enable_lookup,
657            resolve_external_ip_interval: config.resolve_external_ip_interval(),
658            config,
659            queued_events: Default::default(),
660            received_pongs: Default::default(),
661            expire_interval: tokio::time::interval(EXPIRE_DURATION),
662            cached_find_node: None,
663        }
664    }
665
666    /// Returns the frontend handle that can communicate with the service via commands.
667    pub fn handle(&self) -> Discv4 {
668        Discv4 {
669            local_addr: self.local_address,
670            to_service: self.to_service.clone(),
671            node_record: self.shared_node_record.clone(),
672        }
673    }
674
675    /// Returns the current enr sequence of the local record.
676    fn enr_seq(&self) -> Option<u64> {
677        self.config.enable_eip868.then(|| self.local_eip_868_enr.seq())
678    }
679
680    /// Sets the [Interval] used for periodically looking up targets over the network
681    pub fn set_lookup_interval(&mut self, duration: Duration) {
682        self.lookup_interval = tokio::time::interval(duration);
683    }
684
685    /// Sets the external IP to the configured external IP if the resolver is [`NatResolver::ExternalIp`]
686    /// or [`NatResolver::ExternalAddr`]. In the case of [`NatResolver::ExternalAddr`], it will use
687    /// the first IP address found for the domain, together with the discv4 UDP port.
688    fn resolve_external_ip(&mut self) {
689        if let Some(r) = &self.resolve_external_ip_interval &&
690            let Some(external_ip) =
691                r.resolver().clone().as_external_ip(self.local_node_record.udp_port)
692        {
693            self.set_external_ip_addr(external_ip);
694        }
695    }
696
697    /// Sets the given ip address as the node's external IP in the node record announced in
698    /// discovery
699    pub fn set_external_ip_addr(&mut self, external_ip: IpAddr) {
700        if self.local_node_record.address != external_ip {
701            debug!(target: "discv4", ?external_ip, "Updating external ip");
702            self.local_node_record.address = external_ip;
703            let _ = self.local_eip_868_enr.set_ip(external_ip, &self.secret_key);
704            let mut lock = self.shared_node_record.lock();
705            *lock = self.local_node_record;
706            debug!(target: "discv4", enr=?self.local_eip_868_enr, "Updated local ENR");
707        }
708    }
709
710    /// Returns the [`PeerId`] that identifies this node
711    pub const fn local_peer_id(&self) -> &PeerId {
712        &self.local_node_record.id
713    }
714
715    /// Returns the address of the UDP socket
716    pub const fn local_addr(&self) -> SocketAddr {
717        self.local_address
718    }
719
720    /// Returns the ENR of this service.
721    ///
722    /// Note: this will include the external address if resolved.
723    pub const fn local_enr(&self) -> NodeRecord {
724        self.local_node_record
725    }
726
727    /// Returns mutable reference to ENR for testing.
728    #[cfg(test)]
729    pub const fn local_enr_mut(&mut self) -> &mut NodeRecord {
730        &mut self.local_node_record
731    }
732
733    /// Returns true if the given `PeerId` is currently in the bucket
734    pub fn contains_node(&self, id: PeerId) -> bool {
735        let key = kad_key(id);
736        self.kbuckets.get_index(&key).is_some()
737    }
738
739    /// Bootstraps the local node to join the DHT.
740    ///
741    /// Bootstrapping is a multi-step operation that starts with a lookup of the local node's
742    /// own ID in the DHT. This introduces the local node to the other nodes
743    /// in the DHT and populates its routing table with the closest proven neighbours.
744    ///
745    /// This inserts the configured bootnodes into the routing table and pings them. Once the
746    /// endpoint proof succeeds (pong received), a [`DiscoveryUpdate::Added`] event is emitted,
747    /// same as with [`Self::add_node`].
748    ///
749    /// **Note:** This is a noop if there are no bootnodes.
750    pub fn bootstrap(&mut self) {
751        for record in self.config.bootstrap_nodes.clone() {
752            debug!(target: "discv4", ?record, "pinging boot node");
753            let key = kad_key(record.id);
754            let entry = NodeEntry::new(record);
755
756            // insert the boot node in the table
757            match self.kbuckets.insert_or_update(
758                &key,
759                entry,
760                NodeStatus {
761                    state: ConnectionState::Disconnected,
762                    direction: ConnectionDirection::Outgoing,
763                },
764            ) {
765                InsertResult::Failed(_) => {}
766                _ => {
767                    self.try_ping(record, PingReason::InitialInsert);
768                }
769            }
770        }
771    }
772
773    /// Adds the node to the bootstrap set and to the routing table.
774    ///
775    /// Behaves like [`Self::add_node`] but also registers the node in the configured bootstrap
776    /// set so it is used for subsequent bootstrap attempts.
777    pub fn add_boot_node(&mut self, record: NodeRecord) -> bool {
778        self.config.bootstrap_nodes.insert(record);
779        self.add_node(record)
780    }
781
782    /// Spawns this service onto a new task
783    ///
784    /// Note: requires a running tokio runtime
785    pub fn spawn(mut self) -> JoinHandle<()> {
786        tokio::task::spawn(async move {
787            self.bootstrap();
788
789            while let Some(event) = self.next().await {
790                trace!(target: "discv4", ?event, "processed");
791            }
792            trace!(target: "discv4", "service terminated");
793        })
794    }
795
796    /// Creates a new bounded channel for [`DiscoveryUpdate`]s.
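    ///
    /// A minimal sketch of consuming the stream (assumes `StreamExt` is in scope):
    ///
    /// ```ignore
    /// let mut updates = service.update_stream();
    /// while let Some(update) = updates.next().await {
    ///     // DiscoveryUpdate::Added / Removed / DiscoveredAtCapacity ...
    /// }
    /// ```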
797    pub fn update_stream(&mut self) -> ReceiverStream<DiscoveryUpdate> {
798        let (tx, rx) = mpsc::channel(512);
799        self.update_listeners.push(tx);
800        ReceiverStream::new(rx)
801    }
802
803    /// Looks up the local node in the DHT.
804    pub fn lookup_self(&mut self) {
805        self.lookup(self.local_node_record.id)
806    }
807
808    /// Looks up the given node in the DHT
809    ///
810    /// A `FindNode` packet requests information about nodes close to target. The target is a
811    /// 64-byte secp256k1 public key. When `FindNode` is received, the recipient should reply
812    /// with Neighbors packets containing the closest 16 nodes to target found in its local
813    /// table.
814    //
815    // To guard against traffic amplification attacks, Neighbors replies should only be sent if the
816    // sender of FindNode has been verified by the endpoint proof procedure.
817    pub fn lookup(&mut self, target: PeerId) {
818        self.lookup_with(target, None)
819    }
820
821    /// Starts the recursive lookup process for the given target, <https://github.com/ethereum/devp2p/blob/master/discv4.md#recursive-lookup>.
822    ///
823    /// At first the `ALPHA` (==3, defined concurrency factor) nodes that are closest to the target
824    /// in the underlying DHT are selected to seed the lookup via `FindNode` requests. In the
825    /// recursive step, the initiator resends `FindNode` to nodes it has learned about from previous
826    /// queries.
827    ///
828    /// This takes an optional Sender through which all successfully discovered nodes are sent once
829    /// the request has finished.
830    fn lookup_with(&mut self, target: PeerId, tx: Option<NodeRecordSender>) {
831        trace!(target: "discv4", ?target, "Starting lookup");
832        let target_key = kad_key(target);
833
834        // Start a lookup context with the 16 (MAX_NODES_PER_BUCKET) closest nodes to which we have
835        // a valid endpoint proof
836        let ctx = LookupContext::new(
837            target_key.clone(),
838            self.kbuckets
839                .closest_values(&target_key)
840                .filter(|node| {
841                    node.value.has_endpoint_proof &&
842                        !self.pending_find_nodes.contains_key(&node.key.preimage().0)
843                })
844                .take(MAX_NODES_PER_BUCKET)
845                .map(|n| (target_key.distance(&n.key), n.value.record)),
846            tx,
847        );
848
849        // From those 16, pick the 3 closest to start the concurrent lookup.
850        let closest = ctx.closest(ALPHA);
851
852        if closest.is_empty() && self.pending_find_nodes.is_empty() {
853            // no closest nodes, and no lookup in progress: table is empty.
854            // This could happen if all records were deleted from the table due to missed pongs
855            // (e.g. connectivity problems over a long period of time, or issues during initial
856            // bootstrapping) so we attempt to bootstrap again
857            self.bootstrap();
858            return
859        }
860
861        trace!(target: "discv4", ?target, num = closest.len(), "Start lookup closest nodes");
862
863        for node in closest {
864            // here we still want to check against previous request failures and if necessary
865            // re-establish a new endpoint proof because it can be the case that the other node lost
866            // our entry and no longer has an endpoint proof on their end
867            self.find_node_checked(&node, ctx.clone());
868        }
869    }
870
871    /// Sends a new `FindNode` packet to the node with `target` as the lookup target.
872    ///
873    /// CAUTION: This expects there's a valid endpoint proof for the given `node`.
874    fn find_node(&mut self, node: &NodeRecord, ctx: LookupContext) {
875        trace!(target: "discv4", ?node, lookup=?ctx.target(), "Sending FindNode");
876        ctx.mark_queried(node.id);
877        let (payload, hash) = self.find_node_packet(ctx.target());
878        let to = node.udp_addr();
879        trace!(target: "discv4", ?to, ?hash, "sending FindNode packet");
880        let _ = self.egress.try_send((payload, to)).map_err(|err| {
881            debug!(target: "discv4", %err, "dropped outgoing packet");
882        });
883        self.pending_find_nodes.insert(node.id, FindNodeRequest::new(ctx));
884    }
885
886    /// Sends a new `FindNode` packet to the node with `target` as the lookup target but checks
887    /// whether we should send a new ping first to renew the endpoint proof by checking the
888    /// previously failed findNode requests. It could be that the node is no longer reachable or
889    /// lost our entry.
890    fn find_node_checked(&mut self, node: &NodeRecord, ctx: LookupContext) {
891        let max_failures = self.config.max_find_node_failures;
892        let needs_ping = self
893            .on_entry(node.id, |entry| entry.exceeds_find_node_failures(max_failures))
894            .unwrap_or(true);
895        if needs_ping {
896            self.try_ping(*node, PingReason::Lookup(*node, ctx))
897        } else {
898            self.find_node(node, ctx)
899        }
900    }
901
902    /// Notifies all listeners.
903    ///
904    /// Removes all listeners that are closed.
905    fn notify(&mut self, update: DiscoveryUpdate) {
906        self.update_listeners.retain_mut(|listener| match listener.try_send(update.clone()) {
907            Ok(()) => true,
908            Err(err) => match err {
909                TrySendError::Full(_) => true,
910                TrySendError::Closed(_) => false,
911            },
912        });
913    }
914
915    /// Adds the ip to the ban list indefinitely
916    pub fn ban_ip(&mut self, ip: IpAddr) {
917        self.config.ban_list.ban_ip(ip);
918    }
919
920    /// Adds the peer to the ban list indefinitely.
921    pub fn ban_node(&mut self, node_id: PeerId) {
922        self.remove_node(node_id);
923        self.config.ban_list.ban_peer(node_id);
924    }
925
926    /// Adds the ip to the ban list until the given timestamp.
927    pub fn ban_ip_until(&mut self, ip: IpAddr, until: Instant) {
928        self.config.ban_list.ban_ip_until(ip, until);
929    }
930
931    /// Adds the peer to the ban list and bans it until the given timestamp
932    pub fn ban_node_until(&mut self, node_id: PeerId, until: Instant) {
933        self.remove_node(node_id);
934        self.config.ban_list.ban_peer_until(node_id, until);
935    }
936
937    /// Removes a `node_id` from the routing table.
938    ///
939    /// This allows applications, for whatever reason, to remove nodes from the local routing
940    /// table. Returns `true` if the node was in the table and `false` otherwise.
941    pub fn remove_node(&mut self, node_id: PeerId) -> bool {
942        let key = kad_key(node_id);
943        self.remove_key(node_id, key)
944    }
945
946    /// Removes a `node_id` from the routing table but only if there are enough other nodes in the
947    /// bucket (bucket must be at least half full)
948    ///
949    /// Returns `true` if the node was removed
950    pub fn soft_remove_node(&mut self, node_id: PeerId) -> bool {
951        let key = kad_key(node_id);
952        let Some(bucket) = self.kbuckets.get_bucket(&key) else { return false };
953        if bucket.num_entries() < MAX_NODES_PER_BUCKET / 2 {
954            // skip half empty bucket
955            return false
956        }
957        self.remove_key(node_id, key)
958    }
959
960    fn remove_key(&mut self, node_id: PeerId, key: discv5::Key<NodeKey>) -> bool {
961        let removed = self.kbuckets.remove(&key);
962        if removed {
963            trace!(target: "discv4", ?node_id, "removed node");
964            self.notify(DiscoveryUpdate::Removed(node_id));
965        }
966        removed
967    }
968
969    /// Gets the number of entries that are considered connected.
970    pub fn num_connected(&self) -> usize {
971        self.kbuckets.buckets_iter().fold(0, |count, bucket| count + bucket.num_connected())
972    }
973
974    /// Check if the peer has an active bond.
975    fn has_bond(&self, remote_id: PeerId, remote_ip: IpAddr) -> bool {
976        if let Some(timestamp) = self.received_pongs.last_pong(remote_id, remote_ip) &&
977            timestamp.elapsed() < self.config.bond_expiration
978        {
979            return true
980        }
981        false
982    }
983
984    /// Applies a closure on the pending or present [`NodeEntry`].
985    fn on_entry<F, R>(&mut self, peer_id: PeerId, f: F) -> Option<R>
986    where
987        F: FnOnce(&NodeEntry) -> R,
988    {
989        let key = kad_key(peer_id);
990        match self.kbuckets.entry(&key) {
991            BucketEntry::Present(entry, _) => Some(f(entry.value())),
992            BucketEntry::Pending(entry, _) => Some(f(entry.value())),
993            _ => None,
994        }
995    }
996
997    /// Update the entry on RE-ping.
998    ///
999    /// Invoked when we received the Pong to our [`PingReason::RePing`] ping.
1000    ///
1001    /// On re-ping we check for a changed `enr_seq` if eip868 is enabled, and if it changed we send
1002    /// a follow-up request to retrieve the updated ENR.
1003    fn update_on_reping(&mut self, record: NodeRecord, mut last_enr_seq: Option<u64>) {
1004        if record.id == self.local_node_record.id {
1005            return
1006        }
1007
1008        // If EIP868 extension is disabled then we want to ignore this
1009        if !self.config.enable_eip868 {
1010            last_enr_seq = None;
1011        }
1012
1013        let key = kad_key(record.id);
1014        let old_enr = match self.kbuckets.entry(&key) {
1015            kbucket::Entry::Present(mut entry, _) => {
1016                entry.value_mut().update_with_enr(last_enr_seq)
1017            }
1018            kbucket::Entry::Pending(mut entry, _) => {
1019                entry.value_mut().update_with_enr(last_enr_seq)
1020            }
1021            _ => return,
1022        };
1023
1024        // Check if ENR was updated
1025        match (last_enr_seq, old_enr) {
1026            (Some(new), Some(old)) if new > old => {
1027                self.send_enr_request(record);
1028            }
1029            (Some(_), None) => {
1030                // got an ENR
1031                self.send_enr_request(record);
1032            }
1033            _ => {}
1034        };
1035    }
1036
1037    /// Callback invoked when we receive a pong from the peer.
1038    fn update_on_pong(&mut self, record: NodeRecord, mut last_enr_seq: Option<u64>) {
1039        if record.id == *self.local_peer_id() {
1040            return
1041        }
1042
1043        // If EIP868 extension is disabled then we want to ignore this
1044        if !self.config.enable_eip868 {
1045            last_enr_seq = None;
1046        }
1047
1048        // if the peer included an enr seq in the pong then we can try to request the ENR of that
1049        // node
1050        let has_enr_seq = last_enr_seq.is_some();
1051
1052        let key = kad_key(record.id);
1053        match self.kbuckets.entry(&key) {
1054            kbucket::Entry::Present(mut entry, old_status) => {
1055                // endpoint is now proven
1056                entry.value_mut().establish_proof();
1057                entry.value_mut().update_with_enr(last_enr_seq);
1058
1059                if !old_status.is_connected() {
1060                    let _ = entry.update(ConnectionState::Connected, Some(old_status.direction));
1061                    trace!(target: "discv4", ?record, "added after successful endpoint proof");
1062                    self.notify(DiscoveryUpdate::Added(record));
1063
1064                    if has_enr_seq {
1065                        // request the ENR of the node
1066                        self.send_enr_request(record);
1067                    }
1068                }
1069            }
1070            kbucket::Entry::Pending(mut entry, mut status) => {
1071                // endpoint is now proven
1072                entry.value_mut().establish_proof();
1073                entry.value_mut().update_with_enr(last_enr_seq);
1074
1075                if !status.is_connected() {
1076                    status.state = ConnectionState::Connected;
1077                    let _ = entry.update(status);
1078                    trace!(target: "discv4", ?record, "added after successful endpoint proof");
1079                    self.notify(DiscoveryUpdate::Added(record));
1080
1081                    if has_enr_seq {
1082                        // request the ENR of the node
1083                        self.send_enr_request(record);
1084                    }
1085                }
1086            }
1087            _ => {}
1088        };
1089    }
1090
1091    /// Adds all nodes
1092    ///
1093    /// See [`Self::add_node`]
1094    pub fn add_all_nodes(&mut self, records: impl IntoIterator<Item = NodeRecord>) {
1095        for record in records {
1096            self.add_node(record);
1097        }
1098    }
1099
1100    /// If the node's not in the table yet, this will add it to the table and start the endpoint
1101    /// proof by sending a ping to the node.
1102    ///
1103    /// Returns `true` if the record was added successfully, and `false` if the node is either
1104    /// already in the table or the record's bucket is full.
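    ///
    /// A minimal sketch, assuming a `NodeRecord` parsed from an enode URL (the URL itself is
    /// illustrative):
    ///
    /// ```ignore
    /// let record: NodeRecord = "enode://<pubkey>@10.0.0.1:30303".parse().unwrap();
    /// let inserted = service.add_node(record);
    /// ```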
1105    pub fn add_node(&mut self, record: NodeRecord) -> bool {
1106        let key = kad_key(record.id);
1107        match self.kbuckets.entry(&key) {
1108            kbucket::Entry::Absent(entry) => {
1109                let node = NodeEntry::new(record);
1110                match entry.insert(
1111                    node,
1112                    NodeStatus {
1113                        direction: ConnectionDirection::Outgoing,
1114                        state: ConnectionState::Disconnected,
1115                    },
1116                ) {
1117                    BucketInsertResult::Inserted | BucketInsertResult::Pending { .. } => {
1118                        trace!(target: "discv4", ?record, "inserted new record");
1119                    }
1120                    _ => return false,
1121                }
1122            }
1123            _ => return false,
1124        }
1125
1126        // send the initial ping to the _new_ node
1127        self.try_ping(record, PingReason::InitialInsert);
1128        true
1129    }
1130
1131    /// Encodes the packet, sends it and returns the hash.
1132    pub(crate) fn send_packet(&self, msg: Message, to: SocketAddr) -> B256 {
1133        let (payload, hash) = msg.encode(&self.secret_key);
1134        trace!(target: "discv4", r#type=?msg.msg_type(), ?to, ?hash, "sending packet");
1135        let _ = self.egress.try_send((payload, to)).map_err(|err| {
1136            debug!(
1137                target: "discv4",
1138                %err,
1139                "dropped outgoing packet",
1140            );
1141        });
1142        hash
1143    }
1144
1145    /// Returns a signed `FindNode` packet for `target`, reusing a cached payload when possible.
1146    fn find_node_packet(&mut self, target: PeerId) -> (Bytes, B256) {
1147        let expire = self.find_node_expiration();
1148        let cache_ttl = self.config.request_timeout / 4;
1149        CachedFindNode::get_or_sign(
1150            &mut self.cached_find_node,
1151            target,
1152            cache_ttl,
1153            &self.secret_key,
1154            expire,
1155        )
1156    }
1157
1158    /// Message handler for an incoming `Ping`
1159    fn on_ping(&mut self, ping: Ping, remote_addr: SocketAddr, remote_id: PeerId, hash: B256) {
1160        if self.is_expired(ping.expire) {
1161            // ping's expiration timestamp is in the past
1162            return
1163        }
1164
1165        // create the record
1166        let record = NodeRecord {
1167            address: remote_addr.ip(),
1168            udp_port: remote_addr.port(),
1169            tcp_port: ping.from.tcp_port,
1170            id: remote_id,
1171        }
1172        .into_ipv4_mapped();
1173
1174        let key = kad_key(record.id);
1175
1176        // See also <https://github.com/ethereum/devp2p/blob/master/discv4.md#ping-packet-0x01>:
1177        // > If no communication with the sender of this ping has occurred within the last 12h, a
1178        // > ping should be sent in addition to pong in order to receive an endpoint proof.
1179        //
1180        // Note: we only mark if the node is absent because the `last 12h` condition is handled by
1181        // the ping interval
1182        let mut is_new_insert = false;
1183        let mut needs_bond = false;
1184        let mut is_proven = false;
1185
1186        let old_enr = match self.kbuckets.entry(&key) {
1187            kbucket::Entry::Present(mut entry, _) => {
1188                if entry.value().is_expired() {
1189                    // If no communication with the sender has occurred within the last 12h, a ping
1190                    // should be sent in addition to pong in order to receive an endpoint proof.
1191                    needs_bond = true;
1192                } else {
1193                    is_proven = entry.value().has_endpoint_proof;
1194                }
1195                entry.value_mut().update_with_enr(ping.enr_sq)
1196            }
1197            kbucket::Entry::Pending(mut entry, _) => {
1198                if entry.value().is_expired() {
1199                    // If no communication with the sender has occurred within the last 12h, a ping
1200                    // should be sent in addition to pong in order to receive an endpoint proof.
1201                    needs_bond = true;
1202                } else {
1203                    is_proven = entry.value().has_endpoint_proof;
1204                }
1205                entry.value_mut().update_with_enr(ping.enr_sq)
1206            }
1207            kbucket::Entry::Absent(entry) => {
1208                let mut node = NodeEntry::new(record);
1209                node.last_enr_seq = ping.enr_sq;
1210
1211                match entry.insert(
1212                    node,
1213                    NodeStatus {
1214                        direction: ConnectionDirection::Incoming,
1215                        // mark as disconnected until endpoint proof established on pong
1216                        state: ConnectionState::Disconnected,
1217                    },
1218                ) {
1219                    BucketInsertResult::Inserted | BucketInsertResult::Pending { .. } => {
1220                        // mark as new insert if insert was successful
1221                        is_new_insert = true;
1222                    }
1223                    BucketInsertResult::Full => {
1224                        // we received a ping but the corresponding bucket for the peer is already
1225                        // full, we can't add any additional peers to that bucket, but we still want
1226                        // to emit an event that we discovered the node
1227                        trace!(target: "discv4", ?record, "discovered new record but bucket is full");
1228                        self.notify(DiscoveryUpdate::DiscoveredAtCapacity(record));
1229                        needs_bond = true;
1230                    }
1231                    BucketInsertResult::TooManyIncoming | BucketInsertResult::NodeExists => {
1232                        needs_bond = true;
1233                        // insert unsuccessful but we still want to send the pong
1234                    }
1235                    BucketInsertResult::FailedFilter => return,
1236                }
1237
1238                None
1239            }
1240            kbucket::Entry::SelfEntry => return,
1241        };
1242
1243        // send the pong first, but the PONG and optionally PING don't need to be sent in a
1244        // particular order
1245        let pong = Message::Pong(Pong {
1246            // we use the actual address of the peer
1247            to: record.into(),
1248            echo: hash,
1249            expire: ping.expire,
1250            enr_sq: self.enr_seq(),
1251        });
1252        self.send_packet(pong, remote_addr);
1253
1254        // if node was absent also send a ping to establish the endpoint proof from our end
1255        if is_new_insert {
1256            self.try_ping(record, PingReason::InitialInsert);
1257        } else if needs_bond {
1258            self.try_ping(record, PingReason::EstablishBond);
1259        } else if is_proven {
1260            // if node has been proven, this means we've received a pong and verified its endpoint
1261            // proof. We've also sent a pong above to verify our endpoint proof, so we can now
1262            // send our find_nodes request if PingReason::Lookup
1263            if let Some((_, ctx)) = self.pending_lookup.remove(&record.id) {
1264                if self.pending_find_nodes.contains_key(&record.id) {
1265                    // there's already another pending request, unmark it so the next round can
1266                    // try to send it
1267                    ctx.unmark_queried(record.id);
1268                } else {
1269                    // we just received a ping from that peer so we can send a find node request
1270                    // directly
1271                    self.find_node(&record, ctx);
1272                }
1273            }
1274        } else {
1275            // Request ENR if included in the ping
1276            match (ping.enr_sq, old_enr) {
1277                (Some(new), Some(old)) if new > old => {
1278                    self.send_enr_request(record);
1279                }
1280                (Some(_), None) => {
1281                    self.send_enr_request(record);
1282                }
1283                _ => {}
1284            };
1285        }
1286    }
1287
1288    // Guarding function for [`Self::send_ping`] that applies pre-checks
1289    fn try_ping(&mut self, node: NodeRecord, reason: PingReason) {
1290        if node.id == *self.local_peer_id() {
1291            // don't ping ourselves
1292            return
1293        }
1294
1295        if self.pending_pings.contains_key(&node.id) ||
1296            self.pending_find_nodes.contains_key(&node.id)
1297        {
1298            return
1299        }
1300
1301        if self.queued_pings.iter().any(|(n, _)| n.id == node.id) {
1302            return
1303        }
1304
1305        if self.pending_pings.len() < MAX_NODES_PING {
1306            self.send_ping(node, reason);
1307        } else if self.queued_pings.len() < MAX_QUEUED_PINGS {
1308            self.queued_pings.push_back((node, reason));
1309        }
1310    }
1311
1312    /// Sends a ping message to the node's UDP address.
1313    ///
1314    /// Returns the echo hash of the ping message.
1315    pub(crate) fn send_ping(&mut self, node: NodeRecord, reason: PingReason) -> B256 {
1316        let remote_addr = node.udp_addr();
1317        let id = node.id;
1318        let ping = Ping {
1319            from: self.local_node_record.into(),
1320            to: node.into(),
1321            expire: self.ping_expiration(),
1322            enr_sq: self.enr_seq(),
1323        };
1324        trace!(target: "discv4", ?ping, "sending ping");
1325        let echo_hash = self.send_packet(Message::Ping(ping), remote_addr);
1326
1327        self.pending_pings
1328            .insert(id, PingRequest { sent_at: Instant::now(), node, echo_hash, reason });
1329        echo_hash
1330    }
1331
1332    /// Sends an enr request message to the node's UDP address.
1333    ///
1334    /// The request is tracked so the response can be matched against the sent packet's hash.
1335    pub(crate) fn send_enr_request(&mut self, node: NodeRecord) {
1336        if !self.config.enable_eip868 {
1337            return
1338        }
1339        let remote_addr = node.udp_addr();
1340        let enr_request = EnrRequest { expire: self.enr_request_expiration() };
1341
1342        trace!(target: "discv4", ?enr_request, "sending enr request");
1343        let echo_hash = self.send_packet(Message::EnrRequest(enr_request), remote_addr);
1344
1345        self.pending_enr_requests
1346            .insert(node.id, EnrRequestState { sent_at: Instant::now(), echo_hash });
1347    }
1348
1349    /// Message handler for an incoming `Pong`.
1350    fn on_pong(&mut self, pong: Pong, remote_addr: SocketAddr, remote_id: PeerId) {
1351        if self.is_expired(pong.expire) {
1352            return
1353        }
1354
1355        let PingRequest { node, reason, .. } = match self.pending_pings.entry(remote_id) {
1356            Entry::Occupied(entry) => {
1357                {
1358                    let request = entry.get();
1359                    if request.echo_hash != pong.echo {
1360                        trace!(target: "discv4", from=?remote_addr, expected=?request.echo_hash, echo_hash=?pong.echo,"Got unexpected Pong");
1361                        return
1362                    }
1363                }
1364                entry.remove()
1365            }
1366            Entry::Vacant(_) => return,
1367        };
1368
1369        // keep track of the pong
1370        self.received_pongs.on_pong(remote_id, remote_addr.ip());
1371
1372        match reason {
1373            PingReason::InitialInsert => {
1374                self.update_on_pong(node, pong.enr_sq);
1375                // Reset the lookup interval so the next poll_tick fires immediately,
1376                // rather than waiting the full ~20s for the first lookup.
1377                if self.pending_lookup_reset && self.config.bootstrap_nodes.contains(&node) {
1378                    self.pending_lookup_reset = false;
1379                    self.lookup_interval.reset_immediately();
1380                }
1381            }
1382            PingReason::EstablishBond => {
1383                // no initial lookup needed here since the node was already in the table.
1384                self.update_on_pong(node, pong.enr_sq);
1385            }
1386            PingReason::RePing => {
1387                self.update_on_reping(node, pong.enr_sq);
1388            }
1389            PingReason::Lookup(node, ctx) => {
1390                self.update_on_pong(node, pong.enr_sq);
1391                // insert node and assoc. lookup_context into the pending_lookup table to complete
1392                // our side of the endpoint proof verification.
1393                // Start the lookup timer here - and evict accordingly. Note that this is a separate
1394                // timer from the ping_request timer.
1395                self.pending_lookup.insert(node.id, (Instant::now(), ctx));
1396            }
1397        }
1398    }
1399
1400    /// Handler for an incoming `FindNode` message
1401    fn on_find_node(&mut self, msg: FindNode, remote_addr: SocketAddr, node_id: PeerId) {
1402        if self.is_expired(msg.expire) {
1403            // expiration timestamp is in the past
1404            return
1405        }
1406        if node_id == *self.local_peer_id() {
1407            // ignore find node requests to ourselves
1408            return
1409        }
1410
1411        if self.has_bond(node_id, remote_addr.ip()) {
1412            self.respond_closest(msg.id, remote_addr)
1413        }
1414    }
1415
1416    /// Handler for incoming `EnrResponse` message
1417    fn on_enr_response(&mut self, msg: EnrResponse, remote_addr: SocketAddr, id: PeerId) {
1418        trace!(target: "discv4", ?remote_addr, ?msg, "received ENR response");
1419        if let Some(resp) = self.pending_enr_requests.remove(&id) {
1420            // ensure the ENR's public key matches the expected node id
1421            let enr_id = pk2id(&msg.enr.public_key());
1422            if id != enr_id {
1423                return
1424            }
1425
1426            if resp.echo_hash == msg.request_hash {
1427                let key = kad_key(id);
1428                let fork_id = msg.eth_fork_id();
1429                let (record, old_fork_id) = match self.kbuckets.entry(&key) {
1430                    kbucket::Entry::Present(mut entry, _) => {
1431                        let id = entry.value_mut().update_with_fork_id(fork_id);
1432                        (entry.value().record, id)
1433                    }
1434                    kbucket::Entry::Pending(mut entry, _) => {
1435                        let id = entry.value_mut().update_with_fork_id(fork_id);
1436                        (entry.value().record, id)
1437                    }
1438                    _ => return,
1439                };
1440                match (fork_id, old_fork_id) {
1441                    (Some(new), Some(old)) if new != old => {
1442                        self.notify(DiscoveryUpdate::EnrForkId(record, new))
1443                    }
1444                    (Some(new), None) => self.notify(DiscoveryUpdate::EnrForkId(record, new)),
1445                    _ => {}
1446                }
1447            }
1448        }
1449    }
1450
1451    /// Handler for incoming `EnrRequest` message
1452    fn on_enr_request(
1453        &self,
1454        msg: EnrRequest,
1455        remote_addr: SocketAddr,
1456        id: PeerId,
1457        request_hash: B256,
1458    ) {
1459        if !self.config.enable_eip868 || self.is_expired(msg.expire) {
1460            return
1461        }
1462
1463        if self.has_bond(id, remote_addr.ip()) {
1464            self.send_packet(
1465                Message::EnrResponse(EnrResponse {
1466                    request_hash,
1467                    enr: self.local_eip_868_enr.clone(),
1468                }),
1469                remote_addr,
1470            );
1471        }
1472    }
1473
1474    /// Handler for incoming `Neighbours` messages that are handled if they're responses to
1475    /// `FindNode` requests.
1476    fn on_neighbours(&mut self, msg: Neighbours, remote_addr: SocketAddr, node_id: PeerId) {
1477        if self.is_expired(msg.expire) {
1478            // response is expired
1479            return
1480        }
1481        // check if this request was expected
1482        let ctx = match self.pending_find_nodes.entry(node_id) {
1483            Entry::Occupied(mut entry) => {
1484                {
1485                    let request = entry.get_mut();
1486                    // Mark the request as answered
1487                    request.answered = true;
1488                    let total = request.response_count + msg.nodes.len();
1489
1490                    // Neighbours response is exactly 1 bucket (16 entries).
1491                    if total <= MAX_NODES_PER_BUCKET {
1492                        request.response_count = total;
1493                    } else {
1494                        trace!(target: "discv4", total, from=?remote_addr, "Received neighbors packet entries exceeds max nodes per bucket");
1495                        return
1496                    }
1497                };
1498
1499                if entry.get().response_count == MAX_NODES_PER_BUCKET {
1500                    // node responding with a full bucket of records
1501                    let ctx = entry.remove().lookup_context;
1502                    ctx.mark_responded(node_id);
1503                    ctx
1504                } else {
1505                    entry.get().lookup_context.clone()
1506                }
1507            }
1508            Entry::Vacant(_) => {
1509                // received neighbours response without requesting it
1510                trace!(target: "discv4", from=?remote_addr, "Received unsolicited Neighbours");
1511                return
1512            }
1513        };
1514
1515        // log the peers we discovered
1516        trace!(target: "discv4",
1517            target=format!("{:#?}", node_id),
1518            peers_count=msg.nodes.len(),
1519            peers=format!("[{:#}]", msg.nodes.iter()
1520                .map(|node_rec| node_rec.id
1521            ).format(", ")),
1522            "Received peers from Neighbours packet"
1523        );
1524
1525        // This is the recursive lookup step where we initiate new FindNode requests for new nodes
1526        // that were discovered.
1527        for node in msg.nodes.into_iter().map(NodeRecord::into_ipv4_mapped) {
1528            // prevent banned peers from being added to the context
1529            if self.config.ban_list.is_banned(&node.id, &node.address) {
1530                trace!(target: "discv4", peer_id=?node.id, ip=?node.address, "ignoring banned record");
1531                continue
1532            }
1533
1534            ctx.add_node(node);
1535        }
1536
1537        // get the next closest nodes, not yet queried nodes and start over.
1538        let closest =
1539            ctx.filter_closest(ALPHA, |node| !self.pending_find_nodes.contains_key(&node.id));
1540
1541        for closest in closest {
1542            let key = kad_key(closest.id);
1543            match self.kbuckets.entry(&key) {
1544                BucketEntry::Absent(entry) => {
1545                    // the node's endpoint is not proven yet, so we need to ping it first, on
1546                    // success, we will add the node to the pending_lookup table, and wait to send
1547                    // back a Pong before initiating a FindNode request.
1548                    // To prevent this node from being selected again in subsequent responses
1549                    // while the ping is still active, we always mark it as queried.
1550                    ctx.mark_queried(closest.id);
1551                    let node = NodeEntry::new(closest);
1552                    match entry.insert(
1553                        node,
1554                        NodeStatus {
1555                            direction: ConnectionDirection::Outgoing,
1556                            state: ConnectionState::Disconnected,
1557                        },
1558                    ) {
1559                        BucketInsertResult::Inserted | BucketInsertResult::Pending { .. } => {
1560                            // only ping if the node was added to the table
1561                            self.try_ping(closest, PingReason::Lookup(closest, ctx.clone()))
1562                        }
1563                        BucketInsertResult::Full => {
1564                            // new node but the node's bucket is already full
1565                            self.notify(DiscoveryUpdate::DiscoveredAtCapacity(closest))
1566                        }
1567                        _ => {}
1568                    }
1569                }
1570                BucketEntry::SelfEntry => {
1571                    // we received our own node entry
1572                }
1573                BucketEntry::Present(entry, _) => {
1574                    if entry.value().has_endpoint_proof {
1575                        if entry
1576                            .value()
1577                            .exceeds_find_node_failures(self.config.max_find_node_failures)
1578                        {
1579                            self.try_ping(closest, PingReason::Lookup(closest, ctx.clone()))
1580                        } else {
1581                            self.find_node(&closest, ctx.clone());
1582                        }
1583                    }
1584                }
1585                BucketEntry::Pending(entry, _) => {
1586                    if entry.value().has_endpoint_proof {
1587                        if entry
1588                            .value()
1589                            .exceeds_find_node_failures(self.config.max_find_node_failures)
1590                        {
1591                            self.try_ping(closest, PingReason::Lookup(closest, ctx.clone()))
1592                        } else {
1593                            self.find_node(&closest, ctx.clone());
1594                        }
1595                    }
1596                }
1597            }
1598        }
1599    }
1600
1601    /// Sends a Neighbours packet for `target` to the given addr
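    ///
    /// The closest nodes (at most `MAX_NODES_PER_BUCKET`) are split into chunks of
    /// `SAFE_MAX_DATAGRAM_NEIGHBOUR_RECORDS` records so that each `Neighbours` datagram stays
    /// within the maximum packet size.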
1602    fn respond_closest(&mut self, target: PeerId, to: SocketAddr) {
1603        let key = kad_key(target);
1604        let expire = self.send_neighbours_expiration();
1605
1606        // get the MAX_NODES_PER_BUCKET closest nodes to the target
1607        let closest_nodes =
1608            self.kbuckets.closest_values(&key).take(MAX_NODES_PER_BUCKET).collect::<Vec<_>>();
1609
1610        for nodes in closest_nodes.chunks(SAFE_MAX_DATAGRAM_NEIGHBOUR_RECORDS) {
1611            let nodes = nodes.iter().map(|node| node.value.record).collect::<Vec<NodeRecord>>();
1612            trace!(target: "discv4", len = nodes.len(), to=?to,"Sent neighbours packet");
1613            let msg = Message::Neighbours(Neighbours { nodes, expire });
1614            self.send_packet(msg, to);
1615        }
1616    }
1617
1618    fn evict_expired_requests(&mut self, now: Instant) {
1619        self.pending_enr_requests.retain(|_node_id, enr_request| {
1620            now.duration_since(enr_request.sent_at) < self.config.enr_expiration
1621        });
1622
1623        let mut failed_pings = Vec::new();
1624        self.pending_pings.retain(|node_id, ping_request| {
1625            if now.duration_since(ping_request.sent_at) > self.config.ping_expiration {
1626                failed_pings.push(*node_id);
1627                return false
1628            }
1629            true
1630        });
1631
1632        if !failed_pings.is_empty() {
1633            // remove nodes that failed to pong
1634            trace!(target: "discv4", num=%failed_pings.len(), "evicting nodes due to failed pong");
1635            for node_id in failed_pings {
1636                self.remove_node(node_id);
1637            }
1638        }
1639
1640        let mut failed_lookups = Vec::new();
1641        self.pending_lookup.retain(|node_id, (lookup_sent_at, _)| {
1642            if now.duration_since(*lookup_sent_at) > self.config.request_timeout {
1643                failed_lookups.push(*node_id);
1644                return false
1645            }
1646            true
1647        });
1648
1649        if !failed_lookups.is_empty() {
1650            // remove nodes that failed the e2e lookup process, so we can restart it
1651            trace!(target: "discv4", num=%failed_lookups.len(), "evicting nodes due to failed lookup");
1652            for node_id in failed_lookups {
1653                self.remove_node(node_id);
1654            }
1655        }
1656
1657        self.evict_failed_find_nodes(now);
1658    }
1659
1660    /// Handles failed responses to `FindNode`
1661    fn evict_failed_find_nodes(&mut self, now: Instant) {
1662        let mut failed_find_nodes = Vec::new();
1663        self.pending_find_nodes.retain(|node_id, find_node_request| {
1664            if now.duration_since(find_node_request.sent_at) > self.config.neighbours_expiration {
1665                if !find_node_request.answered {
1666                    // the request was never answered at all, so count it as a failure. Nodes
1667                    // that responded with fewer entries than expected are not penalized here.
1668                    failed_find_nodes.push(*node_id);
1669                }
1670                return false
1671            }
1672            true
1673        });
1674
1675        if failed_find_nodes.is_empty() {
1676            return
1677        }
1678
1679        trace!(target: "discv4", num=%failed_find_nodes.len(), "processing failed find nodes");
1680
1681        for node_id in failed_find_nodes {
1682            let key = kad_key(node_id);
1683            let failures = match self.kbuckets.entry(&key) {
1684                kbucket::Entry::Present(mut entry, _) => {
1685                    entry.value_mut().inc_failed_request();
1686                    entry.value().find_node_failures
1687                }
1688                kbucket::Entry::Pending(mut entry, _) => {
1689                    entry.value_mut().inc_failed_request();
1690                    entry.value().find_node_failures
1691                }
1692                _ => continue,
1693            };
1694
1695            // if the node failed to return anything useful multiple times, remove the node from
1696            // the table, but only if there are enough other nodes in the bucket (bucket must be at
1697            // least half full)
1698            if failures > self.config.max_find_node_failures {
1699                self.soft_remove_node(node_id);
1700            }
1701        }
1702    }
1703
1704    /// Re-pings all nodes whose endpoint proofs are considered expired: [`NodeEntry::is_expired`]
1705    ///
1706    /// This will send a `Ping` to the nodes; if a node fails to respond with a `Pong` to renew the
1707    /// endpoint proof, it will be removed from the table.
1708    fn re_ping_oldest(&mut self) {
1709        let mut nodes = self
1710            .kbuckets
1711            .iter_ref()
1712            .filter(|entry| entry.node.value.is_expired())
1713            .map(|n| n.node.value)
1714            .collect::<Vec<_>>();
1715        nodes.sort_unstable_by_key(|a| a.last_seen);
1716        let to_ping = nodes.into_iter().map(|n| n.record).take(MAX_NODES_PING).collect::<Vec<_>>();
1717        for node in to_ping {
1718            self.try_ping(node, PingReason::RePing)
1719        }
1720    }
1721
1722    /// Returns true if the expiration timestamp is in the past.
1723    fn is_expired(&self, expiration: u64) -> bool {
1724        self.ensure_not_expired(expiration).is_err()
1725    }
1726
1727    /// Validate that given timestamp is not expired.
1728    ///
1729    /// Note: this accepts the timestamp as u64 because this is used by the wire protocol, but the
1730    /// UNIX timestamp (number of non-leap seconds since January 1, 1970 0:00:00 UTC) is supposed to
1731    /// be an i64.
1732    ///
1733    /// Returns an error if:
1734    ///  - invalid UNIX timestamp (larger than `i64::MAX`)
1735    ///  - timestamp is expired (lower than current local UNIX timestamp)
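    ///
    /// A rough sketch of the behavior (the `service` binding is illustrative only; assumes
    /// `enforce_expiration_timestamps` is enabled):
    ///
    /// ```ignore
    /// let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
    /// assert!(service.ensure_not_expired(now + 20).is_ok());  // still valid
    /// assert!(service.ensure_not_expired(now - 20).is_err()); // already expired
    /// assert!(service.ensure_not_expired(u64::MAX).is_err()); // not a valid i64 timestamp
    /// ```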
1736    fn ensure_not_expired(&self, timestamp: u64) -> Result<(), ()> {
1737        // ensure the timestamp is a valid UNIX timestamp
1738        let _ = i64::try_from(timestamp).map_err(drop)?;
1739
1740        let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_secs();
1741        if self.config.enforce_expiration_timestamps && timestamp < now {
1742            trace!(target: "discv4", "Expired packet");
1743            return Err(())
1744        }
1745        Ok(())
1746    }
1747
1748    /// Pops buffered ping requests and sends them.
1749    fn ping_buffered(&mut self) {
1750        while self.pending_pings.len() < MAX_NODES_PING {
1751            match self.queued_pings.pop_front() {
1752                Some((next, reason)) => self.try_ping(next, reason),
1753                None => break,
1754            }
1755        }
1756    }
1757
1758    fn ping_expiration(&self) -> u64 {
1759        (SystemTime::now().duration_since(UNIX_EPOCH).unwrap() + self.config.ping_expiration)
1760            .as_secs()
1761    }
1762
1763    fn find_node_expiration(&self) -> u64 {
1764        (SystemTime::now().duration_since(UNIX_EPOCH).unwrap() + self.config.request_timeout)
1765            .as_secs()
1766    }
1767
1768    fn enr_request_expiration(&self) -> u64 {
1769        (SystemTime::now().duration_since(UNIX_EPOCH).unwrap() + self.config.enr_expiration)
1770            .as_secs()
1771    }
1772
1773    fn send_neighbours_expiration(&self) -> u64 {
1774        (SystemTime::now().duration_since(UNIX_EPOCH).unwrap() + self.config.neighbours_expiration)
1775            .as_secs()
1776    }
1777
1778    /// Polls the socket and advances the state.
1779    ///
1780    /// To prevent traffic amplification attacks, implementations must verify that the sender of a
1781    /// query participates in the discovery protocol. The sender of a packet is considered verified
1782    /// if it has sent a valid Pong response with matching ping hash within the last 12 hours.
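    ///
    /// Concretely, incoming `FindNode` and `EnrRequest` packets are only answered when
    /// `has_bond` returns true for the sender (see `on_find_node` and `on_enr_request` above).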
1783    pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<Discv4Event> {
1784        loop {
1785            // drain buffered events first
1786            if let Some(event) = self.queued_events.pop_front() {
1787                return Poll::Ready(event)
1788            }
1789
1790            // trigger self lookup
1791            if self.config.enable_lookup {
1792                while self.lookup_interval.poll_tick(cx).is_ready() {
1793                    let target = self.lookup_rotator.next(&self.local_node_record.id);
1794                    self.lookup_with(target, None);
1795                }
1796            }
1797
1798            // re-ping some peers
1799            while self.ping_interval.poll_tick(cx).is_ready() {
1800                self.re_ping_oldest();
1801            }
1802
1803            if let Some(Poll::Ready(Some(ip))) =
1804                self.resolve_external_ip_interval.as_mut().map(|r| r.poll_tick(cx))
1805            {
1806                self.set_external_ip_addr(ip);
1807            }
1808
1809            // drain all incoming `Discv4` commands, this channel can never close
1810            while let Poll::Ready(Some(cmd)) = self.commands_rx.poll_recv(cx) {
1811                match cmd {
1812                    Discv4Command::Add(enr) => {
1813                        self.add_node(enr);
1814                    }
1815                    Discv4Command::AddBootNode(record) => {
1816                        self.add_boot_node(record);
1817                    }
1818                    Discv4Command::Lookup { node_id, tx } => {
1819                        let node_id = node_id.unwrap_or(self.local_node_record.id);
1820                        self.lookup_with(node_id, tx);
1821                    }
1822                    Discv4Command::SetLookupInterval(duration) => {
1823                        self.set_lookup_interval(duration);
1824                    }
1825                    Discv4Command::Updates(tx) => {
1826                        let rx = self.update_stream();
1827                        let _ = tx.send(rx);
1828                    }
1829                    Discv4Command::BanPeer(node_id) => self.ban_node(node_id),
1830                    Discv4Command::Remove(node_id) => {
1831                        self.remove_node(node_id);
1832                    }
1833                    Discv4Command::Ban(node_id, ip) => {
1834                        self.ban_node(node_id);
1835                        self.ban_ip(ip);
1836                    }
1837                    Discv4Command::BanIp(ip) => {
1838                        self.ban_ip(ip);
1839                    }
1840                    Discv4Command::SetEIP868RLPPair { key, rlp } => {
1841                        debug!(target: "discv4", key=%String::from_utf8_lossy(&key), "Update EIP-868 extension pair");
1842
1843                        let _ = self.local_eip_868_enr.insert_raw_rlp(key, rlp, &self.secret_key);
1844                    }
1845                    Discv4Command::SetTcpPort(port) => {
1846                        debug!(target: "discv4", %port, "Update tcp port");
1847                        self.local_node_record.tcp_port = port;
1848                        if self.local_node_record.address.is_ipv4() {
1849                            let _ = self.local_eip_868_enr.set_tcp4(port, &self.secret_key);
1850                        } else {
1851                            let _ = self.local_eip_868_enr.set_tcp6(port, &self.secret_key);
1852                        }
1853                    }
1854
1855                    Discv4Command::Terminated => {
1856                        // terminate the service
1857                        self.queued_events.push_back(Discv4Event::Terminated);
1858                    }
1859                }
1860            }
1861
1862            // restricts how many messages we process in a single poll before yielding back control
1863            let mut udp_message_budget = UDP_MESSAGE_POLL_LOOP_BUDGET;
1864
1865            // process all incoming datagrams
1866            while let Poll::Ready(Some(event)) = self.ingress.poll_recv(cx) {
1867                match event {
1868                    IngressEvent::RecvError(err) => {
1869                        debug!(target: "discv4", %err, "failed to read datagram");
1870                    }
1871                    IngressEvent::BadPacket(from, err, data) => {
1872                        trace!(target: "discv4", ?from, %err, packet=?hex::encode(&data), "bad packet");
1873                    }
1874                    IngressEvent::Packet(remote_addr, Packet { msg, node_id, hash }) => {
1875                        trace!(target: "discv4", r#type=?msg.msg_type(), from=?remote_addr,"received packet");
1876                        let event = match msg {
1877                            Message::Ping(ping) => {
1878                                self.on_ping(ping, remote_addr, node_id, hash);
1879                                Discv4Event::Ping
1880                            }
1881                            Message::Pong(pong) => {
1882                                self.on_pong(pong, remote_addr, node_id);
1883                                Discv4Event::Pong
1884                            }
1885                            Message::FindNode(msg) => {
1886                                self.on_find_node(msg, remote_addr, node_id);
1887                                Discv4Event::FindNode
1888                            }
1889                            Message::Neighbours(msg) => {
1890                                self.on_neighbours(msg, remote_addr, node_id);
1891                                Discv4Event::Neighbours
1892                            }
1893                            Message::EnrRequest(msg) => {
1894                                self.on_enr_request(msg, remote_addr, node_id, hash);
1895                                Discv4Event::EnrRequest
1896                            }
1897                            Message::EnrResponse(msg) => {
1898                                self.on_enr_response(msg, remote_addr, node_id);
1899                                Discv4Event::EnrResponse
1900                            }
1901                        };
1902
1903                        self.queued_events.push_back(event);
1904                    }
1905                }
1906
1907                udp_message_budget -= 1;
1908                if udp_message_budget < 0 {
1909                    trace!(target: "discv4", budget=UDP_MESSAGE_POLL_LOOP_BUDGET, "exhausted message poll budget");
1910                    if self.queued_events.is_empty() {
1911                        // we've exceeded the message budget and have no events to process
1912                        // this will make sure we're woken up again
1913                        cx.waker().wake_by_ref();
1914                    }
1915                    break
1916                }
1917            }
1918
1919            // try resending buffered pings
1920            self.ping_buffered();
1921
1922            // evict expired requests
1923            while self.evict_expired_requests_interval.poll_tick(cx).is_ready() {
1924                self.evict_expired_requests(Instant::now());
1925            }
1926
1927            // evict expired nodes
1928            while self.expire_interval.poll_tick(cx).is_ready() {
1929                self.received_pongs.evict_expired(Instant::now(), EXPIRE_DURATION);
1930            }
1931
1932            if self.queued_events.is_empty() {
1933                return Poll::Pending
1934            }
1935        }
1936    }
1937}
1938
1939/// Stream impl that yields service events until the service is terminated
1940impl Stream for Discv4Service {
1941    type Item = Discv4Event;
1942
1943    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1944        // Poll the internal poll method
1945        match ready!(self.get_mut().poll(cx)) {
1946            // if the service is terminated, return None to terminate the stream
1947            Discv4Event::Terminated => Poll::Ready(None),
1948            // For any other event, return Poll::Ready(Some(event))
1949            ev => Poll::Ready(Some(ev)),
1950        }
1951    }
1952}
1953
1954impl fmt::Debug for Discv4Service {
1955    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1956        f.debug_struct("Discv4Service")
1957            .field("local_address", &self.local_address)
1958            .field("local_peer_id", &self.local_peer_id())
1959            .field("local_node_record", &self.local_node_record)
1960            .field("queued_pings", &self.queued_pings)
1961            .field("pending_lookup", &self.pending_lookup)
1962            .field("pending_find_nodes", &self.pending_find_nodes)
1963            .field("lookup_interval", &self.lookup_interval)
1964            .finish_non_exhaustive()
1965    }
1966}
1967
1968/// The Event type the Service stream produces.
1969///
1970/// This is mainly used for testing purposes and represents messages the service processed
1971#[derive(Debug, Eq, PartialEq)]
1972pub enum Discv4Event {
1973    /// A `Ping` message was handled.
1974    Ping,
1975    /// A `Pong` message was handled.
1976    Pong,
1977    /// A `FindNode` message was handled.
1978    FindNode,
1979    /// A `Neighbours` message was handled.
1980    Neighbours,
1981    /// An `EnrRequest` message was handled.
1982    EnrRequest,
1983    /// An `EnrResponse` message was handled.
1984    EnrResponse,
1985    /// Service is being terminated
1986    Terminated,
1987}
1988
1989/// Continuously reads new messages from the channel and writes them to the socket
1990pub(crate) async fn send_loop(udp: Arc<UdpSocket>, rx: EgressReceiver) {
1991    let mut stream = ReceiverStream::new(rx);
1992    while let Some((payload, to)) = stream.next().await {
1993        match udp.send_to(&payload, to).await {
1994            Ok(size) => {
1995                trace!(target: "discv4", ?to, ?size,"sent payload");
1996            }
1997            Err(err) => {
1998                debug!(target: "discv4", ?to, %err,"Failed to send datagram.");
1999            }
2000        }
2001    }
2002}
2003
2004/// Rate limits the number of incoming packets from individual IPs to 1 packet/second
2005const MAX_INCOMING_PACKETS_PER_MINUTE_BY_IP: usize = 60usize;
2006
2007/// Continuously awaits new incoming messages and sends them back through the channel.
2008///
2009/// The receive loop enforces primitive per-IP rate limiting to prevent message spam from
2010/// individual IPs.
2011pub(crate) async fn receive_loop(udp: Arc<UdpSocket>, tx: IngressSender, local_id: PeerId) {
2012    let mut handler = IngressHandler::new(tx, local_id);
2013    let mut buf = [0; MAX_PACKET_SIZE];
2014    loop {
2015        let res = udp.recv_from(&mut buf).await;
2016        match res {
2017            Err(err) => {
2018                debug!(target: "discv4", %err, "Failed to read datagram.");
2019                handler.send(IngressEvent::RecvError(err)).await;
2020            }
2021            Ok((read, remote_addr)) => {
2022                handler.handle_packet(&buf[..read], remote_addr).await;
2023            }
2024        }
2025    }
2026}
2027
2028/// Handles decoding, rate-limiting, and deduplication of incoming discv4 packets.
2029///
2030/// Used by both the standalone receive loop and the shared-port mode via
2031/// [`Discv4::bind_shared`].
2032#[derive(Debug)]
2033pub struct IngressHandler {
2034    tx: IngressSender,
2035    local_id: PeerId,
2036    tick: usize,
2037    tick_interval: Duration,
2038    cache: ReceiveCache,
2039    last_tick: Instant,
2040}
2041
2042impl IngressHandler {
2043    fn new(tx: IngressSender, local_id: PeerId) -> Self {
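        // Decay half of the per-minute budget each tick: with the default limit of 60
        // packets per minute, this decrements each IP's counter by 30 every 30 seconds,
        // approximating the 1 packet/second rate limit described above.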
2044        let tick = MAX_INCOMING_PACKETS_PER_MINUTE_BY_IP / 2;
2045        Self {
2046            tx,
2047            local_id,
2048            tick,
2049            tick_interval: Duration::from_secs(tick as u64),
2050            cache: ReceiveCache::default(),
2051            last_tick: Instant::now(),
2052        }
2053    }
2054
2055    async fn send(&self, event: IngressEvent) {
2056        let _ = self.tx.send(event).await.map_err(|err| {
2057            debug!(target: "discv4", %err, "failed to send incoming packet");
2058        });
2059    }
2060
2061    /// Handles an incoming raw packet: decodes, rate-limits, deduplicates, and forwards to the
2062    /// discv4 service. Used in shared-port mode to process unrecognized frames from discv5.
2063    pub async fn handle_packet(&mut self, data: &[u8], src: SocketAddr) {
2064        if self.last_tick.elapsed() >= self.tick_interval {
2065            self.cache.tick_ips(self.tick);
2066            self.last_tick = Instant::now();
2067        }
2068
2069        // rate limit incoming packets by IP
2070        if self.cache.inc_ip(src.ip()) > MAX_INCOMING_PACKETS_PER_MINUTE_BY_IP {
2071            trace!(target: "discv4", ?src, "Too many incoming packets from IP.");
2072            return
2073        }
2074
2075        let event = match Message::decode(data) {
2076            Ok(packet) => {
2077                if packet.node_id == self.local_id {
2078                    debug!(target: "discv4", ?src, "Received own packet.");
2079                    return
2080                }
2081
2082                if self.cache.contains_packet(packet.hash) {
2083                    debug!(target: "discv4", ?src, "Received duplicate packet.");
2084                    return
2085                }
2086
2087                IngressEvent::Packet(src, packet)
2088            }
2089            Err(err) => {
2090                trace!(target: "discv4", %err, "Failed to decode packet");
2091                IngressEvent::BadPacket(src, err, data.to_vec())
2092            }
2093        };
2094
2095        self.send(event).await;
2096    }
2097}
2098
2099/// A cache for received packets and their source address.
2100///
2101/// This is used to discard duplicated packets and rate limit messages from the same source.
2102#[derive(Debug)]
2103struct ReceiveCache {
2104    /// keeps track of how many messages we've received from a given IP address since the last
2105    /// tick.
2106    ///
2107    /// This is used to count the number of messages received from a given IP address within an
2108    /// interval.
2109    ip_messages: HashMap<IpAddr, usize>,
2110    // keeps track of unique packet hashes
2111    unique_packets: schnellru::LruMap<B256, ()>,
2112}
2113
2114impl ReceiveCache {
2115    /// Decays the per-IP message counters.
2116    ///
2117    /// This decrements each counter by `tick` and removes entries whose counter would drop below zero.
2118    fn tick_ips(&mut self, tick: usize) {
2119        self.ip_messages.retain(|_, count| {
2120            if let Some(reset) = count.checked_sub(tick) {
2121                *count = reset;
2122                true
2123            } else {
2124                false
2125            }
2126        });
2127    }
2128
2129    /// Increases the counter for the given IP address and returns the new count.
2130    fn inc_ip(&mut self, ip: IpAddr) -> usize {
2131        let ctn = self.ip_messages.entry(ip).or_default();
2132        *ctn = ctn.saturating_add(1);
2133        *ctn
2134    }
2135
2136    /// Returns true if we previously received the packet
2137    fn contains_packet(&mut self, hash: B256) -> bool {
2138        !self.unique_packets.insert(hash, ())
2139    }
2140}
2141
2142impl Default for ReceiveCache {
2143    fn default() -> Self {
2144        Self {
2145            ip_messages: Default::default(),
2146            unique_packets: schnellru::LruMap::new(schnellru::ByLength::new(32)),
2147        }
2148    }
2149}
2150
2151/// The commands sent from the frontend [Discv4] to the service [`Discv4Service`].
2152enum Discv4Command {
2153    Add(NodeRecord),
2154    AddBootNode(NodeRecord),
2155    SetTcpPort(u16),
2156    SetEIP868RLPPair { key: Vec<u8>, rlp: Bytes },
2157    Ban(PeerId, IpAddr),
2158    BanPeer(PeerId),
2159    BanIp(IpAddr),
2160    Remove(PeerId),
2161    Lookup { node_id: Option<PeerId>, tx: Option<NodeRecordSender> },
2162    SetLookupInterval(Duration),
2163    Updates(OneshotSender<ReceiverStream<DiscoveryUpdate>>),
2164    Terminated,
2165}
2166
2167/// Event type the ingress receiver produces
2168#[derive(Debug)]
2169pub(crate) enum IngressEvent {
2170    /// Encountered an error when reading a datagram message.
2171    RecvError(io::Error),
2172    /// Received a bad message
2173    BadPacket(SocketAddr, DecodePacketError, Vec<u8>),
2174    /// Received a datagram from an address.
2175    Packet(SocketAddr, Packet),
2176}
2177
2178/// Tracks a sent ping
2179#[derive(Debug)]
2180struct PingRequest {
2181    // Timestamp when the request was sent.
2182    sent_at: Instant,
2183    // Node to which the request was sent.
2184    node: NodeRecord,
2185    // Hash sent in the Ping request
2186    echo_hash: B256,
2187    /// Why this ping was sent.
2188    reason: PingReason,
2189}
2190
2191/// Rotates the `PeerId` that is periodically looked up.
2192///
2193/// By selecting different targets, the lookups will be seeded with different ALPHA seed nodes.
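///
/// With the default configuration every fourth lookup targets the local node id and the remaining
/// lookups target random ids (sketch, mirroring the `test_rotator` test below):
///
/// ```ignore
/// let local = PeerId::random();
/// let mut rotator = LookupTargetRotator::default();
/// assert_eq!(rotator.next(&local), local);   // first call targets the local id
/// assert_ne!(rotator.next(&local), local);   // then three random targets
/// assert_ne!(rotator.next(&local), local);
/// assert_ne!(rotator.next(&local), local);
/// assert_eq!(rotator.next(&local), local);   // every 4th lookup is our own node
/// ```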
2194#[derive(Debug)]
2195struct LookupTargetRotator {
2196    interval: usize,
2197    counter: usize,
2198}
2199
2200// === impl LookupTargetRotator ===
2201
2202impl LookupTargetRotator {
2203    /// Returns a rotator that always returns the local target.
2204    const fn local_only() -> Self {
2205        Self { interval: 1, counter: 0 }
2206    }
2207}
2208
2209impl Default for LookupTargetRotator {
2210    fn default() -> Self {
2211        Self {
2212            // every 4th lookup is our own node
2213            interval: 4,
2214            counter: 3,
2215        }
2216    }
2217}
2218
2219impl LookupTargetRotator {
2220    /// This will return the next node id to lookup
2221    fn next(&mut self, local: &PeerId) -> PeerId {
2222        self.counter += 1;
2223        self.counter %= self.interval;
2224        if self.counter == 0 {
2225            return *local
2226        }
2227        PeerId::random()
2228    }
2229}
2230
2231/// Tracks lookups across multiple `FindNode` requests.
2232///
2233/// Once all clones of this type have been dropped, all the discovered nodes are sent to the
2234/// listener, if one is present.
2235#[derive(Clone, Debug)]
2236struct LookupContext {
2237    inner: Rc<LookupContextInner>,
2238}
2239
2240impl LookupContext {
2241    /// Create new context for a recursive lookup
2242    fn new(
2243        target: discv5::Key<NodeKey>,
2244        nearest_nodes: impl IntoIterator<Item = (Distance, NodeRecord)>,
2245        listener: Option<NodeRecordSender>,
2246    ) -> Self {
2247        let closest_nodes = nearest_nodes
2248            .into_iter()
2249            .map(|(distance, record)| {
2250                (distance, QueryNode { record, queried: false, responded: false })
2251            })
2252            .collect();
2253
2254        let inner = Rc::new(LookupContextInner {
2255            target,
2256            closest_nodes: RefCell::new(closest_nodes),
2257            listener,
2258        });
2259        Self { inner }
2260    }
2261
2262    /// Returns the target of this lookup
2263    fn target(&self) -> PeerId {
2264        self.inner.target.preimage().0
2265    }
2266
2267    fn closest(&self, num: usize) -> Vec<NodeRecord> {
2268        self.inner
2269            .closest_nodes
2270            .borrow()
2271            .iter()
2272            .filter(|(_, node)| !node.queried)
2273            .map(|(_, n)| n.record)
2274            .take(num)
2275            .collect()
2276    }
2277
2278    /// Returns the closest nodes that have not been queried yet and satisfy the given filter.
2279    fn filter_closest<P>(&self, num: usize, filter: P) -> Vec<NodeRecord>
2280    where
2281        P: FnMut(&NodeRecord) -> bool,
2282    {
2283        self.inner
2284            .closest_nodes
2285            .borrow()
2286            .iter()
2287            .filter(|(_, node)| !node.queried)
2288            .map(|(_, n)| n.record)
2289            .filter(filter)
2290            .take(num)
2291            .collect()
2292    }
2293
2294    /// Inserts the node if it's missing
2295    fn add_node(&self, record: NodeRecord) {
2296        let distance = self.inner.target.distance(&kad_key(record.id));
2297        let mut closest = self.inner.closest_nodes.borrow_mut();
2298        if let btree_map::Entry::Vacant(entry) = closest.entry(distance) {
2299            entry.insert(QueryNode { record, queried: false, responded: false });
2300        }
2301    }
2302
2303    fn set_queried(&self, id: PeerId, val: bool) {
2304        if let Some((_, node)) =
2305            self.inner.closest_nodes.borrow_mut().iter_mut().find(|(_, node)| node.record.id == id)
2306        {
2307            node.queried = val;
2308        }
2309    }
2310
2311    /// Marks the node as queried
2312    fn mark_queried(&self, id: PeerId) {
2313        self.set_queried(id, true)
2314    }
2315
2316    /// Marks the node as not queried
2317    fn unmark_queried(&self, id: PeerId) {
2318        self.set_queried(id, false)
2319    }
2320
2321    /// Marks the node as responded
2322    fn mark_responded(&self, id: PeerId) {
2323        if let Some((_, node)) =
2324            self.inner.closest_nodes.borrow_mut().iter_mut().find(|(_, node)| node.record.id == id)
2325        {
2326            node.responded = true;
2327        }
2328    }
2329}
2330
2331// SAFETY: The [`Discv4Service`] is intended to be spawned as a task, which requires `Send`.
2332// The `LookupContext` is shared by all active `FindNode` requests that are part of the lookup
2333// step, each of which can modify the context. The shared context is only ever accessed mutably
2334// when a `Neighbours` response is processed, and all clones are stored inside [`Discv4Service`];
2335// in other words, it is guaranteed that there's only 1 owner ([`Discv4Service`]) of all possible
2336// [`Rc`] clones of [`LookupContext`].
2337unsafe impl Send for LookupContext {}
2338#[derive(Debug)]
2339struct LookupContextInner {
2340    /// The target to lookup.
2341    target: discv5::Key<NodeKey>,
2342    /// The closest nodes
2343    closest_nodes: RefCell<BTreeMap<Distance, QueryNode>>,
2344    /// A listener for all the nodes retrieved in this lookup
2345    ///
2346    /// This is present if the lookup was triggered manually via [Discv4] and we want to return all
2347    /// the nodes once the lookup finishes.
2348    listener: Option<NodeRecordSender>,
2349}
2350
2351impl Drop for LookupContextInner {
2352    fn drop(&mut self) {
2353        if let Some(tx) = self.listener.take() {
2354            // there's only 1 instance shared across `FindNode` requests; if this is dropped, then
2355            // all requests have finished and we can send all results back
2356            let nodes = self
2357                .closest_nodes
2358                .take()
2359                .into_values()
2360                .filter(|node| node.responded)
2361                .map(|node| node.record)
2362                .collect();
2363            let _ = tx.send(nodes);
2364        }
2365    }
2366}
2367
2368/// Tracks the state of a recursive lookup step
2369#[derive(Debug, Clone, Copy)]
2370struct QueryNode {
2371    record: NodeRecord,
2372    queried: bool,
2373    responded: bool,
2374}
2375
2376#[derive(Debug)]
2377struct FindNodeRequest {
2378    // Timestamp when the request was sent.
2379    sent_at: Instant,
2380    // Number of items sent by the node
2381    response_count: usize,
2382    // Whether the request has been answered yet.
2383    answered: bool,
2384    /// The lookup context this request is part of
2385    lookup_context: LookupContext,
2386}
2387
2388// === impl FindNodeRequest ===
2389
2390impl FindNodeRequest {
2391    fn new(resp: LookupContext) -> Self {
2392        Self { sent_at: Instant::now(), response_count: 0, answered: false, lookup_context: resp }
2393    }
2394}
2395
2396/// Cached signed `FindNode` packet to avoid redundant ECDSA signing during Kademlia lookups.
2397#[derive(Debug)]
2398struct CachedFindNode {
2399    target: PeerId,
2400    payload: Bytes,
2401    hash: B256,
2402    cached_at: Instant,
2403}
2404
2405impl CachedFindNode {
2406    /// Returns the cached `(payload, hash)` if the target matches and the cache is still fresh,
2407    /// or signs a new packet, updates the cache, and returns it.
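    ///
    /// A hedged usage sketch (the `self.cached_find_node` field, the 1s TTL, and the call site
    /// are illustrative assumptions, not the actual service wiring):
    ///
    /// ```ignore
    /// let (payload, hash) = CachedFindNode::get_or_sign(
    ///     &mut self.cached_find_node,     // Option<CachedFindNode>
    ///     target,                         // PeerId we are looking up
    ///     Duration::from_secs(1),         // reuse the signed packet for up to 1s
    ///     &self.secret_key,
    ///     self.find_node_expiration(),
    /// );
    /// // the same signed bytes can then be written to the UDP socket for each recipient
    /// ```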
2408    fn get_or_sign(
2409        cache: &mut Option<Self>,
2410        target: PeerId,
2411        ttl: Duration,
2412        secret_key: &secp256k1::SecretKey,
2413        expire: u64,
2414    ) -> (Bytes, B256) {
2415        if let Some(c) = cache.as_ref() &&
2416            c.target == target &&
2417            c.cached_at.elapsed() < ttl
2418        {
2419            return (c.payload.clone(), c.hash);
2420        }
2421
2422        let msg = Message::FindNode(FindNode { id: target, expire });
2423        let (payload, hash) = msg.encode(secret_key);
2424
2425        *cache = Some(Self { target, payload: payload.clone(), hash, cached_at: Instant::now() });
2426
2427        (payload, hash)
2428    }
2429}
2430
2431#[derive(Debug)]
2432struct EnrRequestState {
2433    // Timestamp when the request was sent.
2434    sent_at: Instant,
2435    // Hash of the sent `EnrRequest` packet
2436    echo_hash: B256,
2437}
2438
2439/// Stored node info.
2440#[derive(Debug, Clone, Eq, PartialEq)]
2441struct NodeEntry {
2442    /// Node record info.
2443    record: NodeRecord,
2444    /// Timestamp of last pong.
2445    last_seen: Instant,
2446    /// Last enr seq we retrieved via an ENR request.
2447    last_enr_seq: Option<u64>,
2448    /// `ForkId` if retrieved via ENR requests.
2449    fork_id: Option<ForkId>,
2450    /// Counter for failed _consecutive_ findNode requests.
2451    find_node_failures: u8,
2452    /// Whether the endpoint of the peer is proven.
2453    has_endpoint_proof: bool,
2454}
2455
2456// === impl NodeEntry ===
2457
2458impl NodeEntry {
2459    /// Creates a new, unpopulated entry
2460    fn new(record: NodeRecord) -> Self {
2461        Self {
2462            record,
2463            last_seen: Instant::now(),
2464            last_enr_seq: None,
2465            fork_id: None,
2466            find_node_failures: 0,
2467            has_endpoint_proof: false,
2468        }
2469    }
2470
2471    #[cfg(test)]
2472    fn new_proven(record: NodeRecord) -> Self {
2473        let mut node = Self::new(record);
2474        node.has_endpoint_proof = true;
2475        node
2476    }
2477
2478    /// Marks the entry with an established proof and resets the consecutive failure counter.
2479    const fn establish_proof(&mut self) {
2480        self.has_endpoint_proof = true;
2481        self.find_node_failures = 0;
2482    }
2483
2484    /// Returns true if the tracked find node failures exceed the max amount
2485    const fn exceeds_find_node_failures(&self, max_failures: u8) -> bool {
2486        self.find_node_failures >= max_failures
2487    }
2488
2489    /// Updates the last timestamp and sets the enr seq
2490    fn update_with_enr(&mut self, last_enr_seq: Option<u64>) -> Option<u64> {
2491        self.update_now(|s| std::mem::replace(&mut s.last_enr_seq, last_enr_seq))
2492    }
2493
2494    /// Increases the failed request counter
2495    const fn inc_failed_request(&mut self) {
2496        self.find_node_failures += 1;
2497    }
2498
2499    /// Updates the last timestamp and sets the `ForkId`
2500    fn update_with_fork_id(&mut self, fork_id: Option<ForkId>) -> Option<ForkId> {
2501        self.update_now(|s| std::mem::replace(&mut s.fork_id, fork_id))
2502    }
2503
2504    /// Updates the `last_seen` timestamp and calls the closure
2505    fn update_now<F, R>(&mut self, f: F) -> R
2506    where
2507        F: FnOnce(&mut Self) -> R,
2508    {
2509        self.last_seen = Instant::now();
2510        f(self)
2511    }
2512}
2513
2514// === impl NodeEntry ===
2515
2516impl NodeEntry {
2517    /// Returns true if the node should be re-pinged.
2518    fn is_expired(&self) -> bool {
2519        self.last_seen.elapsed() > (ENDPOINT_PROOF_EXPIRATION / 2)
2520    }
2521}
2522
2523/// Represents why a ping is issued
2524#[derive(Debug)]
2525enum PingReason {
2526    /// Initial ping to a previously unknown peer that was inserted into the table.
2527    InitialInsert,
2528    /// A ping to a peer to establish a bond (endpoint proof).
2529    EstablishBond,
2530    /// Re-ping a peer.
2531    RePing,
2532    /// Part of a lookup to ensure endpoint is proven before we can send a `FindNode` request.
2533    Lookup(NodeRecord, LookupContext),
2534}
2535
2536/// Represents node-related state changes in the underlying node table
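///
/// A consumer typically matches on the variants it cares about (sketch, mirroring the
/// `test_mainnet_lookup` test below; `updates` is assumed to be the stream obtained via
/// `update_stream`):
///
/// ```ignore
/// while let Some(update) = updates.next().await {
///     match update {
///         DiscoveryUpdate::Added(record) => { /* new peer in the table */ }
///         DiscoveryUpdate::Removed(id) => { /* peer evicted from the table */ }
///         DiscoveryUpdate::EnrForkId(record, fork_id) => { /* EIP-868 fork id received */ }
///         _ => {}
///     }
/// }
/// ```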
2537#[derive(Debug, Clone)]
2538pub enum DiscoveryUpdate {
2539    /// A new node was discovered _and_ added to the table.
2540    Added(NodeRecord),
2541    /// A new node was discovered but _not_ added to the table because it is currently full.
2542    DiscoveredAtCapacity(NodeRecord),
2543    /// Received a [`ForkId`] via EIP-868 for the given [`NodeRecord`].
2544    EnrForkId(NodeRecord, ForkId),
2545    /// Node that was removed from the table
2546    Removed(PeerId),
2547    /// A series of updates
2548    Batch(Vec<Self>),
2549}
2550
2551#[cfg(test)]
2552mod tests {
2553    use super::*;
2554    use crate::test_utils::{create_discv4, create_discv4_with_config, rng_endpoint, rng_record};
2555    use alloy_primitives::hex;
2556    use alloy_rlp::{Decodable, Encodable};
2557    use rand_08::Rng;
2558    use reth_ethereum_forks::{EnrForkIdEntry, ForkHash};
2559    use reth_network_peers::mainnet_nodes;
2560    use std::future::poll_fn;
2561
2562    #[tokio::test]
2563    async fn test_configured_enr_forkid_entry() {
2564        let fork: ForkId = ForkId { hash: ForkHash([220, 233, 108, 45]), next: 0u64 };
2565        let mut disc_conf = Discv4Config::default();
2566        disc_conf.add_eip868_pair("eth", EnrForkIdEntry::from(fork));
2567        let (_discv4, service) = create_discv4_with_config(disc_conf).await;
2568        let eth = service.local_eip_868_enr.get_raw_rlp(b"eth").unwrap();
2569        let fork_entry_id = EnrForkIdEntry::decode(&mut &eth[..]).unwrap();
2570
2571        let raw: [u8; 8] = [0xc7, 0xc6, 0x84, 0xdc, 0xe9, 0x6c, 0x2d, 0x80];
2572        let decoded = EnrForkIdEntry::decode(&mut &raw[..]).unwrap();
2573        let expected = EnrForkIdEntry {
2574            fork_id: ForkId { hash: ForkHash([0xdc, 0xe9, 0x6c, 0x2d]), next: 0 },
2575        };
2576        assert_eq!(expected, fork_entry_id);
2577        assert_eq!(expected, decoded);
2578    }
2579
2580    #[test]
2581    fn test_enr_forkid_entry_decode() {
2582        let raw: [u8; 8] = [0xc7, 0xc6, 0x84, 0xdc, 0xe9, 0x6c, 0x2d, 0x80];
2583        let decoded = EnrForkIdEntry::decode(&mut &raw[..]).unwrap();
2584        let expected = EnrForkIdEntry {
2585            fork_id: ForkId { hash: ForkHash([0xdc, 0xe9, 0x6c, 0x2d]), next: 0 },
2586        };
2587        assert_eq!(expected, decoded);
2588    }
2589
2590    #[test]
2591    fn test_enr_forkid_entry_encode() {
2592        let original = EnrForkIdEntry {
2593            fork_id: ForkId { hash: ForkHash([0xdc, 0xe9, 0x6c, 0x2d]), next: 0 },
2594        };
2595        let expected: [u8; 8] = [0xc7, 0xc6, 0x84, 0xdc, 0xe9, 0x6c, 0x2d, 0x80];
2596        let mut encoded = Vec::with_capacity(expected.len());
2597        original.encode(&mut encoded);
2598        assert_eq!(&expected[..], encoded.as_slice());
2599    }
2600
2601    #[test]
2602    fn test_local_rotator() {
2603        let id = PeerId::random();
2604        let mut rotator = LookupTargetRotator::local_only();
2605        assert_eq!(rotator.next(&id), id);
2606        assert_eq!(rotator.next(&id), id);
2607    }
2608
2609    #[test]
2610    fn test_rotator() {
2611        let id = PeerId::random();
2612        let mut rotator = LookupTargetRotator::default();
2613        assert_eq!(rotator.next(&id), id);
2614        assert_ne!(rotator.next(&id), id);
2615        assert_ne!(rotator.next(&id), id);
2616        assert_ne!(rotator.next(&id), id);
2617        assert_eq!(rotator.next(&id), id);
2618    }
2619
2620    #[tokio::test]
2621    async fn test_pending_ping() {
2622        let (_, mut service) = create_discv4().await;
2623
2624        let local_addr = service.local_addr();
2625
2626        let mut num_inserted = 0;
2627        loop {
2628            let node = NodeRecord::new(local_addr, PeerId::random());
2629            if service.add_node(node) {
2630                num_inserted += 1;
2631                assert!(service.pending_pings.contains_key(&node.id));
2632                assert_eq!(service.pending_pings.len(), num_inserted);
2633                if num_inserted == MAX_NODES_PING {
2634                    break
2635                }
2636            }
2637        }
2638
2639        // `pending_pings` is full, insert into `queued_pings`.
2640        num_inserted = 0;
2641        for _ in 0..MAX_NODES_PING {
2642            let node = NodeRecord::new(local_addr, PeerId::random());
2643            if service.add_node(node) {
2644                num_inserted += 1;
2645                assert!(!service.pending_pings.contains_key(&node.id));
2646                assert_eq!(service.pending_pings.len(), MAX_NODES_PING);
2647                assert_eq!(service.queued_pings.len(), num_inserted);
2648            }
2649        }
2650    }
2651
2652    // Bootstraps with mainnet boot nodes
2653    #[tokio::test(flavor = "multi_thread")]
2654    #[ignore]
2655    async fn test_mainnet_lookup() {
2656        reth_tracing::init_test_tracing();
2657        let fork_id = ForkId { hash: ForkHash(hex!("743f3d89")), next: 16191202 };
2658
2659        let all_nodes = mainnet_nodes();
2660        let config = Discv4Config::builder()
2661            .add_boot_nodes(all_nodes)
2662            .lookup_interval(Duration::from_secs(1))
2663            .add_eip868_pair("eth", fork_id)
2664            .build();
2665        let (_discv4, mut service) = create_discv4_with_config(config).await;
2666
2667        let mut updates = service.update_stream();
2668
2669        let _handle = service.spawn();
2670
2671        let mut table = HashMap::new();
2672        while let Some(update) = updates.next().await {
2673            match update {
2674                DiscoveryUpdate::EnrForkId(record, fork_id) => {
2675                    println!("{record:?}, {fork_id:?}");
2676                }
2677                DiscoveryUpdate::Added(record) => {
2678                    table.insert(record.id, record);
2679                }
2680                DiscoveryUpdate::Removed(id) => {
2681                    table.remove(&id);
2682                }
2683                _ => {}
2684            }
2685            println!("total peers {}", table.len());
2686        }
2687    }
2688
2689    #[tokio::test]
2690    async fn test_mapped_ipv4() {
2691        reth_tracing::init_test_tracing();
2692        let mut rng = rand_08::thread_rng();
2693        let config = Discv4Config::builder().build();
2694        let (_discv4, mut service) = create_discv4_with_config(config).await;
2695
2696        let v4: Ipv4Addr = "0.0.0.0".parse().unwrap();
2697        let v6 = v4.to_ipv6_mapped();
2698        let addr: SocketAddr = (v6, DEFAULT_DISCOVERY_PORT).into();
2699
2700        let ping = Ping {
2701            from: rng_endpoint(&mut rng),
2702            to: rng_endpoint(&mut rng),
2703            expire: service.ping_expiration(),
2704            enr_sq: Some(rng.r#gen()),
2705        };
2706
2707        let id = PeerId::random();
2708        service.on_ping(ping, addr, id, B256::random());
2709
2710        let key = kad_key(id);
2711        match service.kbuckets.entry(&key) {
2712            kbucket::Entry::Present(entry, _) => {
2713                let node_addr = entry.value().record.address;
2714                assert!(node_addr.is_ipv4());
2715                assert_eq!(node_addr, IpAddr::from(v4));
2716            }
2717            _ => unreachable!(),
2718        };
2719    }
2720
2721    #[tokio::test]
2722    async fn test_respect_ping_expiration() {
2723        reth_tracing::init_test_tracing();
2724        let mut rng = rand_08::thread_rng();
2725        let config = Discv4Config::builder().build();
2726        let (_discv4, mut service) = create_discv4_with_config(config).await;
2727
2728        let v4: Ipv4Addr = "0.0.0.0".parse().unwrap();
2729        let v6 = v4.to_ipv6_mapped();
2730        let addr: SocketAddr = (v6, DEFAULT_DISCOVERY_PORT).into();
2731
2732        let ping = Ping {
2733            from: rng_endpoint(&mut rng),
2734            to: rng_endpoint(&mut rng),
2735            expire: SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs() - 1,
2736            enr_sq: Some(rng.r#gen()),
2737        };
2738
2739        let id = PeerId::random();
2740        service.on_ping(ping, addr, id, B256::random());
2741
2742        let key = kad_key(id);
2743        match service.kbuckets.entry(&key) {
2744            kbucket::Entry::Absent(_) => {}
2745            _ => unreachable!(),
2746        };
2747    }
2748
2749    #[tokio::test]
2750    async fn test_single_lookups() {
2751        reth_tracing::init_test_tracing();
2752
2753        let config = Discv4Config::builder().build();
2754        let (_discv4, mut service) = create_discv4_with_config(config.clone()).await;
2755
2756        let id = PeerId::random();
2757        let key = kad_key(id);
2758        let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), id);
2759
2760        let _ = service.kbuckets.insert_or_update(
2761            &key,
2762            NodeEntry::new_proven(record),
2763            NodeStatus {
2764                direction: ConnectionDirection::Incoming,
2765                state: ConnectionState::Connected,
2766            },
2767        );
2768
2769        service.lookup_self();
2770        assert_eq!(service.pending_find_nodes.len(), 1);
2771
2772        poll_fn(|cx| {
2773            let _ = service.poll(cx);
2774            assert_eq!(service.pending_find_nodes.len(), 1);
2775
2776            Poll::Ready(())
2777        })
2778        .await;
2779    }
2780
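    // A FindNode request is only sent to a peer once the endpoint proof has completed in both
    // directions. The test below feeds a Neighbours response into `service` and walks the
    // resulting Ping/Pong exchange with `service2`, checking that the follow-up FindNode is
    // queued only after the mutual proof is done.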
2781    #[tokio::test]
2782    async fn test_on_neighbours_recursive_lookup() {
2783        reth_tracing::init_test_tracing();
2784
2785        let config = Discv4Config::builder().build();
2786        let (_discv4, mut service) = create_discv4_with_config(config.clone()).await;
2787        let (_discv4, mut service2) = create_discv4_with_config(config).await;
2788
2789        let id = PeerId::random();
2790        let key = kad_key(id);
2791        let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), id);
2792
2793        let _ = service.kbuckets.insert_or_update(
2794            &key,
2795            NodeEntry::new_proven(record),
2796            NodeStatus {
2797                direction: ConnectionDirection::Incoming,
2798                state: ConnectionState::Connected,
2799            },
2800        );
2801        // Needed in this test to populate self.pending_find_nodes as a prerequisite for a
2802        // valid on_neighbours request
2803        service.lookup_self();
2804        assert_eq!(service.pending_find_nodes.len(), 1);
2805
2806        poll_fn(|cx| {
2807            let _ = service.poll(cx);
2808            assert_eq!(service.pending_find_nodes.len(), 1);
2809
2810            Poll::Ready(())
2811        })
2812        .await;
2813
2814        let expiry = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_secs() +
2815            10000000000000;
2816        let msg = Neighbours { nodes: vec![service2.local_node_record], expire: expiry };
2817        service.on_neighbours(msg, record.tcp_addr(), id);
2818        // wait for the processed ping
2819        let event = poll_fn(|cx| service2.poll(cx)).await;
2820        assert_eq!(event, Discv4Event::Ping);
2821        // assert that no find_node request has been added on top of the initial one, since the
2822        // endpoint proof is not yet completed in both directions
2823        assert_eq!(service.pending_find_nodes.len(), 1);
2824        // we now wait for PONG
2825        let event = poll_fn(|cx| service.poll(cx)).await;
2826        assert_eq!(event, Discv4Event::Pong);
2827        // Ideally we would assert against service.pending_lookup.len() here, but because
2828        // service2 sends Pong and Ping consecutively in on_ping(), the pending_lookup table is
2829        // drained almost immediately and there is no way to observe its intermediate state from
2830        // here.
2831        let event = poll_fn(|cx| service.poll(cx)).await;
2832        assert_eq!(event, Discv4Event::Ping);
2833        // assert that the find_node request has been added now that both sides of the endpoint
2834        // proof are done
2835        assert_eq!(service.pending_find_nodes.len(), 2);
2836    }
2837
2838    #[tokio::test]
2839    async fn test_no_local_in_closest() {
2840        reth_tracing::init_test_tracing();
2841
2842        let config = Discv4Config::builder().build();
2843        let (_discv4, mut service) = create_discv4_with_config(config).await;
2844
2845        let target_key = kad_key(PeerId::random());
2846
2847        let id = PeerId::random();
2848        let key = kad_key(id);
2849        let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), id);
2850
2851        let _ = service.kbuckets.insert_or_update(
2852            &key,
2853            NodeEntry::new(record),
2854            NodeStatus {
2855                direction: ConnectionDirection::Incoming,
2856                state: ConnectionState::Connected,
2857            },
2858        );
2859
2860        let closest = service
2861            .kbuckets
2862            .closest_values(&target_key)
2863            .map(|n| n.value.record)
2864            .take(MAX_NODES_PER_BUCKET)
2865            .collect::<Vec<_>>();
2866
2867        assert_eq!(closest.len(), 1);
2868        assert!(!closest.iter().any(|r| r.id == *service.local_peer_id()));
2869    }
2870
2871    #[tokio::test]
2872    async fn test_random_lookup() {
2873        reth_tracing::init_test_tracing();
2874
2875        let config = Discv4Config::builder().build();
2876        let (_discv4, mut service) = create_discv4_with_config(config).await;
2877
2878        let target = PeerId::random();
2879
2880        let id = PeerId::random();
2881        let key = kad_key(id);
2882        let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), id);
2883
2884        let _ = service.kbuckets.insert_or_update(
2885            &key,
2886            NodeEntry::new_proven(record),
2887            NodeStatus {
2888                direction: ConnectionDirection::Incoming,
2889                state: ConnectionState::Connected,
2890            },
2891        );
2892
2893        service.lookup(target);
2894        assert_eq!(service.pending_find_nodes.len(), 1);
2895
2896        let ctx = service.pending_find_nodes.values().next().unwrap().lookup_context.clone();
2897
2898        assert_eq!(ctx.target(), target);
2899        assert_eq!(ctx.inner.closest_nodes.borrow().len(), 1);
2900
2901        ctx.add_node(record);
2902        assert_eq!(ctx.inner.closest_nodes.borrow().len(), 1);
2903    }
2904
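    // When a node has accumulated the maximum number of FindNode failures, a lookup re-pings it
    // instead of sending another FindNode; the subsequent pong resets the failure counter while
    // the endpoint proof stays in place.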
2905    #[tokio::test]
2906    async fn test_reping_on_find_node_failures() {
2907        reth_tracing::init_test_tracing();
2908
2909        let config = Discv4Config::builder().build();
2910        let (_discv4, mut service) = create_discv4_with_config(config).await;
2911
2912        let target = PeerId::random();
2913
2914        let id = PeerId::random();
2915        let key = kad_key(id);
2916        let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), id);
2917
2918        let mut entry = NodeEntry::new_proven(record);
2919        entry.find_node_failures = u8::MAX;
2920        let _ = service.kbuckets.insert_or_update(
2921            &key,
2922            entry,
2923            NodeStatus {
2924                direction: ConnectionDirection::Incoming,
2925                state: ConnectionState::Connected,
2926            },
2927        );
2928
2929        service.lookup(target);
2930        assert_eq!(service.pending_find_nodes.len(), 0);
2931        assert_eq!(service.pending_pings.len(), 1);
2932
2933        service.update_on_pong(record, None);
2934
2935        service
2936            .on_entry(record.id, |entry| {
2937                // reset on pong
2938                assert_eq!(entry.find_node_failures, 0);
2939                assert!(entry.has_endpoint_proof);
2940            })
2941            .unwrap();
2942    }
2943
2944    #[tokio::test]
2945    async fn test_service_commands() {
2946        reth_tracing::init_test_tracing();
2947
2948        let config = Discv4Config::builder().build();
2949        let (discv4, mut service) = create_discv4_with_config(config).await;
2950
2951        service.lookup_self();
2952
2953        let _handle = service.spawn();
2954        discv4.send_lookup_self();
2955        let _ = discv4.lookup_self().await;
2956    }
2957
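    // Pending FindNode requests, in-flight lookups, and pending pings must all be evicted on
    // poll once their configured timeouts (request_timeout, ping_expiration,
    // lookup_neighbours_expiration) have elapsed.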
2958    #[tokio::test]
2959    async fn test_requests_timeout() {
2960        reth_tracing::init_test_tracing();
2961        let fork_id = ForkId { hash: ForkHash(hex!("743f3d89")), next: 16191202 };
2962
2963        let config = Discv4Config::builder()
2964            .request_timeout(Duration::from_millis(200))
2965            .ping_expiration(Duration::from_millis(200))
2966            .lookup_neighbours_expiration(Duration::from_millis(200))
2967            .add_eip868_pair("eth", fork_id)
2968            .build();
2969        let (_disv4, mut service) = create_discv4_with_config(config).await;
2970
2971        let id = PeerId::random();
2972        let key = kad_key(id);
2973        let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), id);
2974
2975        let _ = service.kbuckets.insert_or_update(
2976            &key,
2977            NodeEntry::new_proven(record),
2978            NodeStatus {
2979                direction: ConnectionDirection::Incoming,
2980                state: ConnectionState::Connected,
2981            },
2982        );
2983
2984        service.lookup_self();
2985        assert_eq!(service.pending_find_nodes.len(), 1);
2986
2987        let ctx = service.pending_find_nodes.values().next().unwrap().lookup_context.clone();
2988
2989        service.pending_lookup.insert(record.id, (Instant::now(), ctx));
2990
2991        assert_eq!(service.pending_lookup.len(), 1);
2992
2993        let ping = Ping {
2994            from: service.local_node_record.into(),
2995            to: record.into(),
2996            expire: service.ping_expiration(),
2997            enr_sq: service.enr_seq(),
2998        };
2999        let echo_hash = service.send_packet(Message::Ping(ping), record.udp_addr());
3000        let ping_request = PingRequest {
3001            sent_at: Instant::now(),
3002            node: record,
3003            echo_hash,
3004            reason: PingReason::InitialInsert,
3005        };
3006        service.pending_pings.insert(record.id, ping_request);
3007
3008        assert_eq!(service.pending_pings.len(), 1);
3009
3010        tokio::time::sleep(Duration::from_secs(1)).await;
3011
3012        poll_fn(|cx| {
3013            let _ = service.poll(cx);
3014
3015            assert_eq!(service.pending_find_nodes.len(), 0);
3016            assert_eq!(service.pending_lookup.len(), 0);
3017            assert_eq!(service.pending_pings.len(), 0);
3018
3019            Poll::Ready(())
3020        })
3021        .await;
3022    }
3023
3024    // Sends a PING packet with a wrong 'to' field and expects a PONG response.
3025    #[tokio::test(flavor = "multi_thread")]
3026    async fn test_check_wrong_to() {
3027        reth_tracing::init_test_tracing();
3028
3029        let config = Discv4Config::builder().external_ip_resolver(None).build();
3030        let (_discv4, mut service_1) = create_discv4_with_config(config.clone()).await;
3031        let (_discv4, mut service_2) = create_discv4_with_config(config).await;
3032
3033        // ping node 2 with wrong to field
3034        let mut ping = Ping {
3035            from: service_1.local_node_record.into(),
3036            to: service_2.local_node_record.into(),
3037            expire: service_1.ping_expiration(),
3038            enr_sq: service_1.enr_seq(),
3039        };
3040        ping.to.address = "192.0.2.0".parse().unwrap();
3041
3042        let echo_hash = service_1.send_packet(Message::Ping(ping), service_2.local_addr());
3043        let ping_request = PingRequest {
3044            sent_at: Instant::now(),
3045            node: service_2.local_node_record,
3046            echo_hash,
3047            reason: PingReason::InitialInsert,
3048        };
3049        service_1.pending_pings.insert(*service_2.local_peer_id(), ping_request);
3050
3051        // wait for the processed ping
3052        let event = poll_fn(|cx| service_2.poll(cx)).await;
3053        assert_eq!(event, Discv4Event::Ping);
3054
3055        // we now wait for PONG
3056        let event = poll_fn(|cx| service_1.poll(cx)).await;
3057        assert_eq!(event, Discv4Event::Pong);
3058        // followed by a ping
3059        let event = poll_fn(|cx| service_1.poll(cx)).await;
3060        assert_eq!(event, Discv4Event::Ping);
3061    }
3062
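    // Informal sketch of the mutual endpoint-proof handshake exercised below (derived from the
    // assertions in the test):
    //
    //   service_1                                  service_2
    //      | ------------- PING ------------------>  |   add_node: 1 pings 2
    //      | <------------ PONG -------------------  |   1 marks 2 as connected
    //      | <------------ PING -------------------  |   2 pings back to prove 1's endpoint
    //      | ------------- PONG ------------------>  |   2 marks 1 as connected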
3063    #[tokio::test(flavor = "multi_thread")]
3064    async fn test_check_ping_pong() {
3065        reth_tracing::init_test_tracing();
3066
3067        let config = Discv4Config::builder().external_ip_resolver(None).build();
3068        let (_discv4, mut service_1) = create_discv4_with_config(config.clone()).await;
3069        let (_discv4, mut service_2) = create_discv4_with_config(config).await;
3070
3071        // send ping from 1 -> 2
3072        service_1.add_node(service_2.local_node_record);
3073
3074        // wait for the processed ping
3075        let event = poll_fn(|cx| service_2.poll(cx)).await;
3076        assert_eq!(event, Discv4Event::Ping);
3077
3078        // node is now in the table but not connected yet
3079        let key1 = kad_key(*service_1.local_peer_id());
3080        match service_2.kbuckets.entry(&key1) {
3081            kbucket::Entry::Present(_entry, status) => {
3082                assert!(!status.is_connected());
3083            }
3084            _ => unreachable!(),
3085        }
3086
3087        // we now wait for PONG
3088        let event = poll_fn(|cx| service_1.poll(cx)).await;
3089        assert_eq!(event, Discv4Event::Pong);
3090
3091        // endpoint is proven
3092        let key2 = kad_key(*service_2.local_peer_id());
3093        match service_1.kbuckets.entry(&key2) {
3094            kbucket::Entry::Present(_entry, status) => {
3095                assert!(status.is_connected());
3096            }
3097            _ => unreachable!(),
3098        }
3099
3100        // we now wait for the PING initiated by 2
3101        let event = poll_fn(|cx| service_1.poll(cx)).await;
3102        assert_eq!(event, Discv4Event::Ping);
3103
3104        // Drain events from service_2 until we see the Pong. Intermediate EnrRequest and
3105        // FindNode events are expected: ENR requests come from the ping handshake, and FindNode
3106        // arrives because service_1 resets its lookup interval on the first bootnode pong.
3107        tokio::time::timeout(Duration::from_secs(5), async {
3108            loop {
3109                let event = poll_fn(|cx| service_2.poll(cx)).await;
3110                match event {
3111                    Discv4Event::Pong => break,
3112                    Discv4Event::EnrRequest | Discv4Event::FindNode => {}
3113                    ev => unreachable!("{ev:?}"),
3114                }
3115            }
3116        })
3117        .await
3118        .expect("timed out waiting for Pong from service_2");
3119
3120        // endpoint is proven
3121        match service_2.kbuckets.entry(&key1) {
3122            kbucket::Entry::Present(_entry, status) => {
3123                assert!(status.is_connected());
3124            }
3125            ev => unreachable!("{ev:?}"),
3126        }
3127    }
3128
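    // Exercises the discv5 `KBucketsTable` API directly: a key that is `Absent` can be inserted
    // and subsequently shows up as `Present`.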
3129    #[test]
3130    fn test_insert() {
3131        let local_node_record = rng_record(&mut rand_08::thread_rng());
3132        let mut kbuckets: KBucketsTable<NodeKey, NodeEntry> = KBucketsTable::new(
3133            NodeKey::from(&local_node_record).into(),
3134            Duration::from_secs(60),
3135            MAX_NODES_PER_BUCKET,
3136            None,
3137            None,
3138        );
3139
3140        let new_record = rng_record(&mut rand_08::thread_rng());
3141        let key = kad_key(new_record.id);
3142        match kbuckets.entry(&key) {
3143            kbucket::Entry::Absent(entry) => {
3144                let node = NodeEntry::new(new_record);
3145                let _ = entry.insert(
3146                    node,
3147                    NodeStatus {
3148                        direction: ConnectionDirection::Outgoing,
3149                        state: ConnectionState::Disconnected,
3150                    },
3151                );
3152            }
3153            _ => {
3154                unreachable!()
3155            }
3156        };
3157        match kbuckets.entry(&key) {
3158            kbucket::Entry::Present(_, _) => {}
3159            _ => {
3160                unreachable!()
3161            }
3162        }
3163    }
3164
3165    #[tokio::test]
3166    async fn test_bootnode_not_in_update_stream() {
3167        reth_tracing::init_test_tracing();
3168        let (_, service_1) = create_discv4().await;
3169        let peerid_1 = *service_1.local_peer_id();
3170
3171        let config = Discv4Config::builder().add_boot_node(service_1.local_node_record).build();
3172        service_1.spawn();
3173
3174        let (_, mut service_2) = create_discv4_with_config(config).await;
3175
3176        let mut updates = service_2.update_stream();
3177
3178        service_2.spawn();
3179
3180        // Poll for events for a reasonable time
3181        let mut bootnode_appeared = false;
3182        let timeout = tokio::time::sleep(Duration::from_secs(1));
3183        tokio::pin!(timeout);
3184
3185        loop {
3186            tokio::select! {
3187                Some(update) = updates.next() => {
3188                    if let DiscoveryUpdate::Added(record) = update
3189                        && record.id == peerid_1 {
3190                            bootnode_appeared = true;
3191                            break;
3192                        }
3193                }
3194                _ = &mut timeout => break,
3195            }
3196        }
3197
3198        // Assert bootnode did not appear in update stream
3199        assert!(!bootnode_appeared, "Bootnode should not appear in update stream");
3200    }
3201
3202    fn insert_proven_node(service: &mut Discv4Service, record: NodeRecord) {
3203        let key = kad_key(record.id);
3204        let _ = service.kbuckets.insert_or_update(
3205            &key,
3206            NodeEntry::new_proven(record),
3207            NodeStatus {
3208                direction: ConnectionDirection::Incoming,
3209                state: ConnectionState::Connected,
3210            },
3211        );
3212    }
3213
3214    fn insert_initial_ping(service: &mut Discv4Service, record: NodeRecord) -> B256 {
3215        let echo_hash = B256::random();
3216        service.pending_pings.insert(
3217            record.id,
3218            PingRequest {
3219                sent_at: Instant::now(),
3220                node: record,
3221                echo_hash,
3222                reason: PingReason::InitialInsert,
3223            },
3224        );
3225        echo_hash
3226    }
3227
3228    fn make_pong(service: &Discv4Service, echo_hash: B256) -> Pong {
3229        Pong {
3230            to: rng_endpoint(&mut rand_08::thread_rng()),
3231            echo: echo_hash,
3232            expire: service.ping_expiration(),
3233            enr_sq: None,
3234        }
3235    }
3236
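    // The `pending_lookup_reset` flag, as exercised by the tests below: it starts out set when
    // boot nodes are configured and lookups are enabled, is consumed exactly once when the first
    // PONG from a boot node arrives (resetting the self-lookup interval), and is never consumed
    // by pongs from non-boot nodes.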
3237    #[tokio::test]
3238    async fn test_lookup_reset_on_first_bootnode_pong() {
3239        let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), PeerId::random());
3240        let config = Discv4Config::builder().add_boot_node(record).build();
3241        let (_discv4, mut service) = create_discv4_with_config(config).await;
3242
3243        // 1. initial state
3244        assert!(service.pending_lookup_reset);
3245
3246        // 2. setup: proven bootnode + pending InitialInsert ping
3247        insert_proven_node(&mut service, record);
3248        let echo_hash = insert_initial_ping(&mut service, record);
3249
3250        // 3. input: pong arrives
3251        service.on_pong(make_pong(&service, echo_hash), record.udp_addr(), record.id);
3252
3253        // 4. flag should be consumed — interval was reset
3254        assert!(!service.pending_lookup_reset, "flag should be consumed");
3255    }
3256
3257    #[tokio::test]
3258    async fn test_lookup_reset_fires_only_once() {
3259        let records: Vec<_> = (0..2)
3260            .map(|_| NodeRecord::new("0.0.0.0:0".parse().unwrap(), PeerId::random()))
3261            .collect();
3262        let config = Discv4Config::builder().add_boot_nodes(records.clone()).build();
3263        let (_discv4, mut service) = create_discv4_with_config(config).await;
3264
3265        // 1. setup: two proven bootnodes with pending InitialInsert pings
3266        for &r in &records {
3267            insert_proven_node(&mut service, r);
3268        }
3269        let hashes: Vec<_> =
3270            records.iter().map(|r| insert_initial_ping(&mut service, *r)).collect();
3271
3272        // 2. first pong -> consumes the flag (resets the interval)
3273        service.on_pong(make_pong(&service, hashes[0]), records[0].udp_addr(), records[0].id);
3274        assert!(!service.pending_lookup_reset);
3275
3276        // 3. second pong -> flag already consumed, no second reset
3277        service.on_pong(make_pong(&service, hashes[1]), records[1].udp_addr(), records[1].id);
3278        assert!(!service.pending_lookup_reset);
3279    }
3280
3281    #[tokio::test]
3282    async fn test_lookup_reset_not_triggered_by_non_bootnode() {
3283        let bootnode = NodeRecord::new("0.0.0.0:0".parse().unwrap(), PeerId::random());
3284        let config = Discv4Config::builder().add_boot_node(bootnode).build();
3285        let (_discv4, mut service) = create_discv4_with_config(config).await;
3286
3287        assert!(service.pending_lookup_reset);
3288
3289        // a non-bootnode pong should not consume the flag
3290        let stranger = NodeRecord::new("0.0.0.0:0".parse().unwrap(), PeerId::random());
3291        insert_proven_node(&mut service, stranger);
3292        let echo_hash = insert_initial_ping(&mut service, stranger);
3293        service.on_pong(make_pong(&service, echo_hash), stranger.udp_addr(), stranger.id);
3294
3295        assert!(service.pending_lookup_reset, "flag should not be consumed by non-bootnode");
3296    }
3297
3298    #[tokio::test]
3299    async fn test_lookup_reset_disabled_when_lookup_disabled() {
3300        let config = Discv4Config::builder().enable_lookup(false).build();
3301        let (_discv4, service) = create_discv4_with_config(config).await;
3302
3303        // flag should be false when lookups are disabled
3304        assert!(!service.pending_lookup_reset);
3305    }
3306}