reth_discv4/
lib.rs

1//! Discovery v4 implementation: <https://github.com/ethereum/devp2p/blob/master/discv4.md>
2//!
3//! Discv4 employs a kademlia-like routing table to store and manage discovered peers and topics.
4//! The protocol allows for external IP discovery in NAT environments through regular PING/PONG
5//! exchanges with discovered nodes. Each peer reports the external IP address it observed for us,
6//! and a simple majority is chosen as our external IP address. If an external IP address is updated, this is
7//! produced as an event to notify the swarm (if one is used for this behaviour).
8//!
9//! This implementation consists of a [`Discv4`] and [`Discv4Service`] pair. The service manages the
10//! state and drives the UDP socket. The (optional) [`Discv4`] serves as the frontend to interact
11//! with the service via a channel. Whenever the underlying table changes, the service produces a
12//! [`DiscoveryUpdate`] that listeners will receive.
13//!
14//! ## Feature Flags
15//!
16//! - `serde` (default): Enable serde support
17//! - `test-utils`: Export utilities for testing
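//!
//! ## Example
//!
//! A minimal usage sketch, mirroring the [`Discv4::bind`] doctest: it assumes a tokio runtime and
//! lets the caller pick the keypair and listen address.
//!
//! ```no_run
//! use reth_discv4::{Discv4, Discv4Config, NodeRecord};
//! use reth_network_peers::pk2id;
//! use secp256k1::SECP256K1;
//! use std::{net::SocketAddr, str::FromStr};
//! use tokio_stream::StreamExt;
//!
//! # async fn example() -> std::io::Result<()> {
//! // generate a (random) keypair and derive the node id from it
//! let (secret_key, pk) = SECP256K1.generate_keypair(&mut rand_08::thread_rng());
//! let id = pk2id(&pk);
//!
//! let socket = SocketAddr::from_str("0.0.0.0:0").unwrap();
//! let local_enr =
//!     NodeRecord { address: socket.ip(), tcp_port: socket.port(), udp_port: socket.port(), id };
//!
//! // bind the UDP socket, spawn the service and keep the cloneable frontend
//! let discv4 = Discv4::spawn(socket, local_enr, secret_key, Discv4Config::default()).await?;
//!
//! // subscribe to table updates emitted by the service
//! let mut updates = discv4.update_stream().await.expect("service is running");
//! while let Some(_update) = updates.next().await {
//!     // handle `DiscoveryUpdate::Added`, `DiscoveryUpdate::Removed`, ...
//! }
//! # Ok(())
//! # }
//! ```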
18
19#![doc(
20    html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
21    html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
22    issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/"
23)]
24#![cfg_attr(not(test), warn(unused_crate_dependencies))]
25#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
26
27use crate::{
28    error::{DecodePacketError, Discv4Error},
29    proto::{FindNode, Message, Neighbours, Packet, Ping, Pong},
30};
31use alloy_primitives::{bytes::Bytes, hex, B256};
32use discv5::{
33    kbucket,
34    kbucket::{
35        BucketInsertResult, Distance, Entry as BucketEntry, InsertResult, KBucketsTable,
36        NodeStatus, MAX_NODES_PER_BUCKET,
37    },
38    ConnectionDirection, ConnectionState,
39};
40use enr::Enr;
41use itertools::Itertools;
42use parking_lot::Mutex;
43use proto::{EnrRequest, EnrResponse};
44use reth_ethereum_forks::ForkId;
45use reth_network_peers::{pk2id, PeerId};
46use secp256k1::SecretKey;
47use std::{
48    cell::RefCell,
49    collections::{btree_map, hash_map::Entry, BTreeMap, HashMap, VecDeque},
50    fmt,
51    future::poll_fn,
52    io,
53    net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4},
54    pin::Pin,
55    rc::Rc,
56    sync::Arc,
57    task::{ready, Context, Poll},
58    time::{Duration, Instant, SystemTime, UNIX_EPOCH},
59};
60use tokio::{
61    net::UdpSocket,
62    sync::{mpsc, mpsc::error::TrySendError, oneshot, oneshot::Sender as OneshotSender},
63    task::{JoinHandle, JoinSet},
64    time::Interval,
65};
66use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt};
67use tracing::{debug, trace};
68
69pub mod error;
70pub mod proto;
71
72mod config;
73pub use config::{Discv4Config, Discv4ConfigBuilder};
74
75mod node;
76use node::{kad_key, NodeKey};
77
78mod table;
79
80// reexport NodeRecord primitive
81pub use reth_network_peers::NodeRecord;
82
83#[cfg(any(test, feature = "test-utils"))]
84pub mod test_utils;
85
86use crate::table::PongTable;
87use reth_net_nat::ResolveNatInterval;
88/// reexport to get public ip.
89pub use reth_net_nat::{external_ip, NatResolver};
90
91/// The default address for discv4 via UDP
92///
93/// Note: the default TCP address is the same.
94pub const DEFAULT_DISCOVERY_ADDR: IpAddr = IpAddr::V4(Ipv4Addr::UNSPECIFIED);
95
96/// The default port for discv4 via UDP
97///
98/// Note: the default TCP port is the same.
99pub const DEFAULT_DISCOVERY_PORT: u16 = 30303;
100
101/// The default address for discv4 via UDP: "0.0.0.0:30303"
102///
103/// Note: The default TCP address is the same.
104pub const DEFAULT_DISCOVERY_ADDRESS: SocketAddr =
105    SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, DEFAULT_DISCOVERY_PORT));
106
107/// The maximum size of any packet is 1280 bytes.
108const MAX_PACKET_SIZE: usize = 1280;
109
110/// Length of the UDP datagram packet-header: Hash(32b) + Signature(65b) + Packet Type(1b)
111const MIN_PACKET_SIZE: usize = 32 + 65 + 1;
112
113/// Concurrency factor for `FindNode` requests to pick `ALPHA` closest nodes, <https://github.com/ethereum/devp2p/blob/master/discv4.md#recursive-lookup>
114const ALPHA: usize = 3;
115
116/// Maximum number of nodes to ping concurrently.
117///
118/// This corresponds to 2 full `Neighbours` responses with 16 _new_ nodes. This will apply some
119/// backpressure in recursive lookups.
120const MAX_NODES_PING: usize = 2 * MAX_NODES_PER_BUCKET;
121
122/// Maximum number of pings to keep queued.
123///
124/// If we are currently sending too many pings, any new pings will be queued. To prevent unbounded
125/// growth of the queue, the queue has a maximum capacity, after which any additional pings will be
126/// discarded.
127///
128/// This corresponds to 2 full `Neighbours` responses with 16 new nodes.
129const MAX_QUEUED_PINGS: usize = 2 * MAX_NODES_PER_BUCKET;
130
131/// The size of the datagram is limited to [`MAX_PACKET_SIZE`], so the 16 nodes that discv4
132/// specifies don't fit in one datagram. The safe number of nodes that always fit in a datagram is
133/// 12, with the worst case being all of them IPv6 nodes. This is calculated by `(MAX_PACKET_SIZE -
134/// (header + expire + rlp overhead)) / size(rlp(Node_IPv6))` = `(1280 - 109) / 91` = 12.
135/// Even in the best case where all nodes are IPv4, only 14 nodes fit into one packet.
136const SAFE_MAX_DATAGRAM_NEIGHBOUR_RECORDS: usize = (MAX_PACKET_SIZE - 109) / 91;
137
138/// The timeout used to identify expired nodes, 24h
139///
140/// Mirrors geth's `bondExpiration` of 24h
141const ENDPOINT_PROOF_EXPIRATION: Duration = Duration::from_secs(24 * 60 * 60);
142
143/// Duration used to expire nodes from the routing table, 1h
144const EXPIRE_DURATION: Duration = Duration::from_secs(60 * 60);
145
146// Restricts how many udp messages can be processed in a single [Discv4Service::poll] call.
147//
148// This will act as a manual yield point when draining the socket messages where the most CPU
149// expensive part is handling outgoing messages: encoding and hashing the packet
150const UDP_MESSAGE_POLL_LOOP_BUDGET: i32 = 4;
151
152type EgressSender = mpsc::Sender<(Bytes, SocketAddr)>;
153type EgressReceiver = mpsc::Receiver<(Bytes, SocketAddr)>;
154
155pub(crate) type IngressSender = mpsc::Sender<IngressEvent>;
156pub(crate) type IngressReceiver = mpsc::Receiver<IngressEvent>;
157
158type NodeRecordSender = OneshotSender<Vec<NodeRecord>>;
159
160/// The Discv4 frontend.
161///
162/// This is a cloneable type that communicates with the [`Discv4Service`] by sending commands over a
163/// shared channel.
164///
165/// See also [`Discv4::spawn`]
166#[derive(Debug, Clone)]
167pub struct Discv4 {
168    /// The address of the udp socket
169    local_addr: SocketAddr,
170    /// channel to send commands over to the service
171    to_service: mpsc::UnboundedSender<Discv4Command>,
172    /// Tracks the local node record.
173    ///
174    /// This includes the currently tracked external IP address of the node.
175    node_record: Arc<Mutex<NodeRecord>>,
176}
177
178impl Discv4 {
179    /// Same as [`Self::bind`] but also spawns the service onto a new task.
180    ///
181    /// See also: [`Discv4Service::spawn()`]
182    pub async fn spawn(
183        local_address: SocketAddr,
184        local_enr: NodeRecord,
185        secret_key: SecretKey,
186        config: Discv4Config,
187    ) -> io::Result<Self> {
188        let (discv4, service) = Self::bind(local_address, local_enr, secret_key, config).await?;
189
190        service.spawn();
191
192        Ok(discv4)
193    }
194
195    /// Returns a new no-op instance whose command channel is not connected to a service.
196    ///
197    /// NOTE: this is only intended for test setups.
198    #[cfg(feature = "test-utils")]
199    pub fn noop() -> Self {
200        let (to_service, _rx) = mpsc::unbounded_channel();
201        let local_addr =
202            (IpAddr::from(std::net::Ipv4Addr::UNSPECIFIED), DEFAULT_DISCOVERY_PORT).into();
203        Self {
204            local_addr,
205            to_service,
206            node_record: Arc::new(Mutex::new(NodeRecord::new(
207                "127.0.0.1:3030".parse().unwrap(),
208                PeerId::random(),
209            ))),
210        }
211    }
212
213    /// Binds a new `UdpSocket` and creates the service
214    ///
215    /// ```
216    /// # use std::io;
217    /// use reth_discv4::{Discv4, Discv4Config};
218    /// use reth_network_peers::{pk2id, NodeRecord, PeerId};
219    /// use secp256k1::SECP256K1;
220    /// use std::{net::SocketAddr, str::FromStr};
221    /// # async fn t() -> io::Result<()> {
222    /// // generate a (random) keypair
223    /// let (secret_key, pk) = SECP256K1.generate_keypair(&mut rand_08::thread_rng());
224    /// let id = pk2id(&pk);
225    ///
226    /// let socket = SocketAddr::from_str("0.0.0.0:0").unwrap();
227    /// let local_enr =
228    ///     NodeRecord { address: socket.ip(), tcp_port: socket.port(), udp_port: socket.port(), id };
229    /// let config = Discv4Config::default();
230    ///
231    /// let (discv4, mut service) = Discv4::bind(socket, local_enr, secret_key, config).await.unwrap();
232    ///
233    /// // get an update stream
234    /// let updates = service.update_stream();
235    ///
236    /// let _handle = service.spawn();
237    ///
238    /// // lookup the local node in the DHT
239    /// let _discovered = discv4.lookup_self().await.unwrap();
240    ///
241    /// # Ok(())
242    /// # }
243    /// ```
244    pub async fn bind(
245        local_address: SocketAddr,
246        mut local_node_record: NodeRecord,
247        secret_key: SecretKey,
248        config: Discv4Config,
249    ) -> io::Result<(Self, Discv4Service)> {
250        let socket = UdpSocket::bind(local_address).await?;
251        let local_addr = socket.local_addr()?;
252        local_node_record.udp_port = local_addr.port();
253        trace!(target: "discv4", ?local_addr,"opened UDP socket");
254
255        let mut service =
256            Discv4Service::new(socket, local_addr, local_node_record, secret_key, config);
257
258        // resolve the external address immediately
259        service.resolve_external_ip();
260
261        let discv4 = service.handle();
262        Ok((discv4, service))
263    }
264
265    /// Returns the address of the UDP socket.
266    pub const fn local_addr(&self) -> SocketAddr {
267        self.local_addr
268    }
269
270    /// Returns the [`NodeRecord`] of the local node.
271    ///
272    /// This includes the currently tracked external IP address of the node.
273    pub fn node_record(&self) -> NodeRecord {
274        *self.node_record.lock()
275    }
276
277    /// Returns the currently tracked external IP of the node.
278    pub fn external_ip(&self) -> IpAddr {
279        self.node_record.lock().address
280    }
281
282    /// Sets the [Interval] used for periodically looking up targets over the network
283    pub fn set_lookup_interval(&self, duration: Duration) {
284        self.send_to_service(Discv4Command::SetLookupInterval(duration))
285    }
286
287    /// Starts a `FindNode` recursive lookup that locates the closest nodes to the given node id. See also: <https://github.com/ethereum/devp2p/blob/master/discv4.md#recursive-lookup>
288    ///
289    /// The lookup initiator starts by picking α closest nodes to the target it knows of. The
290    /// initiator then sends concurrent `FindNode` packets to those nodes. α is a system-wide
291    /// concurrency parameter, such as 3. In the recursive step, the initiator resends `FindNode` to
292    /// nodes it has learned about from previous queries. Of the k nodes the initiator has heard of
293    /// closest to the target, it picks α that it has not yet queried and resends `FindNode` to
294    /// them. Nodes that fail to respond quickly are removed from consideration until and unless
295    /// they do respond.
296    //
297    // If a round of FindNode queries fails to return a node any closer than the closest already
298    // seen, the initiator resends the find node to all of the k closest nodes it has not already
299    // queried. The lookup terminates when the initiator has queried and gotten responses from the k
300    // closest nodes it has seen.
301    pub async fn lookup_self(&self) -> Result<Vec<NodeRecord>, Discv4Error> {
302        self.lookup_node(None).await
303    }
304
305    /// Looks up the given node id.
306    ///
307    /// Returning the closest nodes to the given node id.
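    ///
    /// A usage sketch (`usage` is just a doctest wrapper and the random target is illustrative):
    ///
    /// ```no_run
    /// # async fn usage(discv4: &reth_discv4::Discv4) {
    /// use reth_network_peers::PeerId;
    /// // ask the service to run a recursive lookup for a target id
    /// let target = PeerId::random();
    /// let closest = discv4.lookup(target).await.expect("service is running");
    /// # let _ = closest;
    /// # }
    /// ```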
308    pub async fn lookup(&self, node_id: PeerId) -> Result<Vec<NodeRecord>, Discv4Error> {
309        self.lookup_node(Some(node_id)).await
310    }
311
312    /// Performs a random lookup for node records.
313    pub async fn lookup_random(&self) -> Result<Vec<NodeRecord>, Discv4Error> {
314        let target = PeerId::random();
315        self.lookup_node(Some(target)).await
316    }
317
318    /// Sends a message to the service to lookup the closest nodes
319    pub fn send_lookup(&self, node_id: PeerId) {
320        let cmd = Discv4Command::Lookup { node_id: Some(node_id), tx: None };
321        self.send_to_service(cmd);
322    }
323
324    async fn lookup_node(&self, node_id: Option<PeerId>) -> Result<Vec<NodeRecord>, Discv4Error> {
325        let (tx, rx) = oneshot::channel();
326        let cmd = Discv4Command::Lookup { node_id, tx: Some(tx) };
327        self.to_service.send(cmd)?;
328        Ok(rx.await?)
329    }
330
331    /// Triggers a new self lookup without expecting a response
332    pub fn send_lookup_self(&self) {
333        let cmd = Discv4Command::Lookup { node_id: None, tx: None };
334        self.send_to_service(cmd);
335    }
336
337    /// Removes the peer from the table, if it exists.
338    pub fn remove_peer(&self, node_id: PeerId) {
339        let cmd = Discv4Command::Remove(node_id);
340        self.send_to_service(cmd);
341    }
342
343    /// Adds the node to the table, if it is not already present.
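    ///
    /// A usage sketch (the address, ports and random id below are placeholders):
    ///
    /// ```no_run
    /// # fn usage(discv4: &reth_discv4::Discv4) {
    /// use reth_discv4::NodeRecord;
    /// use reth_network_peers::PeerId;
    /// // hand a known peer to the service; the service then starts the endpoint proof by pinging it
    /// let record = NodeRecord {
    ///     address: "10.0.0.1".parse().unwrap(),
    ///     udp_port: 30303,
    ///     tcp_port: 30303,
    ///     id: PeerId::random(),
    /// };
    /// discv4.add_node(record);
    /// # }
    /// ```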
344    pub fn add_node(&self, node_record: NodeRecord) {
345        let cmd = Discv4Command::Add(node_record);
346        self.send_to_service(cmd);
347    }
348
349    /// Adds the peer and id to the ban list.
350    ///
351    /// This will prevent any future inclusion in the table
352    pub fn ban(&self, node_id: PeerId, ip: IpAddr) {
353        let cmd = Discv4Command::Ban(node_id, ip);
354        self.send_to_service(cmd);
355    }
356
357    /// Adds the ip to the ban list.
358    ///
359    /// This will prevent any future inclusion in the table
360    pub fn ban_ip(&self, ip: IpAddr) {
361        let cmd = Discv4Command::BanIp(ip);
362        self.send_to_service(cmd);
363    }
364
365    /// Adds the peer to the ban list.
366    ///
367    /// This will prevent any future inclusion in the table
368    pub fn ban_node(&self, node_id: PeerId) {
369        let cmd = Discv4Command::BanPeer(node_id);
370        self.send_to_service(cmd);
371    }
372
373    /// Sets the tcp port
374    ///
375    /// This will update our [`NodeRecord`]'s tcp port.
376    pub fn set_tcp_port(&self, port: u16) {
377        let cmd = Discv4Command::SetTcpPort(port);
378        self.send_to_service(cmd);
379    }
380
381    /// Sets the pair in the EIP-868 [`Enr`] of the node.
382    ///
383    /// If the key already exists, this will update it.
384    ///
385    /// CAUTION: The value **must** be rlp encoded
386    pub fn set_eip868_rlp_pair(&self, key: Vec<u8>, rlp: Bytes) {
387        let cmd = Discv4Command::SetEIP868RLPPair { key, rlp };
388        self.send_to_service(cmd);
389    }
390
391    /// Sets the pair in the EIP-868 [`Enr`] of the node.
392    ///
393    /// If the key already exists, this will update it.
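    ///
    /// A usage sketch (the key `"example"` and value are illustrative, not a defined ENR key):
    ///
    /// ```no_run
    /// # fn usage(discv4: &reth_discv4::Discv4) {
    /// // the value is RLP encoded by this helper before it is inserted into the node's ENR
    /// discv4.set_eip868_rlp(b"example".to_vec(), 42u64);
    /// # }
    /// ```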
394    pub fn set_eip868_rlp(&self, key: Vec<u8>, value: impl alloy_rlp::Encodable) {
395        self.set_eip868_rlp_pair(key, Bytes::from(alloy_rlp::encode(&value)))
396    }
397
398    #[inline]
399    fn send_to_service(&self, cmd: Discv4Command) {
400        let _ = self.to_service.send(cmd).map_err(|err| {
401            debug!(
402                target: "discv4",
403                %err,
404                "service channel closed, dropping command",
405            )
406        });
407    }
408
409    /// Returns the receiver half of a new listener channel that streams [`DiscoveryUpdate`]s.
410    pub async fn update_stream(&self) -> Result<ReceiverStream<DiscoveryUpdate>, Discv4Error> {
411        let (tx, rx) = oneshot::channel();
412        let cmd = Discv4Command::Updates(tx);
413        self.to_service.send(cmd)?;
414        Ok(rx.await?)
415    }
416
417    /// Terminates the spawned [`Discv4Service`].
418    pub fn terminate(&self) {
419        self.send_to_service(Discv4Command::Terminated);
420    }
421}
422
423/// Manages discv4 peer discovery over UDP.
424///
425/// This is a [Stream] that handles incoming and outgoing discv4 messages and emits updates via
426/// [`Discv4Service::update_stream`].
427///
428/// This type maintains the discv4 Kademlia routing table and is responsible for performing lookups.
429///
430/// ## Lookups
431///
432/// See also [Recursive Lookups](https://github.com/ethereum/devp2p/blob/master/discv4.md#recursive-lookup).
433/// Lookups are either triggered periodically or performed on demand: [`Discv4::lookup`].
434/// Newly discovered nodes are emitted as a [`DiscoveryUpdate::Added`] event to all subscribers:
435/// [`Discv4Service::update_stream`].
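///
/// ## Example
///
/// A minimal sketch of driving the service manually instead of using [`Discv4Service::spawn`],
/// assuming the service was created via [`Discv4::bind`]:
///
/// ```no_run
/// # async fn drive(mut service: reth_discv4::Discv4Service) {
/// use tokio_stream::StreamExt;
/// // subscribe before polling so no updates are missed
/// let mut updates = service.update_stream();
/// tokio::task::spawn(async move {
///     // polling the stream drives the UDP socket and the routing table
///     while let Some(_event) = service.next().await {}
/// });
/// while let Some(_update) = updates.next().await {
///     // handle `DiscoveryUpdate::Added`, `DiscoveryUpdate::Removed`, ...
/// }
/// # }
/// ```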
436#[must_use = "Stream does nothing unless polled"]
437pub struct Discv4Service {
438    /// Local address of the UDP socket.
439    local_address: SocketAddr,
440    /// The local ENR for EIP-868 <https://eips.ethereum.org/EIPS/eip-868>
441    local_eip_868_enr: Enr<SecretKey>,
442    /// Local ENR of the server.
443    local_node_record: NodeRecord,
444    /// Keeps track of the node record of the local node.
445    shared_node_record: Arc<Mutex<NodeRecord>>,
446    /// The secret key used to sign payloads
447    secret_key: SecretKey,
448    /// The UDP socket for sending and receiving messages.
449    _socket: Arc<UdpSocket>,
450    /// The spawned UDP tasks.
451    ///
452    /// Note: If dropped, the spawned send+receive tasks are aborted.
453    _tasks: JoinSet<()>,
454    /// The routing table.
455    kbuckets: KBucketsTable<NodeKey, NodeEntry>,
456    /// Receiver for incoming messages
457    ///
458    /// Receives incoming messages from the UDP task.
459    ingress: IngressReceiver,
460    /// Sender for sending outgoing messages
461    ///
462    /// Sends outgoing messages to the UDP task.
463    egress: EgressSender,
464    /// Buffered pending pings to apply backpressure.
465    ///
466    /// Lookups behave like bursts of requests: an endpoint proof followed by a `FindNode` request. [Recursive lookups](https://github.com/ethereum/devp2p/blob/master/discv4.md#recursive-lookup) can trigger multiple follow-up Ping+FindNode requests.
467    /// A cap on concurrent `Ping`s prevents an escalation where a large number of new nodes
468    /// discovered via `FindNode` in a recursive lookup triggers a large number of `Ping`s and
469    /// follow-up `FindNode` requests. Buffering them effectively prevents high `Ping` peaks.
470    queued_pings: VecDeque<(NodeRecord, PingReason)>,
471    /// Currently active pings to specific nodes.
472    pending_pings: HashMap<PeerId, PingRequest>,
473    /// Currently active endpoint proof verification lookups to specific nodes.
474    ///
475    /// Entries here mean we've proven the peer's endpoint but haven't completed our end of the
476    /// endpoint proof
477    pending_lookup: HashMap<PeerId, (Instant, LookupContext)>,
478    /// Currently active `FindNode` requests
479    pending_find_nodes: HashMap<PeerId, FindNodeRequest>,
480    /// Currently active ENR requests
481    pending_enr_requests: HashMap<PeerId, EnrRequestState>,
482    /// Copy of the sender half of the commands channel for [Discv4]
483    to_service: mpsc::UnboundedSender<Discv4Command>,
484    /// Receiver half of the commands channel for [Discv4]
485    commands_rx: mpsc::UnboundedReceiver<Discv4Command>,
486    /// All subscribers for table updates
487    update_listeners: Vec<mpsc::Sender<DiscoveryUpdate>>,
488    /// The interval when to trigger random lookups
489    lookup_interval: Interval,
490    /// Used to rotate targets to lookup
491    lookup_rotator: LookupTargetRotator,
492    /// Interval when to recheck active requests
493    evict_expired_requests_interval: Interval,
494    /// Interval when to resend pings.
495    ping_interval: Interval,
496    /// The interval at which to attempt resolving external IP again.
497    resolve_external_ip_interval: Option<ResolveNatInterval>,
498    /// How this service is configured
499    config: Discv4Config,
500    /// Buffered events populated during poll.
501    queued_events: VecDeque<Discv4Event>,
502    /// Keeps track of nodes from which we have received a `Pong` message.
503    received_pongs: PongTable,
504    /// Interval used to expire additionally tracked nodes
505    expire_interval: Interval,
506}
507
508impl Discv4Service {
509    /// Create a new instance for a bound [`UdpSocket`].
510    pub(crate) fn new(
511        socket: UdpSocket,
512        local_address: SocketAddr,
513        local_node_record: NodeRecord,
514        secret_key: SecretKey,
515        config: Discv4Config,
516    ) -> Self {
517        let socket = Arc::new(socket);
518        let (ingress_tx, ingress_rx) = mpsc::channel(config.udp_ingress_message_buffer);
519        let (egress_tx, egress_rx) = mpsc::channel(config.udp_egress_message_buffer);
520        let mut tasks = JoinSet::<()>::new();
521
522        let udp = Arc::clone(&socket);
523        tasks.spawn(receive_loop(udp, ingress_tx, local_node_record.id));
524
525        let udp = Arc::clone(&socket);
526        tasks.spawn(send_loop(udp, egress_rx));
527
528        let kbuckets = KBucketsTable::new(
529            NodeKey::from(&local_node_record).into(),
530            Duration::from_secs(60),
531            MAX_NODES_PER_BUCKET,
532            None,
533            None,
534        );
535
536        let self_lookup_interval = tokio::time::interval(config.lookup_interval);
537
538        // Delay the first tick by `ping_interval` and then ping every `ping_interval`, so that
539        // re-pinging does not start immediately on startup
540        let ping_interval = tokio::time::interval_at(
541            tokio::time::Instant::now() + config.ping_interval,
542            config.ping_interval,
543        );
544
545        let evict_expired_requests_interval = tokio::time::interval_at(
546            tokio::time::Instant::now() + config.request_timeout,
547            config.request_timeout,
548        );
549
550        let lookup_rotator = if config.enable_dht_random_walk {
551            LookupTargetRotator::default()
552        } else {
553            LookupTargetRotator::local_only()
554        };
555
556        // for EIP-868 construct an ENR
557        let local_eip_868_enr = {
558            let mut builder = Enr::builder();
559            builder.ip(local_node_record.address);
560            if local_node_record.address.is_ipv4() {
561                builder.udp4(local_node_record.udp_port);
562                builder.tcp4(local_node_record.tcp_port);
563            } else {
564                builder.udp6(local_node_record.udp_port);
565                builder.tcp6(local_node_record.tcp_port);
566            }
567
568            for (key, val) in &config.additional_eip868_rlp_pairs {
569                builder.add_value_rlp(key, val.clone());
570            }
571            builder.build(&secret_key).expect("v4 is set")
572        };
573
574        let (to_service, commands_rx) = mpsc::unbounded_channel();
575
576        let shared_node_record = Arc::new(Mutex::new(local_node_record));
577
578        Self {
579            local_address,
580            local_eip_868_enr,
581            local_node_record,
582            shared_node_record,
583            _socket: socket,
584            kbuckets,
585            secret_key,
586            _tasks: tasks,
587            ingress: ingress_rx,
588            egress: egress_tx,
589            queued_pings: VecDeque::with_capacity(MAX_QUEUED_PINGS),
590            pending_pings: Default::default(),
591            pending_lookup: Default::default(),
592            pending_find_nodes: Default::default(),
593            pending_enr_requests: Default::default(),
594            commands_rx,
595            to_service,
596            update_listeners: Vec::with_capacity(1),
597            lookup_interval: self_lookup_interval,
598            ping_interval,
599            evict_expired_requests_interval,
600            lookup_rotator,
601            resolve_external_ip_interval: config.resolve_external_ip_interval(),
602            config,
603            queued_events: Default::default(),
604            received_pongs: Default::default(),
605            expire_interval: tokio::time::interval(EXPIRE_DURATION),
606        }
607    }
608
609    /// Returns the frontend handle that can communicate with the service via commands.
610    pub fn handle(&self) -> Discv4 {
611        Discv4 {
612            local_addr: self.local_address,
613            to_service: self.to_service.clone(),
614            node_record: self.shared_node_record.clone(),
615        }
616    }
617
618    /// Returns the current enr sequence of the local record.
619    fn enr_seq(&self) -> Option<u64> {
620        self.config.enable_eip868.then(|| self.local_eip_868_enr.seq())
621    }
622
623    /// Sets the [Interval] used for periodically looking up targets over the network
624    pub fn set_lookup_interval(&mut self, duration: Duration) {
625        self.lookup_interval = tokio::time::interval(duration);
626    }
627
628    /// Sets the external IP to the configured external IP if the resolver is [`NatResolver::ExternalIp`].
629    fn resolve_external_ip(&mut self) {
630        if let Some(r) = &self.resolve_external_ip_interval {
631            if let Some(external_ip) = r.resolver().as_external_ip() {
632                self.set_external_ip_addr(external_ip);
633            }
634        }
635    }
636
637    /// Sets the given ip address as the node's external IP in the node record announced in
638    /// discovery
639    pub fn set_external_ip_addr(&mut self, external_ip: IpAddr) {
640        if self.local_node_record.address != external_ip {
641            debug!(target: "discv4", ?external_ip, "Updating external ip");
642            self.local_node_record.address = external_ip;
643            let _ = self.local_eip_868_enr.set_ip(external_ip, &self.secret_key);
644            let mut lock = self.shared_node_record.lock();
645            *lock = self.local_node_record;
646            debug!(target: "discv4", enr=?self.local_eip_868_enr, "Updated local ENR");
647        }
648    }
649
650    /// Returns the [`PeerId`] that identifies this node
651    pub const fn local_peer_id(&self) -> &PeerId {
652        &self.local_node_record.id
653    }
654
655    /// Returns the address of the UDP socket
656    pub const fn local_addr(&self) -> SocketAddr {
657        self.local_address
658    }
659
660    /// Returns the ENR of this service.
661    ///
662    /// Note: this will include the external address if resolved.
663    pub const fn local_enr(&self) -> NodeRecord {
664        self.local_node_record
665    }
666
667    /// Returns mutable reference to ENR for testing.
668    #[cfg(test)]
669    pub const fn local_enr_mut(&mut self) -> &mut NodeRecord {
670        &mut self.local_node_record
671    }
672
673    /// Returns true if the given `PeerId` is currently in the routing table
674    pub fn contains_node(&self, id: PeerId) -> bool {
675        let key = kad_key(id);
676        self.kbuckets.get_index(&key).is_some()
677    }
678
679    /// Bootstraps the local node to join the DHT.
680    ///
681    /// Bootstrapping is a multi-step operation that starts with a lookup of the local node's
682    /// own ID in the DHT. This introduces the local node to the other nodes
683    /// in the DHT and populates its routing table with the closest proven neighbours.
684    ///
685    /// This is similar to adding all bootnodes via [`Self::add_node`], but does not fire a
686    /// [`DiscoveryUpdate::Added`] event for the given bootnodes. So boot nodes don't appear in the
687    /// update stream, which is usually desirable, since bootnodes should not be connected to.
688    ///
689    /// If adding the configured bootnodes should result in a [`DiscoveryUpdate::Added`], see
690    /// [`Self::add_all_nodes`].
691    ///
692    /// **Note:** This is a noop if there are no bootnodes.
693    pub fn bootstrap(&mut self) {
694        for record in self.config.bootstrap_nodes.clone() {
695            debug!(target: "discv4", ?record, "pinging boot node");
696            let key = kad_key(record.id);
697            let entry = NodeEntry::new(record);
698
699            // insert the boot node in the table
700            match self.kbuckets.insert_or_update(
701                &key,
702                entry,
703                NodeStatus {
704                    state: ConnectionState::Disconnected,
705                    direction: ConnectionDirection::Outgoing,
706                },
707            ) {
708                InsertResult::Failed(_) => {}
709                _ => {
710                    self.try_ping(record, PingReason::InitialInsert);
711                }
712            }
713        }
714    }
715
716    /// Spawns this service onto a new task
717    ///
718    /// Note: requires a running tokio runtime
719    pub fn spawn(mut self) -> JoinHandle<()> {
720        tokio::task::spawn(async move {
721            self.bootstrap();
722
723            while let Some(event) = self.next().await {
724                trace!(target: "discv4", ?event, "processed");
725            }
726            trace!(target: "discv4", "service terminated");
727        })
728    }
729
730    /// Creates a new bounded channel for [`DiscoveryUpdate`]s.
731    pub fn update_stream(&mut self) -> ReceiverStream<DiscoveryUpdate> {
732        let (tx, rx) = mpsc::channel(512);
733        self.update_listeners.push(tx);
734        ReceiverStream::new(rx)
735    }
736
737    /// Looks up the local node in the DHT.
738    pub fn lookup_self(&mut self) {
739        self.lookup(self.local_node_record.id)
740    }
741
742    /// Looks up the given node in the DHT
743    ///
744    /// A `FindNode` packet requests information about nodes close to target. The target is a
745    /// 64-byte secp256k1 public key. When `FindNode` is received, the recipient should reply
746    /// with Neighbors packets containing the closest 16 nodes to target found in its local
747    /// table.
748    //
749    // To guard against traffic amplification attacks, Neighbors replies should only be sent if the
750    // sender of FindNode has been verified by the endpoint proof procedure.
751    pub fn lookup(&mut self, target: PeerId) {
752        self.lookup_with(target, None)
753    }
754
755    /// Starts the recursive lookup process for the given target, <https://github.com/ethereum/devp2p/blob/master/discv4.md#recursive-lookup>.
756    ///
757    /// At first the `ALPHA` (==3, defined concurrency factor) nodes that are closest to the target
758    /// in the underlying DHT are selected to seed the lookup via `FindNode` requests. In the
759    /// recursive step, the initiator resends `FindNode` to nodes it has learned about from previous
760    /// queries.
761    ///
762    /// This takes an optional Sender through which all successfully discovered nodes are sent once
763    /// the request has finished.
764    fn lookup_with(&mut self, target: PeerId, tx: Option<NodeRecordSender>) {
765        trace!(target: "discv4", ?target, "Starting lookup");
766        let target_key = kad_key(target);
767
768        // Start a lookup context with the 16 (MAX_NODES_PER_BUCKET) closest nodes to which we have
769        // a valid endpoint proof
770        let ctx = LookupContext::new(
771            target_key.clone(),
772            self.kbuckets
773                .closest_values(&target_key)
774                .filter(|node| {
775                    node.value.has_endpoint_proof &&
776                        !self.pending_find_nodes.contains_key(&node.key.preimage().0)
777                })
778                .take(MAX_NODES_PER_BUCKET)
779                .map(|n| (target_key.distance(&n.key), n.value.record)),
780            tx,
781        );
782
783        // From those 16, pick the 3 closest to start the concurrent lookup.
784        let closest = ctx.closest(ALPHA);
785
786        if closest.is_empty() && self.pending_find_nodes.is_empty() {
787            // no closest nodes, and no lookup in progress: table is empty.
788            // This could happen if all records were deleted from the table due to missed pongs
789            // (e.g. connectivity problems over a long period of time, or issues during initial
790            // bootstrapping) so we attempt to bootstrap again
791            self.bootstrap();
792            return
793        }
794
795        trace!(target: "discv4", ?target, num = closest.len(), "Start lookup closest nodes");
796
797        for node in closest {
798            // here we still want to check against previous request failures and if necessary
799            // re-establish a new endpoint proof because it can be the case that the other node lost
800            // our entry and no longer has an endpoint proof on their end
801            self.find_node_checked(&node, ctx.clone());
802        }
803    }
804
805    /// Sends a new `FindNode` packet to the node with `target` as the lookup target.
806    ///
807    /// CAUTION: This expects there's a valid endpoint proof for the given `node`.
808    fn find_node(&mut self, node: &NodeRecord, ctx: LookupContext) {
809        trace!(target: "discv4", ?node, lookup=?ctx.target(), "Sending FindNode");
810        ctx.mark_queried(node.id);
811        let id = ctx.target();
812        let msg = Message::FindNode(FindNode { id, expire: self.find_node_expiration() });
813        self.send_packet(msg, node.udp_addr());
814        self.pending_find_nodes.insert(node.id, FindNodeRequest::new(ctx));
815    }
816
817    /// Sends a new `FindNode` packet to the node with `target` as the lookup target, but first
818    /// checks whether we should send a new ping to renew the endpoint proof, based on previously
819    /// failed `FindNode` requests. It could be that the node is no longer reachable or has lost
820    /// our entry.
821    fn find_node_checked(&mut self, node: &NodeRecord, ctx: LookupContext) {
822        let max_failures = self.config.max_find_node_failures;
823        let needs_ping = self
824            .on_entry(node.id, |entry| entry.exceeds_find_node_failures(max_failures))
825            .unwrap_or(true);
826        if needs_ping {
827            self.try_ping(*node, PingReason::Lookup(*node, ctx))
828        } else {
829            self.find_node(node, ctx)
830        }
831    }
832
833    /// Notifies all listeners.
834    ///
835    /// Removes all listeners that are closed.
836    fn notify(&mut self, update: DiscoveryUpdate) {
837        self.update_listeners.retain_mut(|listener| match listener.try_send(update.clone()) {
838            Ok(()) => true,
839            Err(err) => match err {
840                TrySendError::Full(_) => true,
841                TrySendError::Closed(_) => false,
842            },
843        });
844    }
845
846    /// Adds the ip to the ban list indefinitely
847    pub fn ban_ip(&mut self, ip: IpAddr) {
848        self.config.ban_list.ban_ip(ip);
849    }
850
851    /// Adds the peer to the ban list indefinitely.
852    pub fn ban_node(&mut self, node_id: PeerId) {
853        self.remove_node(node_id);
854        self.config.ban_list.ban_peer(node_id);
855    }
856
857    /// Adds the ip to the ban list until the given timestamp.
858    pub fn ban_ip_until(&mut self, ip: IpAddr, until: Instant) {
859        self.config.ban_list.ban_ip_until(ip, until);
860    }
861
862    /// Adds the peer to the ban list and bans it until the given timestamp
863    pub fn ban_node_until(&mut self, node_id: PeerId, until: Instant) {
864        self.remove_node(node_id);
865        self.config.ban_list.ban_peer_until(node_id, until);
866    }
867
868    /// Removes a `node_id` from the routing table.
869    ///
870    /// This allows applications, for whatever reason, to remove nodes from the local routing
871    /// table. Returns `true` if the node was in the table and `false` otherwise.
872    pub fn remove_node(&mut self, node_id: PeerId) -> bool {
873        let key = kad_key(node_id);
874        self.remove_key(node_id, key)
875    }
876
877    /// Removes a `node_id` from the routing table but only if there are enough other nodes in the
878    /// bucket (bucket must be at least half full)
879    ///
880    /// Returns `true` if the node was removed
881    pub fn soft_remove_node(&mut self, node_id: PeerId) -> bool {
882        let key = kad_key(node_id);
883        let Some(bucket) = self.kbuckets.get_bucket(&key) else { return false };
884        if bucket.num_entries() < MAX_NODES_PER_BUCKET / 2 {
885            // skip half empty bucket
886            return false
887        }
888        self.remove_key(node_id, key)
889    }
890
891    fn remove_key(&mut self, node_id: PeerId, key: discv5::Key<NodeKey>) -> bool {
892        let removed = self.kbuckets.remove(&key);
893        if removed {
894            trace!(target: "discv4", ?node_id, "removed node");
895            self.notify(DiscoveryUpdate::Removed(node_id));
896        }
897        removed
898    }
899
900    /// Gets the number of entries that are considered connected.
901    pub fn num_connected(&self) -> usize {
902        self.kbuckets.buckets_iter().fold(0, |count, bucket| count + bucket.num_connected())
903    }
904
905    /// Check if the peer has an active bond.
906    fn has_bond(&self, remote_id: PeerId, remote_ip: IpAddr) -> bool {
907        if let Some(timestamp) = self.received_pongs.last_pong(remote_id, remote_ip) {
908            if timestamp.elapsed() < self.config.bond_expiration {
909                return true
910            }
911        }
912        false
913    }
914
915    /// Applies a closure on the pending or present [`NodeEntry`].
916    fn on_entry<F, R>(&mut self, peer_id: PeerId, f: F) -> Option<R>
917    where
918        F: FnOnce(&NodeEntry) -> R,
919    {
920        let key = kad_key(peer_id);
921        match self.kbuckets.entry(&key) {
922            BucketEntry::Present(entry, _) => Some(f(entry.value())),
923            BucketEntry::Pending(mut entry, _) => Some(f(entry.value())),
924            _ => None,
925        }
926    }
927
928    /// Update the entry on RE-ping.
929    ///
930    /// Invoked when we receive the Pong for our [`PingReason::RePing`] ping.
931    ///
932    /// On re-ping we check for a changed `enr_seq` if EIP-868 is enabled, and if it changed we send
933    /// a follow-up request to retrieve the updated ENR
934    fn update_on_reping(&mut self, record: NodeRecord, mut last_enr_seq: Option<u64>) {
935        if record.id == self.local_node_record.id {
936            return
937        }
938
939        // If EIP868 extension is disabled then we want to ignore this
940        if !self.config.enable_eip868 {
941            last_enr_seq = None;
942        }
943
944        let key = kad_key(record.id);
945        let old_enr = match self.kbuckets.entry(&key) {
946            kbucket::Entry::Present(mut entry, _) => {
947                entry.value_mut().update_with_enr(last_enr_seq)
948            }
949            kbucket::Entry::Pending(mut entry, _) => entry.value().update_with_enr(last_enr_seq),
950            _ => return,
951        };
952
953        // Check if ENR was updated
954        match (last_enr_seq, old_enr) {
955            (Some(new), Some(old)) => {
956                if new > old {
957                    self.send_enr_request(record);
958                }
959            }
960            (Some(_), None) => {
961                // got an ENR
962                self.send_enr_request(record);
963            }
964            _ => {}
965        };
966    }
967
968    /// Callback invoked when we receive a pong from the peer.
969    fn update_on_pong(&mut self, record: NodeRecord, mut last_enr_seq: Option<u64>) {
970        if record.id == *self.local_peer_id() {
971            return
972        }
973
974        // If EIP868 extension is disabled then we want to ignore this
975        if !self.config.enable_eip868 {
976            last_enr_seq = None;
977        }
978
979        // if the peer included an enr seq in the pong then we can try to request the ENR of that
980        // node
981        let has_enr_seq = last_enr_seq.is_some();
982
983        let key = kad_key(record.id);
984        match self.kbuckets.entry(&key) {
985            kbucket::Entry::Present(mut entry, old_status) => {
986                // endpoint is now proven
987                entry.value_mut().establish_proof();
988                entry.value_mut().update_with_enr(last_enr_seq);
989
990                if !old_status.is_connected() {
991                    let _ = entry.update(ConnectionState::Connected, Some(old_status.direction));
992                    trace!(target: "discv4", ?record, "added after successful endpoint proof");
993                    self.notify(DiscoveryUpdate::Added(record));
994
995                    if has_enr_seq {
996                        // request the ENR of the node
997                        self.send_enr_request(record);
998                    }
999                }
1000            }
1001            kbucket::Entry::Pending(mut entry, mut status) => {
1002                // endpoint is now proven
1003                entry.value().establish_proof();
1004                entry.value().update_with_enr(last_enr_seq);
1005
1006                if !status.is_connected() {
1007                    status.state = ConnectionState::Connected;
1008                    let _ = entry.update(status);
1009                    trace!(target: "discv4", ?record, "added after successful endpoint proof");
1010                    self.notify(DiscoveryUpdate::Added(record));
1011
1012                    if has_enr_seq {
1013                        // request the ENR of the node
1014                        self.send_enr_request(record);
1015                    }
1016                }
1017            }
1018            _ => {}
1019        };
1020    }
1021
1022    /// Adds all nodes
1023    ///
1024    /// See [`Self::add_node`]
1025    pub fn add_all_nodes(&mut self, records: impl IntoIterator<Item = NodeRecord>) {
1026        for record in records {
1027            self.add_node(record);
1028        }
1029    }
1030
1031    /// If the node's not in the table yet, this will add it to the table and start the endpoint
1032    /// proof by sending a ping to the node.
1033    ///
1034    /// Returns `true` if the record was added successfully, and `false` if the node is either
1035    /// already in the table or the record's bucket is full.
1036    pub fn add_node(&mut self, record: NodeRecord) -> bool {
1037        let key = kad_key(record.id);
1038        match self.kbuckets.entry(&key) {
1039            kbucket::Entry::Absent(entry) => {
1040                let node = NodeEntry::new(record);
1041                match entry.insert(
1042                    node,
1043                    NodeStatus {
1044                        direction: ConnectionDirection::Outgoing,
1045                        state: ConnectionState::Disconnected,
1046                    },
1047                ) {
1048                    BucketInsertResult::Inserted | BucketInsertResult::Pending { .. } => {
1049                        trace!(target: "discv4", ?record, "inserted new record");
1050                    }
1051                    _ => return false,
1052                }
1053            }
1054            _ => return false,
1055        }
1056
1057        // send the initial ping to the _new_ node
1058        self.try_ping(record, PingReason::InitialInsert);
1059        true
1060    }
1061
1062    /// Encodes the packet, sends it and returns the hash.
1063    pub(crate) fn send_packet(&self, msg: Message, to: SocketAddr) -> B256 {
1064        let (payload, hash) = msg.encode(&self.secret_key);
1065        trace!(target: "discv4", r#type=?msg.msg_type(), ?to, ?hash, "sending packet");
1066        let _ = self.egress.try_send((payload, to)).map_err(|err| {
1067            debug!(
1068                target: "discv4",
1069                %err,
1070                "dropped outgoing packet",
1071            );
1072        });
1073        hash
1074    }
1075
1076    /// Message handler for an incoming `Ping`
1077    fn on_ping(&mut self, ping: Ping, remote_addr: SocketAddr, remote_id: PeerId, hash: B256) {
1078        if self.is_expired(ping.expire) {
1079            // ping's expiration timestamp is in the past
1080            return
1081        }
1082
1083        // create the record
1084        let record = NodeRecord {
1085            address: remote_addr.ip(),
1086            udp_port: remote_addr.port(),
1087            tcp_port: ping.from.tcp_port,
1088            id: remote_id,
1089        }
1090        .into_ipv4_mapped();
1091
1092        let key = kad_key(record.id);
1093
1094        // See also <https://github.com/ethereum/devp2p/blob/master/discv4.md#ping-packet-0x01>:
1095        // > If no communication with the sender of this ping has occurred within the last 12h, a
1096        // > ping should be sent in addition to pong in order to receive an endpoint proof.
1097        //
1098        // Note: we only mark if the node is absent because the `last 12h` condition is handled by
1099        // the ping interval
1100        let mut is_new_insert = false;
1101        let mut needs_bond = false;
1102        let mut is_proven = false;
1103
1104        let old_enr = match self.kbuckets.entry(&key) {
1105            kbucket::Entry::Present(mut entry, _) => {
1106                if entry.value().is_expired() {
1107                    // If no communication with the sender has occurred within the last 12h, a ping
1108                    // should be sent in addition to pong in order to receive an endpoint proof.
1109                    needs_bond = true;
1110                } else {
1111                    is_proven = entry.value().has_endpoint_proof;
1112                }
1113                entry.value_mut().update_with_enr(ping.enr_sq)
1114            }
1115            kbucket::Entry::Pending(mut entry, _) => {
1116                if entry.value().is_expired() {
1117                    // If no communication with the sender has occurred within the last 12h, a ping
1118                    // should be sent in addition to pong in order to receive an endpoint proof.
1119                    needs_bond = true;
1120                } else {
1121                    is_proven = entry.value().has_endpoint_proof;
1122                }
1123                entry.value().update_with_enr(ping.enr_sq)
1124            }
1125            kbucket::Entry::Absent(entry) => {
1126                let mut node = NodeEntry::new(record);
1127                node.last_enr_seq = ping.enr_sq;
1128
1129                match entry.insert(
1130                    node,
1131                    NodeStatus {
1132                        direction: ConnectionDirection::Incoming,
1133                        // mark as disconnected until endpoint proof established on pong
1134                        state: ConnectionState::Disconnected,
1135                    },
1136                ) {
1137                    BucketInsertResult::Inserted | BucketInsertResult::Pending { .. } => {
1138                        // mark as new insert if insert was successful
1139                        is_new_insert = true;
1140                    }
1141                    BucketInsertResult::Full => {
1142                        // we received a ping but the corresponding bucket for the peer is already
1143                        // full, we can't add any additional peers to that bucket, but we still want
1144                        // to emit an event that we discovered the node
1145                        trace!(target: "discv4", ?record, "discovered new record but bucket is full");
1146                        self.notify(DiscoveryUpdate::DiscoveredAtCapacity(record));
1147                        needs_bond = true;
1148                    }
1149                    BucketInsertResult::TooManyIncoming | BucketInsertResult::NodeExists => {
1150                        needs_bond = true;
1151                        // insert unsuccessful but we still want to send the pong
1152                    }
1153                    BucketInsertResult::FailedFilter => return,
1154                }
1155
1156                None
1157            }
1158            kbucket::Entry::SelfEntry => return,
1159        };
1160
1161        // send the pong first, but the PONG and optionally PING don't need to be sent in a
1162        // particular order
1163        let pong = Message::Pong(Pong {
1164            // we use the actual address of the peer
1165            to: record.into(),
1166            echo: hash,
1167            expire: ping.expire,
1168            enr_sq: self.enr_seq(),
1169        });
1170        self.send_packet(pong, remote_addr);
1171
1172        // if node was absent also send a ping to establish the endpoint proof from our end
1173        if is_new_insert {
1174            self.try_ping(record, PingReason::InitialInsert);
1175        } else if needs_bond {
1176            self.try_ping(record, PingReason::EstablishBond);
1177        } else if is_proven {
1178            // if node has been proven, this means we've received a pong and verified its endpoint
1179            // proof. We've also sent a pong above to verify our endpoint proof, so we can now
1180            // send our find_nodes request if PingReason::Lookup
1181            if let Some((_, ctx)) = self.pending_lookup.remove(&record.id) {
1182                if self.pending_find_nodes.contains_key(&record.id) {
1183                    // there's already another pending request, unmark it so the next round can
1184                    // try to send it
1185                    ctx.unmark_queried(record.id);
1186                } else {
1187                    // we just received a ping from that peer so we can send a find node request
1188                    // directly
1189                    self.find_node(&record, ctx);
1190                }
1191            }
1192        } else {
1193            // Request ENR if included in the ping
1194            match (ping.enr_sq, old_enr) {
1195                (Some(new), Some(old)) => {
1196                    if new > old {
1197                        self.send_enr_request(record);
1198                    }
1199                }
1200                (Some(_), None) => {
1201                    self.send_enr_request(record);
1202                }
1203                _ => {}
1204            };
1205        }
1206    }
1207
1208    // Guarding function for [`Self::send_ping`] that applies pre-checks
1209    fn try_ping(&mut self, node: NodeRecord, reason: PingReason) {
1210        if node.id == *self.local_peer_id() {
1211            // don't ping ourselves
1212            return
1213        }
1214
1215        if self.pending_pings.contains_key(&node.id) ||
1216            self.pending_find_nodes.contains_key(&node.id)
1217        {
1218            return
1219        }
1220
1221        if self.queued_pings.iter().any(|(n, _)| n.id == node.id) {
1222            return
1223        }
1224
1225        if self.pending_pings.len() < MAX_NODES_PING {
1226            self.send_ping(node, reason);
1227        } else if self.queued_pings.len() < MAX_QUEUED_PINGS {
1228            self.queued_pings.push_back((node, reason));
1229        }
1230    }
1231
1232    /// Sends a ping message to the node's UDP address.
1233    ///
1234    /// Returns the echo hash of the ping message.
1235    pub(crate) fn send_ping(&mut self, node: NodeRecord, reason: PingReason) -> B256 {
1236        let remote_addr = node.udp_addr();
1237        let id = node.id;
1238        let ping = Ping {
1239            from: self.local_node_record.into(),
1240            to: node.into(),
1241            expire: self.ping_expiration(),
1242            enr_sq: self.enr_seq(),
1243        };
1244        trace!(target: "discv4", ?ping, "sending ping");
1245        let echo_hash = self.send_packet(Message::Ping(ping), remote_addr);
1246
1247        self.pending_pings
1248            .insert(id, PingRequest { sent_at: Instant::now(), node, echo_hash, reason });
1249        echo_hash
1250    }
1251
1252    /// Sends an enr request message to the node's UDP address.
1253    ///
1254    /// The hash of the sent packet is tracked as the pending ENR request's echo hash.
1255    pub(crate) fn send_enr_request(&mut self, node: NodeRecord) {
1256        if !self.config.enable_eip868 {
1257            return
1258        }
1259        let remote_addr = node.udp_addr();
1260        let enr_request = EnrRequest { expire: self.enr_request_expiration() };
1261
1262        trace!(target: "discv4", ?enr_request, "sending enr request");
1263        let echo_hash = self.send_packet(Message::EnrRequest(enr_request), remote_addr);
1264
1265        self.pending_enr_requests
1266            .insert(node.id, EnrRequestState { sent_at: Instant::now(), echo_hash });
1267    }
1268
1269    /// Message handler for an incoming `Pong`.
1270    fn on_pong(&mut self, pong: Pong, remote_addr: SocketAddr, remote_id: PeerId) {
1271        if self.is_expired(pong.expire) {
1272            return
1273        }
1274
1275        let PingRequest { node, reason, .. } = match self.pending_pings.entry(remote_id) {
1276            Entry::Occupied(entry) => {
1277                {
1278                    let request = entry.get();
1279                    if request.echo_hash != pong.echo {
1280                        trace!(target: "discv4", from=?remote_addr, expected=?request.echo_hash, echo_hash=?pong.echo,"Got unexpected Pong");
1281                        return
1282                    }
1283                }
1284                entry.remove()
1285            }
1286            Entry::Vacant(_) => return,
1287        };
1288
1289        // keep track of the pong
1290        self.received_pongs.on_pong(remote_id, remote_addr.ip());
1291
1292        match reason {
1293            PingReason::InitialInsert => {
1294                self.update_on_pong(node, pong.enr_sq);
1295            }
1296            PingReason::EstablishBond => {
1297                // same as `InitialInsert` which renews the bond if the peer is in the table
1298                self.update_on_pong(node, pong.enr_sq);
1299            }
1300            PingReason::RePing => {
1301                self.update_on_reping(node, pong.enr_sq);
1302            }
1303            PingReason::Lookup(node, ctx) => {
1304                self.update_on_pong(node, pong.enr_sq);
1305                // insert node and assoc. lookup_context into the pending_lookup table to complete
1306                // our side of the endpoint proof verification.
1307                // Start the lookup timer here and evict accordingly. Note that this is a separate
1308                // timer from the ping_request timer.
1309                self.pending_lookup.insert(node.id, (Instant::now(), ctx));
1310            }
1311        }
1312    }
1313
1314    /// Handler for an incoming `FindNode` message
1315    fn on_find_node(&mut self, msg: FindNode, remote_addr: SocketAddr, node_id: PeerId) {
1316        if self.is_expired(msg.expire) {
1317            // expiration timestamp is in the past
1318            return
1319        }
1320        if node_id == *self.local_peer_id() {
1321            // ignore find node requests to ourselves
1322            return
1323        }
1324
1325        if self.has_bond(node_id, remote_addr.ip()) {
1326            self.respond_closest(msg.id, remote_addr)
1327        }
1328    }
1329
1330    /// Handler for incoming `EnrResponse` message
1331    fn on_enr_response(&mut self, msg: EnrResponse, remote_addr: SocketAddr, id: PeerId) {
1332        trace!(target: "discv4", ?remote_addr, ?msg, "received ENR response");
1333        if let Some(resp) = self.pending_enr_requests.remove(&id) {
1334            // ensure the ENR's public key matches the expected node id
1335            let enr_id = pk2id(&msg.enr.public_key());
1336            if id != enr_id {
1337                return
1338            }
1339
1340            if resp.echo_hash == msg.request_hash {
1341                let key = kad_key(id);
1342                let fork_id = msg.eth_fork_id();
1343                let (record, old_fork_id) = match self.kbuckets.entry(&key) {
1344                    kbucket::Entry::Present(mut entry, _) => {
1345                        let id = entry.value_mut().update_with_fork_id(fork_id);
1346                        (entry.value().record, id)
1347                    }
1348                    kbucket::Entry::Pending(mut entry, _) => {
1349                        let id = entry.value().update_with_fork_id(fork_id);
1350                        (entry.value().record, id)
1351                    }
1352                    _ => return,
1353                };
1354                match (fork_id, old_fork_id) {
1355                    (Some(new), Some(old)) => {
1356                        if new != old {
1357                            self.notify(DiscoveryUpdate::EnrForkId(record, new))
1358                        }
1359                    }
1360                    (Some(new), None) => self.notify(DiscoveryUpdate::EnrForkId(record, new)),
1361                    _ => {}
1362                }
1363            }
1364        }
1365    }
1366
1367    /// Handler for incoming `EnrRequest` message
1368    fn on_enr_request(
1369        &self,
1370        msg: EnrRequest,
1371        remote_addr: SocketAddr,
1372        id: PeerId,
1373        request_hash: B256,
1374    ) {
1375        if !self.config.enable_eip868 || self.is_expired(msg.expire) {
1376            return
1377        }
1378
1379        if self.has_bond(id, remote_addr.ip()) {
1380            self.send_packet(
1381                Message::EnrResponse(EnrResponse {
1382                    request_hash,
1383                    enr: self.local_eip_868_enr.clone(),
1384                }),
1385                remote_addr,
1386            );
1387        }
1388    }
1389
1390    /// Handler for incoming `Neighbours` messages; they're only processed if they are responses
1391    /// to our `FindNode` requests.
1392    fn on_neighbours(&mut self, msg: Neighbours, remote_addr: SocketAddr, node_id: PeerId) {
1393        if self.is_expired(msg.expire) {
1394            // response is expired
1395            return
1396        }
1397        // check if this request was expected
1398        let ctx = match self.pending_find_nodes.entry(node_id) {
1399            Entry::Occupied(mut entry) => {
1400                {
1401                    let request = entry.get_mut();
1402                    // Mark the request as answered
1403                    request.answered = true;
1404                    let total = request.response_count + msg.nodes.len();
1405
1406                    // A FindNode response is at most one bucket (16 entries), possibly split across several Neighbours packets.
1407                    if total <= MAX_NODES_PER_BUCKET {
1408                        request.response_count = total;
1409                    } else {
1410                        trace!(target: "discv4", total, from=?remote_addr, "Received Neighbours packet whose total entries exceed max nodes per bucket");
1411                        return
1412                    }
1413                };
1414
1415                if entry.get().response_count == MAX_NODES_PER_BUCKET {
1416                    // node responding with a full bucket of records
1417                    let ctx = entry.remove().lookup_context;
1418                    ctx.mark_responded(node_id);
1419                    ctx
1420                } else {
1421                    entry.get().lookup_context.clone()
1422                }
1423            }
1424            Entry::Vacant(_) => {
1425                // received neighbours response without requesting it
1426                trace!(target: "discv4", from=?remote_addr, "Received unsolicited Neighbours");
1427                return
1428            }
1429        };
1430
1431        // log the peers we discovered
1432        trace!(target: "discv4",
1433            target=format!("{:#?}", node_id),
1434            peers_count=msg.nodes.len(),
1435            peers=format!("[{:#}]", msg.nodes.iter()
1436                .map(|node_rec| node_rec.id
1437            ).format(", ")),
1438            "Received peers from Neighbours packet"
1439        );
1440
1441        // This is the recursive lookup step where we initiate new FindNode requests for new nodes
1442        // that were discovered.
1443        for node in msg.nodes.into_iter().map(NodeRecord::into_ipv4_mapped) {
1444            // prevent banned peers from being added to the context
1445            if self.config.ban_list.is_banned(&node.id, &node.address) {
1446                trace!(target: "discv4", peer_id=?node.id, ip=?node.address, "ignoring banned record");
1447                continue
1448            }
1449
1450            ctx.add_node(node);
1451        }
1452
1453        // get the next closest, not yet queried nodes and start over.
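        // `ALPHA` is the Kademlia concurrency factor, i.e. how many of the closest, not yet
        // queried nodes we contact per lookup step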
1454        let closest =
1455            ctx.filter_closest(ALPHA, |node| !self.pending_find_nodes.contains_key(&node.id));
1456
1457        for closest in closest {
1458            let key = kad_key(closest.id);
1459            match self.kbuckets.entry(&key) {
1460                BucketEntry::Absent(entry) => {
1461                    // the node's endpoint is not proven yet, so we need to ping it first; on
1462                    // success we add the node to the pending_lookup table and wait until we've
1463                    // sent back a Pong before initiating a FindNode request.
1464                    // To prevent this node from being selected again on subsequent responses
1465                    // while the ping is still in flight, we always mark it as queried.
1466                    ctx.mark_queried(closest.id);
1467                    let node = NodeEntry::new(closest);
1468                    match entry.insert(
1469                        node,
1470                        NodeStatus {
1471                            direction: ConnectionDirection::Outgoing,
1472                            state: ConnectionState::Disconnected,
1473                        },
1474                    ) {
1475                        BucketInsertResult::Inserted | BucketInsertResult::Pending { .. } => {
1476                            // only ping if the node was added to the table
1477                            self.try_ping(closest, PingReason::Lookup(closest, ctx.clone()))
1478                        }
1479                        BucketInsertResult::Full => {
1480                            // new node but the node's bucket is already full
1481                            self.notify(DiscoveryUpdate::DiscoveredAtCapacity(closest))
1482                        }
1483                        _ => {}
1484                    }
1485                }
1486                BucketEntry::SelfEntry => {
1487                    // we received our own node entry
1488                }
1489                BucketEntry::Present(entry, _) => {
1490                    if entry.value().has_endpoint_proof {
1491                        if entry
1492                            .value()
1493                            .exceeds_find_node_failures(self.config.max_find_node_failures)
1494                        {
1495                            self.try_ping(closest, PingReason::Lookup(closest, ctx.clone()))
1496                        } else {
1497                            self.find_node(&closest, ctx.clone());
1498                        }
1499                    }
1500                }
1501                BucketEntry::Pending(mut entry, _) => {
1502                    if entry.value().has_endpoint_proof {
1503                        if entry
1504                            .value()
1505                            .exceeds_find_node_failures(self.config.max_find_node_failures)
1506                        {
1507                            self.try_ping(closest, PingReason::Lookup(closest, ctx.clone()))
1508                        } else {
1509                            self.find_node(&closest, ctx.clone());
1510                        }
1511                    }
1512                }
1513            }
1514        }
1515    }
1516
1517    /// Sends a Neighbours packet for `target` to the given addr
1518    fn respond_closest(&mut self, target: PeerId, to: SocketAddr) {
1519        let key = kad_key(target);
1520        let expire = self.send_neighbours_expiration();
1521
1522        // get the MAX_NODES_PER_BUCKET closest nodes to the target
1523        let closest_nodes =
1524            self.kbuckets.closest_values(&key).take(MAX_NODES_PER_BUCKET).collect::<Vec<_>>();
1525
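        // split the response into multiple Neighbours packets so each datagram stays within the
        // discv4 maximum packet size (1280 bytes)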
1526        for nodes in closest_nodes.chunks(SAFE_MAX_DATAGRAM_NEIGHBOUR_RECORDS) {
1527            let nodes = nodes.iter().map(|node| node.value.record).collect::<Vec<NodeRecord>>();
1528            trace!(target: "discv4", len = nodes.len(), to=?to, "Sending Neighbours packet");
1529            let msg = Message::Neighbours(Neighbours { nodes, expire });
1530            self.send_packet(msg, to);
1531        }
1532    }
1533
1534    fn evict_expired_requests(&mut self, now: Instant) {
1535        self.pending_enr_requests.retain(|_node_id, enr_request| {
1536            now.duration_since(enr_request.sent_at) < self.config.enr_expiration
1537        });
1538
1539        let mut failed_pings = Vec::new();
1540        self.pending_pings.retain(|node_id, ping_request| {
1541            if now.duration_since(ping_request.sent_at) > self.config.ping_expiration {
1542                failed_pings.push(*node_id);
1543                return false
1544            }
1545            true
1546        });
1547
1548        if !failed_pings.is_empty() {
1549            // remove nodes that failed to pong
1550            trace!(target: "discv4", num=%failed_pings.len(), "evicting nodes due to failed pong");
1551            for node_id in failed_pings {
1552                self.remove_node(node_id);
1553            }
1554        }
1555
1556        let mut failed_lookups = Vec::new();
1557        self.pending_lookup.retain(|node_id, (lookup_sent_at, _)| {
1558            if now.duration_since(*lookup_sent_at) > self.config.request_timeout {
1559                failed_lookups.push(*node_id);
1560                return false
1561            }
1562            true
1563        });
1564
1565        if !failed_lookups.is_empty() {
1566            // remove nodes that failed the e2e lookup process, so we can restart it
1567            trace!(target: "discv4", num=%failed_lookups.len(), "evicting nodes due to failed lookup");
1568            for node_id in failed_lookups {
1569                self.remove_node(node_id);
1570            }
1571        }
1572
1573        self.evict_failed_find_nodes(now);
1574    }
1575
1576    /// Handles failed responses to `FindNode`
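    ///
    /// Requests that expired without an answer increment the node's `FindNode` failure counter;
    /// once the counter exceeds `max_find_node_failures` the node is soft-removed from the table.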
1577    fn evict_failed_find_nodes(&mut self, now: Instant) {
1578        let mut failed_find_nodes = Vec::new();
1579        self.pending_find_nodes.retain(|node_id, find_node_request| {
1580            if now.duration_since(find_node_request.sent_at) > self.config.neighbours_expiration {
1581                if !find_node_request.answered {
1582                    // the request expired without any response, so count it as a failure; a
1583                    // partial response is not treated as a hard error since the node did respond
1584                    failed_find_nodes.push(*node_id);
1585                }
1586                return false
1587            }
1588            true
1589        });
1590
1591        if failed_find_nodes.is_empty() {
1592            return
1593        }
1594
1595        trace!(target: "discv4", num=%failed_find_nodes.len(), "processing failed find nodes");
1596
1597        for node_id in failed_find_nodes {
1598            let key = kad_key(node_id);
1599            let failures = match self.kbuckets.entry(&key) {
1600                kbucket::Entry::Present(mut entry, _) => {
1601                    entry.value_mut().inc_failed_request();
1602                    entry.value().find_node_failures
1603                }
1604                kbucket::Entry::Pending(mut entry, _) => {
1605                    entry.value().inc_failed_request();
1606                    entry.value().find_node_failures
1607                }
1608                _ => continue,
1609            };
1610
1611            // if the node failed to return anything useful multiple times, remove the node from
1612            // the table, but only if there are enough other nodes in the bucket (bucket must be at
1613            // least half full)
1614            if failures > self.config.max_find_node_failures {
1615                self.soft_remove_node(node_id);
1616            }
1617        }
1618    }
1619
1620    /// Re-pings the oldest nodes whose endpoint proofs are considered expired: [`NodeEntry::is_expired`]
1621    ///
1622    /// This sends a `Ping` to up to `MAX_NODES_PING` nodes; if a node fails to respond with a
1623    /// `Pong` to renew the endpoint proof, it will be removed from the table.
1624    fn re_ping_oldest(&mut self) {
1625        let mut nodes = self
1626            .kbuckets
1627            .iter_ref()
1628            .filter(|entry| entry.node.value.is_expired())
1629            .map(|n| n.node.value)
1630            .collect::<Vec<_>>();
1631        nodes.sort_by(|a, b| a.last_seen.cmp(&b.last_seen));
1632        let to_ping = nodes.into_iter().map(|n| n.record).take(MAX_NODES_PING).collect::<Vec<_>>();
1633        for node in to_ping {
1634            self.try_ping(node, PingReason::RePing)
1635        }
1636    }
1637
1638    /// Returns true if the expiration timestamp is in the past.
1639    fn is_expired(&self, expiration: u64) -> bool {
1640        self.ensure_not_expired(expiration).is_err()
1641    }
1642
1643    /// Validate that given timestamp is not expired.
1644    ///
1645    /// Note: this accepts the timestamp as u64 because this is used by the wire protocol, but the
1646    /// UNIX timestamp (number of non-leap seconds since January 1, 1970 0:00:00 UTC) is supposed to
1647    /// be an i64.
1648    ///
1649    /// Returns an error if:
1650    ///  - invalid UNIX timestamp (larger than `i64::MAX`)
1651    ///  - timestamp is expired (lower than current local UNIX timestamp)
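    ///
    /// For example (illustrative, assuming `enforce_expiration_timestamps` is enabled): with a
    /// local clock of `now = 1_700_000_000`, a packet with `expire = 1_699_999_999` is rejected,
    /// while one with `expire = 1_700_000_060` passes.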
1652    fn ensure_not_expired(&self, timestamp: u64) -> Result<(), ()> {
1653        // ensure the timestamp is a valid UNIX timestamp
1654        let _ = i64::try_from(timestamp).map_err(drop)?;
1655
1656        let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_secs();
1657        if self.config.enforce_expiration_timestamps && timestamp < now {
1658            trace!(target: "discv4", "Expired packet");
1659            return Err(())
1660        }
1661        Ok(())
1662    }
1663
1664    /// Pops buffered ping requests and sends them.
1665    fn ping_buffered(&mut self) {
1666        while self.pending_pings.len() < MAX_NODES_PING {
1667            match self.queued_pings.pop_front() {
1668                Some((next, reason)) => self.try_ping(next, reason),
1669                None => break,
1670            }
1671        }
1672    }
1673
1674    fn ping_expiration(&self) -> u64 {
1675        (SystemTime::now().duration_since(UNIX_EPOCH).unwrap() + self.config.ping_expiration)
1676            .as_secs()
1677    }
1678
1679    fn find_node_expiration(&self) -> u64 {
1680        (SystemTime::now().duration_since(UNIX_EPOCH).unwrap() + self.config.request_timeout)
1681            .as_secs()
1682    }
1683
1684    fn enr_request_expiration(&self) -> u64 {
1685        (SystemTime::now().duration_since(UNIX_EPOCH).unwrap() + self.config.enr_expiration)
1686            .as_secs()
1687    }
1688
1689    fn send_neighbours_expiration(&self) -> u64 {
1690        (SystemTime::now().duration_since(UNIX_EPOCH).unwrap() + self.config.neighbours_expiration)
1691            .as_secs()
1692    }
1693
1694    /// Polls the socket and advances the state.
1695    ///
1696    /// To prevent traffic amplification attacks, implementations must verify that the sender of a
1697    /// query participates in the discovery protocol. The sender of a packet is considered verified
1698    /// if it has sent a valid Pong response with matching ping hash within the last 12 hours.
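    ///
    /// Each iteration of the poll loop proceeds roughly in this order: drain buffered events,
    /// trigger self-lookups, re-ping stale nodes, resolve the external IP, drain frontend
    /// commands, process incoming datagrams (bounded by a per-poll budget), flush buffered pings,
    /// and evict expired requests and pongs.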
1699    pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<Discv4Event> {
1700        loop {
1701            // drain buffered events first
1702            if let Some(event) = self.queued_events.pop_front() {
1703                return Poll::Ready(event)
1704            }
1705
1706            // trigger self lookup
1707            if self.config.enable_lookup {
1708                while self.lookup_interval.poll_tick(cx).is_ready() {
1709                    let target = self.lookup_rotator.next(&self.local_node_record.id);
1710                    self.lookup_with(target, None);
1711                }
1712            }
1713
1714            // re-ping some peers
1715            while self.ping_interval.poll_tick(cx).is_ready() {
1716                self.re_ping_oldest();
1717            }
1718
1719            if let Some(Poll::Ready(Some(ip))) =
1720                self.resolve_external_ip_interval.as_mut().map(|r| r.poll_tick(cx))
1721            {
1722                self.set_external_ip_addr(ip);
1723            }
1724
1725            // drain all incoming `Discv4` commands; this channel can never close
1726            while let Poll::Ready(Some(cmd)) = self.commands_rx.poll_recv(cx) {
1727                match cmd {
1728                    Discv4Command::Add(enr) => {
1729                        self.add_node(enr);
1730                    }
1731                    Discv4Command::Lookup { node_id, tx } => {
1732                        let node_id = node_id.unwrap_or(self.local_node_record.id);
1733                        self.lookup_with(node_id, tx);
1734                    }
1735                    Discv4Command::SetLookupInterval(duration) => {
1736                        self.set_lookup_interval(duration);
1737                    }
1738                    Discv4Command::Updates(tx) => {
1739                        let rx = self.update_stream();
1740                        let _ = tx.send(rx);
1741                    }
1742                    Discv4Command::BanPeer(node_id) => self.ban_node(node_id),
1743                    Discv4Command::Remove(node_id) => {
1744                        self.remove_node(node_id);
1745                    }
1746                    Discv4Command::Ban(node_id, ip) => {
1747                        self.ban_node(node_id);
1748                        self.ban_ip(ip);
1749                    }
1750                    Discv4Command::BanIp(ip) => {
1751                        self.ban_ip(ip);
1752                    }
1753                    Discv4Command::SetEIP868RLPPair { key, rlp } => {
1754                        debug!(target: "discv4", key=%String::from_utf8_lossy(&key), "Update EIP-868 extension pair");
1755
1756                        let _ = self.local_eip_868_enr.insert_raw_rlp(key, rlp, &self.secret_key);
1757                    }
1758                    Discv4Command::SetTcpPort(port) => {
1759                        debug!(target: "discv4", %port, "Update tcp port");
1760                        self.local_node_record.tcp_port = port;
1761                        if self.local_node_record.address.is_ipv4() {
1762                            let _ = self.local_eip_868_enr.set_tcp4(port, &self.secret_key);
1763                        } else {
1764                            let _ = self.local_eip_868_enr.set_tcp6(port, &self.secret_key);
1765                        }
1766                    }
1767
1768                    Discv4Command::Terminated => {
1769                        // terminate the service
1770                        self.queued_events.push_back(Discv4Event::Terminated);
1771                    }
1772                }
1773            }
1774
1775            // restricts how many messages we process in a single poll before yielding back control
1776            let mut udp_message_budget = UDP_MESSAGE_POLL_LOOP_BUDGET;
1777
1778            // process all incoming datagrams
1779            while let Poll::Ready(Some(event)) = self.ingress.poll_recv(cx) {
1780                match event {
1781                    IngressEvent::RecvError(err) => {
1782                        debug!(target: "discv4", %err, "failed to read datagram");
1783                    }
1784                    IngressEvent::BadPacket(from, err, data) => {
1785                        trace!(target: "discv4", ?from, %err, packet=?hex::encode(&data), "bad packet");
1786                    }
1787                    IngressEvent::Packet(remote_addr, Packet { msg, node_id, hash }) => {
1788                        trace!(target: "discv4", r#type=?msg.msg_type(), from=?remote_addr,"received packet");
1789                        let event = match msg {
1790                            Message::Ping(ping) => {
1791                                self.on_ping(ping, remote_addr, node_id, hash);
1792                                Discv4Event::Ping
1793                            }
1794                            Message::Pong(pong) => {
1795                                self.on_pong(pong, remote_addr, node_id);
1796                                Discv4Event::Pong
1797                            }
1798                            Message::FindNode(msg) => {
1799                                self.on_find_node(msg, remote_addr, node_id);
1800                                Discv4Event::FindNode
1801                            }
1802                            Message::Neighbours(msg) => {
1803                                self.on_neighbours(msg, remote_addr, node_id);
1804                                Discv4Event::Neighbours
1805                            }
1806                            Message::EnrRequest(msg) => {
1807                                self.on_enr_request(msg, remote_addr, node_id, hash);
1808                                Discv4Event::EnrRequest
1809                            }
1810                            Message::EnrResponse(msg) => {
1811                                self.on_enr_response(msg, remote_addr, node_id);
1812                                Discv4Event::EnrResponse
1813                            }
1814                        };
1815
1816                        self.queued_events.push_back(event);
1817                    }
1818                }
1819
1820                udp_message_budget -= 1;
1821                if udp_message_budget < 0 {
1822                    trace!(target: "discv4", budget=UDP_MESSAGE_POLL_LOOP_BUDGET, "exhausted message poll budget");
1823                    if self.queued_events.is_empty() {
1824                        // we've exceeded the message budget and have no events to process
1825                        // this will make sure we're woken up again
1826                        cx.waker().wake_by_ref();
1827                    }
1828                    break
1829                }
1830            }
1831
1832            // try resending buffered pings
1833            self.ping_buffered();
1834
1835            // evict expired requests
1836            while self.evict_expired_requests_interval.poll_tick(cx).is_ready() {
1837                self.evict_expired_requests(Instant::now());
1838            }
1839
1840            // evict expired nodes
1841            while self.expire_interval.poll_tick(cx).is_ready() {
1842                self.received_pongs.evict_expired(Instant::now(), EXPIRE_DURATION);
1843            }
1844
1845            if self.queued_events.is_empty() {
1846                return Poll::Pending
1847            }
1848        }
1849    }
1850}
1851
1852/// Endless stream impl; only terminates once the service has been terminated
1853impl Stream for Discv4Service {
1854    type Item = Discv4Event;
1855
1856    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1857        // Poll the internal poll method
1858        match ready!(self.get_mut().poll(cx)) {
1859            // if the service is terminated, return None to terminate the stream
1860            Discv4Event::Terminated => Poll::Ready(None),
1861            // For any other event, return Poll::Ready(Some(event))
1862            ev => Poll::Ready(Some(ev)),
1863        }
1864    }
1865}
1866
1867impl fmt::Debug for Discv4Service {
1868    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1869        f.debug_struct("Discv4Service")
1870            .field("local_address", &self.local_address)
1871            .field("local_peer_id", &self.local_peer_id())
1872            .field("local_node_record", &self.local_node_record)
1873            .field("queued_pings", &self.queued_pings)
1874            .field("pending_lookup", &self.pending_lookup)
1875            .field("pending_find_nodes", &self.pending_find_nodes)
1876            .field("lookup_interval", &self.lookup_interval)
1877            .finish_non_exhaustive()
1878    }
1879}
1880
1881/// The Event type the Service stream produces.
1882///
1883/// This is mainly used for testing purposes and represents messages the service processed
1884#[derive(Debug, Eq, PartialEq)]
1885pub enum Discv4Event {
1886    /// A `Ping` message was handled.
1887    Ping,
1888    /// A `Pong` message was handled.
1889    Pong,
1890    /// A `FindNode` message was handled.
1891    FindNode,
1892    /// A `Neighbours` message was handled.
1893    Neighbours,
1894    /// An `EnrRequest` message was handled.
1895    EnrRequest,
1896    /// An `EnrResponse` message was handled.
1897    EnrResponse,
1898    /// Service is being terminated
1899    Terminated,
1900}
1901
1902/// Continuously reads new messages from the channel and writes them to the socket
1903pub(crate) async fn send_loop(udp: Arc<UdpSocket>, rx: EgressReceiver) {
1904    let mut stream = ReceiverStream::new(rx);
1905    while let Some((payload, to)) = stream.next().await {
1906        match udp.send_to(&payload, to).await {
1907            Ok(size) => {
1908                trace!(target: "discv4", ?to, ?size,"sent payload");
1909            }
1910            Err(err) => {
1911                debug!(target: "discv4", ?to, %err,"Failed to send datagram.");
1912            }
1913        }
1914    }
1915}
1916
1917/// Rate limits incoming packets from individual IPs to an average of 1 packet/second (60 per minute)
1918const MAX_INCOMING_PACKETS_PER_MINUTE_BY_IP: usize = 60usize;
1919
1920/// Continuously awaits new incoming messages and sends them back through the channel.
1921///
1922/// The receive loop enforces primitive per-IP rate limiting to prevent message spam from
1923/// individual IPs
1924pub(crate) async fn receive_loop(udp: Arc<UdpSocket>, tx: IngressSender, local_id: PeerId) {
1925    let send = |event: IngressEvent| async {
1926        let _ = tx.send(event).await.map_err(|err| {
1927            debug!(
1928                target: "discv4",
1929                 %err,
1930                "failed to send incoming packet",
1931            )
1932        });
1933    };
1934
1935    let mut cache = ReceiveCache::default();
1936
1937    // tick at half the rate of the limit
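    // e.g. with a limit of 60 packets/minute the interval fires every 30s and reduces each per-IP
    // counter by 30, so an address sustaining more than ~1 packet/second eventually exceeds the
    // limit and its packets are dropped until the counter decays again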
1938    let tick = MAX_INCOMING_PACKETS_PER_MINUTE_BY_IP / 2;
1939    let mut interval = tokio::time::interval(Duration::from_secs(tick as u64));
1940
1941    let mut buf = [0; MAX_PACKET_SIZE];
1942    loop {
1943        let res = udp.recv_from(&mut buf).await;
1944        match res {
1945            Err(err) => {
1946                debug!(target: "discv4", %err, "Failed to read datagram.");
1947                send(IngressEvent::RecvError(err)).await;
1948            }
1949            Ok((read, remote_addr)) => {
1950                // rate limit incoming packets by IP
1951                if cache.inc_ip(remote_addr.ip()) > MAX_INCOMING_PACKETS_PER_MINUTE_BY_IP {
1952                    trace!(target: "discv4", ?remote_addr, "Too many incoming packets from IP.");
1953                    continue
1954                }
1955
1956                let packet = &buf[..read];
1957                match Message::decode(packet) {
1958                    Ok(packet) => {
1959                        if packet.node_id == local_id {
1960                            // received our own message
1961                            debug!(target: "discv4", ?remote_addr, "Received own packet.");
1962                            continue
1963                        }
1964
1965                        // skip if we've already received the same packet
1966                        if cache.contains_packet(packet.hash) {
1967                            debug!(target: "discv4", ?remote_addr, "Received duplicate packet.");
1968                            continue
1969                        }
1970
1971                        send(IngressEvent::Packet(remote_addr, packet)).await;
1972                    }
1973                    Err(err) => {
1974                        trace!(target: "discv4", %err,"Failed to decode packet");
1975                        send(IngressEvent::BadPacket(remote_addr, err, packet.to_vec())).await
1976                    }
1977                }
1978            }
1979        }
1980
1981        // reset the tracked ips if the interval has passed
1982        if poll_fn(|cx| match interval.poll_tick(cx) {
1983            Poll::Ready(_) => Poll::Ready(true),
1984            Poll::Pending => Poll::Ready(false),
1985        })
1986        .await
1987        {
1988            cache.tick_ips(tick);
1989        }
1990    }
1991}
1992
1993/// A cache for received packets and their source address.
1994///
1995/// This is used to discard duplicated packets and rate limit messages from the same source.
1996struct ReceiveCache {
1997    /// keeps track of how many messages we've received from a given IP address since the last
1998    /// tick.
1999    ///
2000    /// This is used to count the number of messages received from a given IP address within an
2001    /// interval.
2002    ip_messages: HashMap<IpAddr, usize>,
2003    // keeps track of unique packet hashes
2004    unique_packets: schnellru::LruMap<B256, ()>,
2005}
2006
2007impl ReceiveCache {
2008    /// Decrements the per-IP counters and removes IPs whose counter is exhausted.
2009    ///
2010    /// Each counter is reduced by `tick`; entries whose counter would drop below zero are removed.
2011    fn tick_ips(&mut self, tick: usize) {
2012        self.ip_messages.retain(|_, count| {
2013            if let Some(reset) = count.checked_sub(tick) {
2014                *count = reset;
2015                true
2016            } else {
2017                false
2018            }
2019        });
2020    }
2021
2022    /// Increases the counter for the given IP address and returns the new count.
2023    fn inc_ip(&mut self, ip: IpAddr) -> usize {
2024        let ctn = self.ip_messages.entry(ip).or_default();
2025        *ctn = ctn.saturating_add(1);
2026        *ctn
2027    }
2028
2029    /// Returns true if we previously received the packet
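    ///
    /// The hash is inserted into the LRU cache as a side effect, so subsequent duplicates of the
    /// same packet are detected.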
2030    fn contains_packet(&mut self, hash: B256) -> bool {
2031        !self.unique_packets.insert(hash, ())
2032    }
2033}
2034
2035impl Default for ReceiveCache {
2036    fn default() -> Self {
2037        Self {
2038            ip_messages: Default::default(),
2039            unique_packets: schnellru::LruMap::new(schnellru::ByLength::new(32)),
2040        }
2041    }
2042}
2043
2044/// The commands sent from the frontend [Discv4] to the service [`Discv4Service`].
2045enum Discv4Command {
2046    Add(NodeRecord),
2047    SetTcpPort(u16),
2048    SetEIP868RLPPair { key: Vec<u8>, rlp: Bytes },
2049    Ban(PeerId, IpAddr),
2050    BanPeer(PeerId),
2051    BanIp(IpAddr),
2052    Remove(PeerId),
2053    Lookup { node_id: Option<PeerId>, tx: Option<NodeRecordSender> },
2054    SetLookupInterval(Duration),
2055    Updates(OneshotSender<ReceiverStream<DiscoveryUpdate>>),
2056    Terminated,
2057}
2058
2059/// Event type the receive loop produces
2060#[derive(Debug)]
2061pub(crate) enum IngressEvent {
2062    /// Encountered an error when reading a datagram message.
2063    RecvError(io::Error),
2064    /// Received a bad message
2065    BadPacket(SocketAddr, DecodePacketError, Vec<u8>),
2066    /// Received a datagram from an address.
2067    Packet(SocketAddr, Packet),
2068}
2069
2070/// Tracks a sent ping
2071#[derive(Debug)]
2072struct PingRequest {
2073    /// Timestamp when the request was sent.
2074    sent_at: Instant,
2075    /// Node to which the request was sent.
2076    node: NodeRecord,
2077    /// Hash sent in the Ping request.
2078    echo_hash: B256,
2079    /// Why this ping was sent.
2080    reason: PingReason,
2081}
2082
2083/// Rotates the `PeerId` that is periodically looked up.
2084///
2085/// By selecting different targets, the lookups will be seeded with different ALPHA seed nodes.
2086#[derive(Debug)]
2087struct LookupTargetRotator {
2088    interval: usize,
2089    counter: usize,
2090}
2091
2092// === impl LookupTargetRotator ===
2093
2094impl LookupTargetRotator {
2095    /// Returns a rotator that always returns the local target.
2096    const fn local_only() -> Self {
2097        Self { interval: 1, counter: 0 }
2098    }
2099}
2100
2101impl Default for LookupTargetRotator {
2102    fn default() -> Self {
2103        Self {
2104            // every 4th lookup is our own node
2105            interval: 4,
2106            counter: 3,
2107        }
2108    }
2109}
2110
2111impl LookupTargetRotator {
2112    /// Returns the next node id to look up.
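    ///
    /// With the default rotator every 4th returned target is the local node id. An illustrative
    /// sketch, mirroring `test_rotator` below:
    ///
    /// ```ignore
    /// let local = PeerId::random();
    /// let mut rotator = LookupTargetRotator::default();
    /// assert_eq!(rotator.next(&local), local); // first call targets our own id
    /// assert_ne!(rotator.next(&local), local); // the next three calls target random ids
    /// ```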
2113    fn next(&mut self, local: &PeerId) -> PeerId {
2114        self.counter += 1;
2115        self.counter %= self.interval;
2116        if self.counter == 0 {
2117            return *local
2118        }
2119        PeerId::random()
2120    }
2121}
2122
2123/// Tracks lookups across multiple `FindNode` requests.
2124///
2125/// Once all clones of this type have been dropped, it sends all the discovered nodes to the
2126/// listener, if one is present.
2127#[derive(Clone, Debug)]
2128struct LookupContext {
2129    inner: Rc<LookupContextInner>,
2130}
2131
2132impl LookupContext {
2133    /// Create new context for a recursive lookup
2134    fn new(
2135        target: discv5::Key<NodeKey>,
2136        nearest_nodes: impl IntoIterator<Item = (Distance, NodeRecord)>,
2137        listener: Option<NodeRecordSender>,
2138    ) -> Self {
2139        let closest_nodes = nearest_nodes
2140            .into_iter()
2141            .map(|(distance, record)| {
2142                (distance, QueryNode { record, queried: false, responded: false })
2143            })
2144            .collect();
2145
2146        let inner = Rc::new(LookupContextInner {
2147            target,
2148            closest_nodes: RefCell::new(closest_nodes),
2149            listener,
2150        });
2151        Self { inner }
2152    }
2153
2154    /// Returns the target of this lookup
2155    fn target(&self) -> PeerId {
2156        self.inner.target.preimage().0
2157    }
2158
2159    fn closest(&self, num: usize) -> Vec<NodeRecord> {
2160        self.inner
2161            .closest_nodes
2162            .borrow()
2163            .iter()
2164            .filter(|(_, node)| !node.queried)
2165            .map(|(_, n)| n.record)
2166            .take(num)
2167            .collect()
2168    }
2169
2170    /// Returns the closest nodes that have not been queried yet.
2171    fn filter_closest<P>(&self, num: usize, filter: P) -> Vec<NodeRecord>
2172    where
2173        P: FnMut(&NodeRecord) -> bool,
2174    {
2175        self.inner
2176            .closest_nodes
2177            .borrow()
2178            .iter()
2179            .filter(|(_, node)| !node.queried)
2180            .map(|(_, n)| n.record)
2181            .filter(filter)
2182            .take(num)
2183            .collect()
2184    }
2185
2186    /// Inserts the node if it's missing
2187    fn add_node(&self, record: NodeRecord) {
2188        let distance = self.inner.target.distance(&kad_key(record.id));
2189        let mut closest = self.inner.closest_nodes.borrow_mut();
2190        if let btree_map::Entry::Vacant(entry) = closest.entry(distance) {
2191            entry.insert(QueryNode { record, queried: false, responded: false });
2192        }
2193    }
2194
2195    fn set_queried(&self, id: PeerId, val: bool) {
2196        if let Some((_, node)) =
2197            self.inner.closest_nodes.borrow_mut().iter_mut().find(|(_, node)| node.record.id == id)
2198        {
2199            node.queried = val;
2200        }
2201    }
2202
2203    /// Marks the node as queried
2204    fn mark_queried(&self, id: PeerId) {
2205        self.set_queried(id, true)
2206    }
2207
2208    /// Marks the node as not queried
2209    fn unmark_queried(&self, id: PeerId) {
2210        self.set_queried(id, false)
2211    }
2212
2213    /// Marks the node as responded
2214    fn mark_responded(&self, id: PeerId) {
2215        if let Some((_, node)) =
2216            self.inner.closest_nodes.borrow_mut().iter_mut().find(|(_, node)| node.record.id == id)
2217        {
2218            node.responded = true;
2219        }
2220    }
2221}
2222
2223// SAFETY: The [`Discv4Service`] is intended to be spawned as a task, which requires `Send`.
2224// The `LookupContext` is shared by all active `FindNode` requests that are part of the lookup
2225// step, which can modify the context. The shared context is only ever accessed mutably when a
2226// `Neighbours` response is processed, and all clones are stored inside [`Discv4Service`]. In
2227// other words, it is guaranteed that there's only one owner ([`Discv4Service`]) of all possible
2228// [`Rc`] clones of [`LookupContext`].
2229unsafe impl Send for LookupContext {}
2230#[derive(Debug)]
2231struct LookupContextInner {
2232    /// The target to lookup.
2233    target: discv5::Key<NodeKey>,
2234    /// The closest nodes
2235    closest_nodes: RefCell<BTreeMap<Distance, QueryNode>>,
2236    /// A listener for all the nodes retrieved in this lookup
2237    ///
2238    /// This is present if the lookup was triggered manually via [Discv4] and we want to return all
2239    /// the nodes once the lookup finishes.
2240    listener: Option<NodeRecordSender>,
2241}
2242
2243impl Drop for LookupContextInner {
2244    fn drop(&mut self) {
2245        if let Some(tx) = self.listener.take() {
2246            // there's only 1 instance shared across `FindNode` requests, if this is dropped then
2247            // all requests finished, and we can send all results back
2248            let nodes = self
2249                .closest_nodes
2250                .take()
2251                .into_values()
2252                .filter(|node| node.responded)
2253                .map(|node| node.record)
2254                .collect();
2255            let _ = tx.send(nodes);
2256        }
2257    }
2258}
2259
2260/// Tracks the state of a recursive lookup step
2261#[derive(Debug, Clone, Copy)]
2262struct QueryNode {
2263    record: NodeRecord,
2264    queried: bool,
2265    responded: bool,
2266}
2267
2268#[derive(Debug)]
2269struct FindNodeRequest {
2270    /// Timestamp when the request was sent.
2271    sent_at: Instant,
2272    /// Number of items sent by the node so far.
2273    response_count: usize,
2274    /// Whether the request has been answered yet.
2275    answered: bool,
2276    /// The lookup context this request belongs to.
2277    lookup_context: LookupContext,
2278}
2279
2280// === impl FindNodeRequest ===
2281
2282impl FindNodeRequest {
2283    fn new(resp: LookupContext) -> Self {
2284        Self { sent_at: Instant::now(), response_count: 0, answered: false, lookup_context: resp }
2285    }
2286}
2287
2288#[derive(Debug)]
2289struct EnrRequestState {
2290    /// Timestamp when the request was sent.
2291    sent_at: Instant,
2292    /// Hash of the `EnrRequest` packet that was sent.
2293    echo_hash: B256,
2294}
2295
2296/// Stored node info.
2297#[derive(Debug, Clone, Eq, PartialEq)]
2298struct NodeEntry {
2299    /// Node record info.
2300    record: NodeRecord,
2301    /// Timestamp of last pong.
2302    last_seen: Instant,
2303    /// Last enr seq we retrieved via an ENR request.
2304    last_enr_seq: Option<u64>,
2305    /// `ForkId` if retrieved via ENR requests.
2306    fork_id: Option<ForkId>,
2307    /// Counter for failed _consecutive_ findNode requests.
2308    find_node_failures: u8,
2309    /// Whether the endpoint of the peer is proven.
2310    has_endpoint_proof: bool,
2311}
2312
2313// === impl NodeEntry ===
2314
2315impl NodeEntry {
2316    /// Creates a new, unpopulated entry
2317    fn new(record: NodeRecord) -> Self {
2318        Self {
2319            record,
2320            last_seen: Instant::now(),
2321            last_enr_seq: None,
2322            fork_id: None,
2323            find_node_failures: 0,
2324            has_endpoint_proof: false,
2325        }
2326    }
2327
2328    #[cfg(test)]
2329    fn new_proven(record: NodeRecord) -> Self {
2330        let mut node = Self::new(record);
2331        node.has_endpoint_proof = true;
2332        node
2333    }
2334
2335    /// Marks the entry with an established proof and resets the consecutive failure counter.
2336    const fn establish_proof(&mut self) {
2337        self.has_endpoint_proof = true;
2338        self.find_node_failures = 0;
2339    }
2340
2341    /// Returns true if the tracked find node failures exceed the max amount
2342    const fn exceeds_find_node_failures(&self, max_failures: u8) -> bool {
2343        self.find_node_failures >= max_failures
2344    }
2345
2346    /// Updates the last timestamp and sets the enr seq
2347    fn update_with_enr(&mut self, last_enr_seq: Option<u64>) -> Option<u64> {
2348        self.update_now(|s| std::mem::replace(&mut s.last_enr_seq, last_enr_seq))
2349    }
2350
2351    /// Increases the failed request counter
2352    const fn inc_failed_request(&mut self) {
2353        self.find_node_failures += 1;
2354    }
2355
2356    /// Updates the last timestamp and sets the enr seq
2357    fn update_with_fork_id(&mut self, fork_id: Option<ForkId>) -> Option<ForkId> {
2358        self.update_now(|s| std::mem::replace(&mut s.fork_id, fork_id))
2359    }
2360
2361    /// Updates the `last_seen` timestamp and calls the closure
2362    fn update_now<F, R>(&mut self, f: F) -> R
2363    where
2364        F: FnOnce(&mut Self) -> R,
2365    {
2366        self.last_seen = Instant::now();
2367        f(self)
2368    }
2369}
2370
2371// === impl NodeEntry ===
2372
2373impl NodeEntry {
2374    /// Returns true if the node should be re-pinged.
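    ///
    /// A node is due for a re-ping once half of the endpoint proof lifetime has elapsed since it
    /// was last seen, so the proof can be renewed before it actually expires.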
2375    fn is_expired(&self) -> bool {
2376        self.last_seen.elapsed() > (ENDPOINT_PROOF_EXPIRATION / 2)
2377    }
2378}
2379
2380/// Represents why a ping is issued
2381#[derive(Debug)]
2382enum PingReason {
2383    /// Initial ping to a previously unknown peer that was inserted into the table.
2384    InitialInsert,
2385    /// A ping to a peer to establish a bond (endpoint proof).
2386    EstablishBond,
2387    /// Re-ping a peer.
2388    RePing,
2389    /// Part of a lookup to ensure endpoint is proven before we can send a `FindNode` request.
2390    Lookup(NodeRecord, LookupContext),
2391}
2392
2393/// Represents node-related state changes in the underlying node table
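///
/// Listeners typically consume these from an update stream and match on the variants. A minimal
/// sketch, mirroring the `test_mainnet_lookup` test below (`updates` is the update stream):
///
/// ```ignore
/// while let Some(update) = updates.next().await {
///     match update {
///         DiscoveryUpdate::Added(record) => { /* track the newly discovered peer */ }
///         DiscoveryUpdate::Removed(id) => { /* forget the peer */ }
///         DiscoveryUpdate::EnrForkId(record, fork_id) => { /* peer announced a ForkId */ }
///         _ => {}
///     }
/// }
/// ```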
2394#[derive(Debug, Clone)]
2395pub enum DiscoveryUpdate {
2396    /// A new node was discovered _and_ added to the table.
2397    Added(NodeRecord),
2398    /// A new node was discovered but _not_ added to the table because it is currently full.
2399    DiscoveredAtCapacity(NodeRecord),
2400    /// Received a [`ForkId`] via EIP-868 for the given [`NodeRecord`].
2401    EnrForkId(NodeRecord, ForkId),
2402    /// Node that was removed from the table
2403    Removed(PeerId),
2404    /// A series of updates
2405    Batch(Vec<DiscoveryUpdate>),
2406}
2407
2408#[cfg(test)]
2409mod tests {
2410    use super::*;
2411    use crate::test_utils::{create_discv4, create_discv4_with_config, rng_endpoint, rng_record};
2412    use alloy_primitives::hex;
2413    use alloy_rlp::{Decodable, Encodable};
2414    use rand_08::Rng;
2415    use reth_ethereum_forks::{EnrForkIdEntry, ForkHash};
2416    use reth_network_peers::mainnet_nodes;
2417    use std::future::poll_fn;
2418
2419    #[tokio::test]
2420    async fn test_configured_enr_forkid_entry() {
2421        let fork: ForkId = ForkId { hash: ForkHash([220, 233, 108, 45]), next: 0u64 };
2422        let mut disc_conf = Discv4Config::default();
2423        disc_conf.add_eip868_pair("eth", EnrForkIdEntry::from(fork));
2424        let (_discv4, service) = create_discv4_with_config(disc_conf).await;
2425        let eth = service.local_eip_868_enr.get_raw_rlp(b"eth").unwrap();
2426        let fork_entry_id = EnrForkIdEntry::decode(&mut &eth[..]).unwrap();
2427
2428        let raw: [u8; 8] = [0xc7, 0xc6, 0x84, 0xdc, 0xe9, 0x6c, 0x2d, 0x80];
2429        let decoded = EnrForkIdEntry::decode(&mut &raw[..]).unwrap();
2430        let expected = EnrForkIdEntry {
2431            fork_id: ForkId { hash: ForkHash([0xdc, 0xe9, 0x6c, 0x2d]), next: 0 },
2432        };
2433        assert_eq!(expected, fork_entry_id);
2434        assert_eq!(expected, decoded);
2435    }
2436
2437    #[test]
2438    fn test_enr_forkid_entry_decode() {
2439        let raw: [u8; 8] = [0xc7, 0xc6, 0x84, 0xdc, 0xe9, 0x6c, 0x2d, 0x80];
2440        let decoded = EnrForkIdEntry::decode(&mut &raw[..]).unwrap();
2441        let expected = EnrForkIdEntry {
2442            fork_id: ForkId { hash: ForkHash([0xdc, 0xe9, 0x6c, 0x2d]), next: 0 },
2443        };
2444        assert_eq!(expected, decoded);
2445    }
2446
2447    #[test]
2448    fn test_enr_forkid_entry_encode() {
2449        let original = EnrForkIdEntry {
2450            fork_id: ForkId { hash: ForkHash([0xdc, 0xe9, 0x6c, 0x2d]), next: 0 },
2451        };
2452        let expected: [u8; 8] = [0xc7, 0xc6, 0x84, 0xdc, 0xe9, 0x6c, 0x2d, 0x80];
2453        let mut encoded = Vec::with_capacity(expected.len());
2454        original.encode(&mut encoded);
2455        assert_eq!(&expected[..], encoded.as_slice());
2456    }
2457
2458    #[test]
2459    fn test_local_rotator() {
2460        let id = PeerId::random();
2461        let mut rotator = LookupTargetRotator::local_only();
2462        assert_eq!(rotator.next(&id), id);
2463        assert_eq!(rotator.next(&id), id);
2464    }
2465
2466    #[test]
2467    fn test_rotator() {
2468        let id = PeerId::random();
2469        let mut rotator = LookupTargetRotator::default();
2470        assert_eq!(rotator.next(&id), id);
2471        assert_ne!(rotator.next(&id), id);
2472        assert_ne!(rotator.next(&id), id);
2473        assert_ne!(rotator.next(&id), id);
2474        assert_eq!(rotator.next(&id), id);
2475    }
2476
2477    #[tokio::test]
2478    async fn test_pending_ping() {
2479        let (_, mut service) = create_discv4().await;
2480
2481        let local_addr = service.local_addr();
2482
2483        let mut num_inserted = 0;
2484        loop {
2485            let node = NodeRecord::new(local_addr, PeerId::random());
2486            if service.add_node(node) {
2487                num_inserted += 1;
2488                assert!(service.pending_pings.contains_key(&node.id));
2489                assert_eq!(service.pending_pings.len(), num_inserted);
2490                if num_inserted == MAX_NODES_PING {
2491                    break
2492                }
2493            }
2494        }
2495
2496        // `pending_pings` is full, insert into `queued_pings`.
2497        num_inserted = 0;
2498        for _ in 0..MAX_NODES_PING {
2499            let node = NodeRecord::new(local_addr, PeerId::random());
2500            if service.add_node(node) {
2501                num_inserted += 1;
2502                assert!(!service.pending_pings.contains_key(&node.id));
2503                assert_eq!(service.pending_pings.len(), MAX_NODES_PING);
2504                assert_eq!(service.queued_pings.len(), num_inserted);
2505            }
2506        }
2507    }
2508
2509    // Bootstraps with mainnet boot nodes
2510    #[tokio::test(flavor = "multi_thread")]
2511    #[ignore]
2512    async fn test_mainnet_lookup() {
2513        reth_tracing::init_test_tracing();
2514        let fork_id = ForkId { hash: ForkHash(hex!("743f3d89")), next: 16191202 };
2515
2516        let all_nodes = mainnet_nodes();
2517        let config = Discv4Config::builder()
2518            .add_boot_nodes(all_nodes)
2519            .lookup_interval(Duration::from_secs(1))
2520            .add_eip868_pair("eth", fork_id)
2521            .build();
2522        let (_discv4, mut service) = create_discv4_with_config(config).await;
2523
2524        let mut updates = service.update_stream();
2525
2526        let _handle = service.spawn();
2527
2528        let mut table = HashMap::new();
2529        while let Some(update) = updates.next().await {
2530            match update {
2531                DiscoveryUpdate::EnrForkId(record, fork_id) => {
2532                    println!("{record:?}, {fork_id:?}");
2533                }
2534                DiscoveryUpdate::Added(record) => {
2535                    table.insert(record.id, record);
2536                }
2537                DiscoveryUpdate::Removed(id) => {
2538                    table.remove(&id);
2539                }
2540                _ => {}
2541            }
2542            println!("total peers {}", table.len());
2543        }
2544    }
2545
2546    #[tokio::test]
2547    async fn test_mapped_ipv4() {
2548        reth_tracing::init_test_tracing();
2549        let mut rng = rand_08::thread_rng();
2550        let config = Discv4Config::builder().build();
2551        let (_discv4, mut service) = create_discv4_with_config(config).await;
2552
2553        let v4: Ipv4Addr = "0.0.0.0".parse().unwrap();
2554        let v6 = v4.to_ipv6_mapped();
2555        let addr: SocketAddr = (v6, DEFAULT_DISCOVERY_PORT).into();
2556
2557        let ping = Ping {
2558            from: rng_endpoint(&mut rng),
2559            to: rng_endpoint(&mut rng),
2560            expire: service.ping_expiration(),
2561            enr_sq: Some(rng.r#gen()),
2562        };
2563
2564        let id = PeerId::random();
2565        service.on_ping(ping, addr, id, B256::random());
2566
2567        let key = kad_key(id);
2568        match service.kbuckets.entry(&key) {
2569            kbucket::Entry::Present(entry, _) => {
2570                let node_addr = entry.value().record.address;
2571                assert!(node_addr.is_ipv4());
2572                assert_eq!(node_addr, IpAddr::from(v4));
2573            }
2574            _ => unreachable!(),
2575        };
2576    }
2577
2578    #[tokio::test]
2579    async fn test_respect_ping_expiration() {
2580        reth_tracing::init_test_tracing();
2581        let mut rng = rand_08::thread_rng();
2582        let config = Discv4Config::builder().build();
2583        let (_discv4, mut service) = create_discv4_with_config(config).await;
2584
2585        let v4: Ipv4Addr = "0.0.0.0".parse().unwrap();
2586        let v6 = v4.to_ipv6_mapped();
2587        let addr: SocketAddr = (v6, DEFAULT_DISCOVERY_PORT).into();
2588
2589        let ping = Ping {
2590            from: rng_endpoint(&mut rng),
2591            to: rng_endpoint(&mut rng),
2592            expire: SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs() - 1,
2593            enr_sq: Some(rng.r#gen()),
2594        };
2595
2596        let id = PeerId::random();
2597        service.on_ping(ping, addr, id, B256::random());
2598
2599        let key = kad_key(id);
2600        match service.kbuckets.entry(&key) {
2601            kbucket::Entry::Absent(_) => {}
2602            _ => unreachable!(),
2603        };
2604    }
2605
2606    #[tokio::test]
2607    async fn test_single_lookups() {
2608        reth_tracing::init_test_tracing();
2609
2610        let config = Discv4Config::builder().build();
2611        let (_discv4, mut service) = create_discv4_with_config(config.clone()).await;
2612
2613        let id = PeerId::random();
2614        let key = kad_key(id);
2615        let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), id);
2616
2617        let _ = service.kbuckets.insert_or_update(
2618            &key,
2619            NodeEntry::new_proven(record),
2620            NodeStatus {
2621                direction: ConnectionDirection::Incoming,
2622                state: ConnectionState::Connected,
2623            },
2624        );
2625
2626        service.lookup_self();
2627        assert_eq!(service.pending_find_nodes.len(), 1);
2628
2629        poll_fn(|cx| {
2630            let _ = service.poll(cx);
2631            assert_eq!(service.pending_find_nodes.len(), 1);
2632
2633            Poll::Ready(())
2634        })
2635        .await;
2636    }
2637
2638    #[tokio::test]
2639    async fn test_on_neighbours_recursive_lookup() {
2640        reth_tracing::init_test_tracing();
2641
2642        let config = Discv4Config::builder().build();
2643        let (_discv4, mut service) = create_discv4_with_config(config.clone()).await;
2644        let (_discv4, mut service2) = create_discv4_with_config(config).await;
2645
2646        let id = PeerId::random();
2647        let key = kad_key(id);
2648        let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), id);
2649
2650        let _ = service.kbuckets.insert_or_update(
2651            &key,
2652            NodeEntry::new_proven(record),
2653            NodeStatus {
2654                direction: ConnectionDirection::Incoming,
2655                state: ConnectionState::Connected,
2656            },
2657        );
2658        // Needed in this test to populate self.pending_find_nodes as a prerequisite for a valid
2659        // on_neighbours request
2660        service.lookup_self();
2661        assert_eq!(service.pending_find_nodes.len(), 1);
2662
2663        poll_fn(|cx| {
2664            let _ = service.poll(cx);
2665            assert_eq!(service.pending_find_nodes.len(), 1);
2666
2667            Poll::Ready(())
2668        })
2669        .await;
2670
2671        let expiry = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_secs() +
2672            10000000000000;
2673        let msg = Neighbours { nodes: vec![service2.local_node_record], expire: expiry };
2674        service.on_neighbours(msg, record.tcp_addr(), id);
2675        // wait for the processed ping
2676        let event = poll_fn(|cx| service2.poll(cx)).await;
2677        assert_eq!(event, Discv4Event::Ping);
2678        // assert that no find_node req has been added here on top of the initial one, since both
2679        // sides of the endpoint proof are not completed yet
2680        assert_eq!(service.pending_find_nodes.len(), 1);
2681        // we now wait for PONG
2682        let event = poll_fn(|cx| service.poll(cx)).await;
2683        assert_eq!(event, Discv4Event::Pong);
2684        // Ideally we want to assert against service.pending_lookup.len() here, but because
2685        // service2 sends Pong and Ping consecutively in on_ping(), the pending_lookup table gets
2686        // drained almost immediately, and there's no way to grab a handle to its intermediate
2687        // state here :(
2688        let event = poll_fn(|cx| service.poll(cx)).await;
2689        assert_eq!(event, Discv4Event::Ping);
2690        // assert that we've added the find_node req here after both sides of the endpoint proof
2691        // are done
2692        assert_eq!(service.pending_find_nodes.len(), 2);
2693    }
2694
2695    #[tokio::test]
2696    async fn test_no_local_in_closest() {
2697        reth_tracing::init_test_tracing();
2698
2699        let config = Discv4Config::builder().build();
2700        let (_discv4, mut service) = create_discv4_with_config(config).await;
2701
2702        let target_key = kad_key(PeerId::random());
2703
2704        let id = PeerId::random();
2705        let key = kad_key(id);
2706        let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), id);
2707
2708        let _ = service.kbuckets.insert_or_update(
2709            &key,
2710            NodeEntry::new(record),
2711            NodeStatus {
2712                direction: ConnectionDirection::Incoming,
2713                state: ConnectionState::Connected,
2714            },
2715        );
2716
2717        let closest = service
2718            .kbuckets
2719            .closest_values(&target_key)
2720            .map(|n| n.value.record)
2721            .take(MAX_NODES_PER_BUCKET)
2722            .collect::<Vec<_>>();
2723
2724        assert_eq!(closest.len(), 1);
2725        assert!(!closest.iter().any(|r| r.id == *service.local_peer_id()));
2726    }
2727
    #[tokio::test]
    async fn test_random_lookup() {
        reth_tracing::init_test_tracing();

        let config = Discv4Config::builder().build();
        let (_discv4, mut service) = create_discv4_with_config(config).await;

        let target = PeerId::random();

        let id = PeerId::random();
        let key = kad_key(id);
        let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), id);

        let _ = service.kbuckets.insert_or_update(
            &key,
            NodeEntry::new_proven(record),
            NodeStatus {
                direction: ConnectionDirection::Incoming,
                state: ConnectionState::Connected,
            },
        );

        service.lookup(target);
        assert_eq!(service.pending_find_nodes.len(), 1);

        let ctx = service.pending_find_nodes.values().next().unwrap().lookup_context.clone();

        assert_eq!(ctx.target(), target);
        assert_eq!(ctx.inner.closest_nodes.borrow().len(), 1);

        ctx.add_node(record);
        assert_eq!(ctx.inner.closest_nodes.borrow().len(), 1);
    }

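    // checks that a node with maxed-out find_node failures is re-pinged instead of queried, and
    // that a pong resets the failure counter and confirms the endpoint proof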
    #[tokio::test]
    async fn test_reping_on_find_node_failures() {
        reth_tracing::init_test_tracing();

        let config = Discv4Config::builder().build();
        let (_discv4, mut service) = create_discv4_with_config(config).await;

        let target = PeerId::random();

        let id = PeerId::random();
        let key = kad_key(id);
        let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), id);

        let mut entry = NodeEntry::new_proven(record);
        entry.find_node_failures = u8::MAX;
        let _ = service.kbuckets.insert_or_update(
            &key,
            entry,
            NodeStatus {
                direction: ConnectionDirection::Incoming,
                state: ConnectionState::Connected,
            },
        );

        service.lookup(target);
        assert_eq!(service.pending_find_nodes.len(), 0);
        assert_eq!(service.pending_pings.len(), 1);

        service.update_on_pong(record, None);

        service
            .on_entry(record.id, |entry| {
                // reset on pong
                assert_eq!(entry.find_node_failures, 0);
                assert!(entry.has_endpoint_proof);
            })
            .unwrap();
    }

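    // smoke test: lookup commands can be issued via the Discv4 handle once the service is spawned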
    #[tokio::test]
    async fn test_service_commands() {
        reth_tracing::init_test_tracing();

        let config = Discv4Config::builder().build();
        let (discv4, mut service) = create_discv4_with_config(config).await;

        service.lookup_self();

        let _handle = service.spawn();
        discv4.send_lookup_self();
        let _ = discv4.lookup_self().await;
    }

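    // checks that pending find_node, lookup and ping requests are evicted once their configured
    // timeouts elapse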
    #[tokio::test]
    async fn test_requests_timeout() {
        reth_tracing::init_test_tracing();
        let fork_id = ForkId { hash: ForkHash(hex!("743f3d89")), next: 16191202 };

        let config = Discv4Config::builder()
            .request_timeout(Duration::from_millis(200))
            .ping_expiration(Duration::from_millis(200))
            .lookup_neighbours_expiration(Duration::from_millis(200))
            .add_eip868_pair("eth", fork_id)
            .build();
        let (_discv4, mut service) = create_discv4_with_config(config).await;

        let id = PeerId::random();
        let key = kad_key(id);
        let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), id);

        let _ = service.kbuckets.insert_or_update(
            &key,
            NodeEntry::new_proven(record),
            NodeStatus {
                direction: ConnectionDirection::Incoming,
                state: ConnectionState::Connected,
            },
        );

        service.lookup_self();
        assert_eq!(service.pending_find_nodes.len(), 1);

        let ctx = service.pending_find_nodes.values().next().unwrap().lookup_context.clone();

        service.pending_lookup.insert(record.id, (Instant::now(), ctx));

        assert_eq!(service.pending_lookup.len(), 1);

        let ping = Ping {
            from: service.local_node_record.into(),
            to: record.into(),
            expire: service.ping_expiration(),
            enr_sq: service.enr_seq(),
        };
        let echo_hash = service.send_packet(Message::Ping(ping), record.udp_addr());
        let ping_request = PingRequest {
            sent_at: Instant::now(),
            node: record,
            echo_hash,
            reason: PingReason::InitialInsert,
        };
        service.pending_pings.insert(record.id, ping_request);

        assert_eq!(service.pending_pings.len(), 1);

        tokio::time::sleep(Duration::from_secs(1)).await;

        poll_fn(|cx| {
            let _ = service.poll(cx);

            assert_eq!(service.pending_find_nodes.len(), 0);
            assert_eq!(service.pending_lookup.len(), 0);
            assert_eq!(service.pending_pings.len(), 0);

            Poll::Ready(())
        })
        .await;
    }

    // sends a PING packet with wrong 'to' field and expects a PONG response.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_check_wrong_to() {
        reth_tracing::init_test_tracing();

        let config = Discv4Config::builder().external_ip_resolver(None).build();
        let (_discv4, mut service_1) = create_discv4_with_config(config.clone()).await;
        let (_discv4, mut service_2) = create_discv4_with_config(config).await;

        // ping node 2 with wrong to field
        let mut ping = Ping {
            from: service_1.local_node_record.into(),
            to: service_2.local_node_record.into(),
            expire: service_1.ping_expiration(),
            enr_sq: service_1.enr_seq(),
        };
        ping.to.address = "192.0.2.0".parse().unwrap();

        let echo_hash = service_1.send_packet(Message::Ping(ping), service_2.local_addr());
        let ping_request = PingRequest {
            sent_at: Instant::now(),
            node: service_2.local_node_record,
            echo_hash,
            reason: PingReason::InitialInsert,
        };
        service_1.pending_pings.insert(*service_2.local_peer_id(), ping_request);

        // wait for the processed ping
        let event = poll_fn(|cx| service_2.poll(cx)).await;
        assert_eq!(event, Discv4Event::Ping);

        // we now wait for PONG
        let event = poll_fn(|cx| service_1.poll(cx)).await;
        assert_eq!(event, Discv4Event::Pong);
        // followed by a ping
        let event = poll_fn(|cx| service_1.poll(cx)).await;
        assert_eq!(event, Discv4Event::Ping);
    }

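    // full ping/pong handshake between two services: both sides should end up with the peer
    // marked as connected (endpoint proven) in their table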
    #[tokio::test(flavor = "multi_thread")]
    async fn test_check_ping_pong() {
        reth_tracing::init_test_tracing();

        let config = Discv4Config::builder().external_ip_resolver(None).build();
        let (_discv4, mut service_1) = create_discv4_with_config(config.clone()).await;
        let (_discv4, mut service_2) = create_discv4_with_config(config).await;

        // send ping from 1 -> 2
        service_1.add_node(service_2.local_node_record);

        // wait for the processed ping
        let event = poll_fn(|cx| service_2.poll(cx)).await;
        assert_eq!(event, Discv4Event::Ping);

        // node is now in the table but not connected yet
        let key1 = kad_key(*service_1.local_peer_id());
        match service_2.kbuckets.entry(&key1) {
            kbucket::Entry::Present(_entry, status) => {
                assert!(!status.is_connected());
            }
            _ => unreachable!(),
        }

        // we now wait for PONG
        let event = poll_fn(|cx| service_1.poll(cx)).await;
        assert_eq!(event, Discv4Event::Pong);

        // endpoint is proven
        let key2 = kad_key(*service_2.local_peer_id());
        match service_1.kbuckets.entry(&key2) {
            kbucket::Entry::Present(_entry, status) => {
                assert!(status.is_connected());
            }
            _ => unreachable!(),
        }

        // we now wait for the PING initiated by 2
        let event = poll_fn(|cx| service_1.poll(cx)).await;
        assert_eq!(event, Discv4Event::Ping);

        // we now wait for PONG (which may be preceded by ENR requests)
        let event = poll_fn(|cx| service_2.poll(cx)).await;

        match event {
            Discv4Event::EnrRequest => {
                // since we support ENR in the ping, the peer may also request the ENR before
                // sending the pong
                let event = poll_fn(|cx| service_2.poll(cx)).await;
                match event {
                    Discv4Event::EnrRequest => {
                        let event = poll_fn(|cx| service_2.poll(cx)).await;
                        assert_eq!(event, Discv4Event::Pong);
                    }
                    Discv4Event::Pong => {}
                    _ => {
                        unreachable!()
                    }
                }
            }
            Discv4Event::Pong => {}
            ev => unreachable!("{ev:?}"),
        }

        // endpoint is proven
        match service_2.kbuckets.entry(&key1) {
            kbucket::Entry::Present(_entry, status) => {
                assert!(status.is_connected());
            }
            ev => unreachable!("{ev:?}"),
        }
    }

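    // checks that an absent kbuckets entry can be inserted and is reported as present afterwards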
    #[test]
    fn test_insert() {
        let local_node_record = rng_record(&mut rand_08::thread_rng());
        let mut kbuckets: KBucketsTable<NodeKey, NodeEntry> = KBucketsTable::new(
            NodeKey::from(&local_node_record).into(),
            Duration::from_secs(60),
            MAX_NODES_PER_BUCKET,
            None,
            None,
        );

        let new_record = rng_record(&mut rand_08::thread_rng());
        let key = kad_key(new_record.id);
        match kbuckets.entry(&key) {
            kbucket::Entry::Absent(entry) => {
                let node = NodeEntry::new(new_record);
                let _ = entry.insert(
                    node,
                    NodeStatus {
                        direction: ConnectionDirection::Outgoing,
                        state: ConnectionState::Disconnected,
                    },
                );
            }
            _ => {
                unreachable!()
            }
        };
        match kbuckets.entry(&key) {
            kbucket::Entry::Present(_, _) => {}
            _ => {
                unreachable!()
            }
        }
    }

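    // checks that a configured boot node is not emitted as a DiscoveryUpdate::Added event on the
    // update stream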
    #[tokio::test]
    async fn test_bootnode_not_in_update_stream() {
        reth_tracing::init_test_tracing();
        let (_, service_1) = create_discv4().await;
        let peerid_1 = *service_1.local_peer_id();

        let config = Discv4Config::builder().add_boot_node(service_1.local_node_record).build();
        service_1.spawn();

        let (_, mut service_2) = create_discv4_with_config(config).await;

        let mut updates = service_2.update_stream();

        service_2.spawn();

        // Poll for events for a reasonable time
        let mut bootnode_appeared = false;
        let timeout = tokio::time::sleep(Duration::from_secs(1));
        tokio::pin!(timeout);

        loop {
            tokio::select! {
                Some(update) = updates.next() => {
                    if let DiscoveryUpdate::Added(record) = update {
                        if record.id == peerid_1 {
                            bootnode_appeared = true;
                            break;
                        }
                    }
                }
                _ = &mut timeout => break,
            }
        }

        // Assert that the bootnode did not appear in the update stream
        assert!(!bootnode_appeared, "bootnode should not appear in update stream");
    }
}