reth_discv4/
lib.rs

1//! Discovery v4 implementation: <https://github.com/ethereum/devp2p/blob/master/discv4.md>
2//!
3//! Discv4 employs a Kademlia-like routing table to store and manage discovered peers and topics.
4//! The protocol allows for external IP discovery in NAT environments through regular PING/PONG
5//! exchanges with discovered nodes. Each node returns the external IP address it has observed for
6//! us, and a simple majority is chosen as our external IP address. If the external IP address is
7//! updated, this is produced as an event to notify the swarm (if one is used for this behaviour).
8//!
9//! This implementation consists of a [`Discv4`] and [`Discv4Service`] pair. The service manages the
10//! state and drives the UDP socket. The (optional) [`Discv4`] serves as the frontend to interact
11//! with the service via a channel. Whenever the underlying table changes, the service produces a
12//! [`DiscoveryUpdate`] that listeners will receive.
13//!
14//! ## Feature Flags
15//!
16//! - `serde` (default): Enable serde support
17//! - `test-utils`: Export utilities for testing
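//!
//! ## Example
//!
//! A minimal usage sketch, assuming a Tokio runtime and placeholder key/address values; error
//! handling is simplified:
//!
//! ```ignore
//! use reth_discv4::{Discv4, Discv4Config, DEFAULT_DISCOVERY_ADDRESS};
//! use reth_network_peers::{pk2id, NodeRecord};
//! use secp256k1::SECP256K1;
//!
//! # async fn example() -> std::io::Result<()> {
//! // generate a (random) keypair for the local node
//! let (secret_key, pk) = SECP256K1.generate_keypair(&mut rand_08::thread_rng());
//! let local_enr = NodeRecord::new(DEFAULT_DISCOVERY_ADDRESS, pk2id(&pk));
//!
//! // bind the UDP socket and spawn the service, keeping the cloneable frontend handle
//! let discv4 =
//!     Discv4::spawn(DEFAULT_DISCOVERY_ADDRESS, local_enr, secret_key, Discv4Config::default())
//!         .await?;
//!
//! // lookup the local node in the DHT to populate the routing table
//! let _closest = discv4.lookup_self().await;
//! # Ok(())
//! # }
//! ```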
18
19#![doc(
20    html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
21    html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
22    issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/"
23)]
24#![cfg_attr(not(test), warn(unused_crate_dependencies))]
25#![cfg_attr(docsrs, feature(doc_cfg))]
26
27use crate::{
28    error::{DecodePacketError, Discv4Error},
29    proto::{FindNode, Message, Neighbours, Packet, Ping, Pong},
30};
31use alloy_primitives::{bytes::Bytes, hex, B256};
32use discv5::{
33    kbucket,
34    kbucket::{
35        BucketInsertResult, Distance, Entry as BucketEntry, InsertResult, KBucketsTable,
36        NodeStatus, MAX_NODES_PER_BUCKET,
37    },
38    ConnectionDirection, ConnectionState,
39};
40use enr::Enr;
41use itertools::Itertools;
42use parking_lot::Mutex;
43use proto::{EnrRequest, EnrResponse};
44use reth_ethereum_forks::ForkId;
45use reth_network_peers::{pk2id, PeerId};
46use secp256k1::SecretKey;
47use std::{
48    cell::RefCell,
49    collections::{btree_map, hash_map::Entry, BTreeMap, HashMap, VecDeque},
50    fmt,
51    future::poll_fn,
52    io,
53    net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4},
54    pin::Pin,
55    rc::Rc,
56    sync::Arc,
57    task::{ready, Context, Poll},
58    time::{Duration, Instant, SystemTime, UNIX_EPOCH},
59};
60use tokio::{
61    net::UdpSocket,
62    sync::{mpsc, mpsc::error::TrySendError, oneshot, oneshot::Sender as OneshotSender},
63    task::{JoinHandle, JoinSet},
64    time::Interval,
65};
66use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt};
67use tracing::{debug, trace};
68
69pub mod error;
70pub mod proto;
71
72mod config;
73pub use config::{Discv4Config, Discv4ConfigBuilder};
74
75mod node;
76use node::{kad_key, NodeKey};
77
78mod table;
79
80// reexport NodeRecord primitive
81pub use reth_network_peers::NodeRecord;
82
83#[cfg(any(test, feature = "test-utils"))]
84pub mod test_utils;
85
86use crate::table::PongTable;
87use reth_net_nat::ResolveNatInterval;
88/// reexport to get public ip.
89pub use reth_net_nat::{external_ip, NatResolver};
90
91/// The default address for discv4 via UDP
92///
93/// Note: the default TCP address is the same.
94pub const DEFAULT_DISCOVERY_ADDR: IpAddr = IpAddr::V4(Ipv4Addr::UNSPECIFIED);
95
96/// The default port for discv4 via UDP
97///
98/// Note: the default TCP port is the same.
99pub const DEFAULT_DISCOVERY_PORT: u16 = 30303;
100
101/// The default address for discv4 via UDP: "0.0.0.0:30303"
102///
103/// Note: The default TCP address is the same.
104pub const DEFAULT_DISCOVERY_ADDRESS: SocketAddr =
105    SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, DEFAULT_DISCOVERY_PORT));
106
107/// The maximum size of any packet is 1280 bytes.
108const MAX_PACKET_SIZE: usize = 1280;
109
110/// Length of the UDP datagram packet-header: Hash(32b) + Signature(65b) + Packet Type(1b)
111const MIN_PACKET_SIZE: usize = 32 + 65 + 1;
112
113/// Concurrency factor for `FindNode` requests to pick `ALPHA` closest nodes, <https://github.com/ethereum/devp2p/blob/master/discv4.md#recursive-lookup>
114const ALPHA: usize = 3;
115
116/// Maximum number of nodes to ping concurrently.
117///
118/// This corresponds to 2 full `Neighbours` responses with 16 _new_ nodes. This will apply some
119/// backpressure in recursive lookups.
120const MAX_NODES_PING: usize = 2 * MAX_NODES_PER_BUCKET;
121
122/// Maximum number of pings to keep queued.
123///
124/// If we are currently sending too many pings, any new pings will be queued. To prevent unbounded
125/// growth of the queue, the queue has a maximum capacity, after which any additional pings will be
126/// discarded.
127///
128/// This corresponds to 2 full `Neighbours` responses with 16 new nodes.
129const MAX_QUEUED_PINGS: usize = 2 * MAX_NODES_PER_BUCKET;
130
131/// The size of a datagram is limited to [`MAX_PACKET_SIZE`], so the 16 nodes that the discv4 spec
132/// allows per `Neighbours` message don't fit into one datagram. The safe number of nodes that always
133/// fits in a datagram is 12, assuming the worst case where all of them are IPv6 nodes. This is
134/// calculated by `(MAX_PACKET_SIZE - (header + expire + rlp overhead)) / size(rlp(Node_IPv6))`.
135/// Even in the best case, where all nodes are IPv4, only 14 nodes fit into one packet.
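/// In concrete numbers, the worst case works out to `(1280 - 109) / 91 = 12` records, mirroring the
/// constant expression below.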
136const SAFE_MAX_DATAGRAM_NEIGHBOUR_RECORDS: usize = (MAX_PACKET_SIZE - 109) / 91;
137
138/// The timeout used to identify expired nodes, 24h
139///
140/// Mirrors geth's `bondExpiration` of 24h
141const ENDPOINT_PROOF_EXPIRATION: Duration = Duration::from_secs(24 * 60 * 60);
142
143/// Duration used to expire nodes from the routing table, 1h
144const EXPIRE_DURATION: Duration = Duration::from_secs(60 * 60);
145
146// Restricts how many udp messages can be processed in a single [Discv4Service::poll] call.
147//
148// This will act as a manual yield point when draining the socket messages where the most CPU
149// expensive part is handling outgoing messages: encoding and hashing the packet
150const UDP_MESSAGE_POLL_LOOP_BUDGET: i32 = 4;
151
152type EgressSender = mpsc::Sender<(Bytes, SocketAddr)>;
153type EgressReceiver = mpsc::Receiver<(Bytes, SocketAddr)>;
154
155pub(crate) type IngressSender = mpsc::Sender<IngressEvent>;
156pub(crate) type IngressReceiver = mpsc::Receiver<IngressEvent>;
157
158type NodeRecordSender = OneshotSender<Vec<NodeRecord>>;
159
160/// The Discv4 frontend.
161///
162/// This is a cloneable type that communicates with the [`Discv4Service`] by sending commands over a
163/// shared channel.
164///
165/// See also [`Discv4::spawn`]
166#[derive(Debug, Clone)]
167pub struct Discv4 {
168    /// The address of the udp socket
169    local_addr: SocketAddr,
170    /// channel to send commands over to the service
171    to_service: mpsc::UnboundedSender<Discv4Command>,
172    /// Tracks the local node record.
173    ///
174    /// This includes the currently tracked external IP address of the node.
175    node_record: Arc<Mutex<NodeRecord>>,
176}
177
178impl Discv4 {
179    /// Same as [`Self::bind`] but also spawns the service onto a new task.
180    ///
181    /// See also: [`Discv4Service::spawn()`]
182    pub async fn spawn(
183        local_address: SocketAddr,
184        local_enr: NodeRecord,
185        secret_key: SecretKey,
186        config: Discv4Config,
187    ) -> io::Result<Self> {
188        let (discv4, service) = Self::bind(local_address, local_enr, secret_key, config).await?;
189
190        service.spawn();
191
192        Ok(discv4)
193    }
194
195    /// Returns a new noop instance that is not connected to any service; all commands are dropped.
196    ///
197    /// NOTE: this is only intended for test setups.
198    #[cfg(feature = "test-utils")]
199    pub fn noop() -> Self {
200        let (to_service, _rx) = mpsc::unbounded_channel();
201        let local_addr =
202            (IpAddr::from(std::net::Ipv4Addr::UNSPECIFIED), DEFAULT_DISCOVERY_PORT).into();
203        Self {
204            local_addr,
205            to_service,
206            node_record: Arc::new(Mutex::new(NodeRecord::new(
207                "127.0.0.1:3030".parse().unwrap(),
208                PeerId::random(),
209            ))),
210        }
211    }
212
213    /// Binds a new `UdpSocket` and creates the service
214    ///
215    /// ```
216    /// use reth_discv4::{Discv4, Discv4Config};
217    /// use reth_network_peers::{pk2id, NodeRecord, PeerId};
218    /// use secp256k1::SECP256K1;
219    /// use std::{net::SocketAddr, str::FromStr};
220    /// # async fn t() -> std::io::Result<()> {
221    ///
222    /// // generate a (random) keypair
223    /// let (secret_key, pk) = SECP256K1.generate_keypair(&mut rand_08::thread_rng());
224    /// let id = pk2id(&pk);
225    ///
226    /// let socket = SocketAddr::from_str("0.0.0.0:0").unwrap();
227    /// let local_enr =
228    ///     NodeRecord { address: socket.ip(), tcp_port: socket.port(), udp_port: socket.port(), id };
229    /// let config = Discv4Config::default();
230    ///
231    /// let (discv4, mut service) = Discv4::bind(socket, local_enr, secret_key, config).await.unwrap();
232    ///
233    /// // get an update stream
234    /// let updates = service.update_stream();
235    ///
236    /// let _handle = service.spawn();
237    ///
238    /// // lookup the local node in the DHT
239    /// let _discovered = discv4.lookup_self().await.unwrap();
240    ///
241    /// # Ok(())
242    /// # }
243    /// ```
244    pub async fn bind(
245        local_address: SocketAddr,
246        mut local_node_record: NodeRecord,
247        secret_key: SecretKey,
248        config: Discv4Config,
249    ) -> io::Result<(Self, Discv4Service)> {
250        let socket = UdpSocket::bind(local_address).await?;
251        let local_addr = socket.local_addr()?;
252        local_node_record.udp_port = local_addr.port();
253        trace!(target: "discv4", ?local_addr, "opened UDP socket");
254
255        let mut service =
256            Discv4Service::new(socket, local_addr, local_node_record, secret_key, config);
257
258        // resolve the external address immediately
259        service.resolve_external_ip();
260
261        let discv4 = service.handle();
262        Ok((discv4, service))
263    }
264
265    /// Returns the address of the UDP socket.
266    pub const fn local_addr(&self) -> SocketAddr {
267        self.local_addr
268    }
269
270    /// Returns the [`NodeRecord`] of the local node.
271    ///
272    /// This includes the currently tracked external IP address of the node.
273    pub fn node_record(&self) -> NodeRecord {
274        *self.node_record.lock()
275    }
276
277    /// Returns the currently tracked external IP of the node.
278    pub fn external_ip(&self) -> IpAddr {
279        self.node_record.lock().address
280    }
281
282    /// Sets the [Interval] used for periodically looking up targets over the network
283    pub fn set_lookup_interval(&self, duration: Duration) {
284        self.send_to_service(Discv4Command::SetLookupInterval(duration))
285    }
286
287    /// Starts a `FindNode` recursive lookup that locates the closest nodes to the given node id. See also: <https://github.com/ethereum/devp2p/blob/master/discv4.md#recursive-lookup>
288    ///
289    /// The lookup initiator starts by picking α closest nodes to the target it knows of. The
290    /// initiator then sends concurrent `FindNode` packets to those nodes. α is a system-wide
291    /// concurrency parameter, such as 3. In the recursive step, the initiator resends `FindNode` to
292    /// nodes it has learned about from previous queries. Of the k nodes the initiator has heard of
293    /// closest to the target, it picks α that it has not yet queried and resends `FindNode` to
294    /// them. Nodes that fail to respond quickly are removed from consideration until and unless
295    /// they do respond.
296    //
297    // If a round of FindNode queries fails to return a node any closer than the closest already
298    // seen, the initiator resends the find node to all of the k closest nodes it has not already
299    // queried. The lookup terminates when the initiator has queried and gotten responses from the k
300    // closest nodes it has seen.
301    pub async fn lookup_self(&self) -> Result<Vec<NodeRecord>, Discv4Error> {
302        self.lookup_node(None).await
303    }
304
305    /// Looks up the given node id.
306    ///
307    /// Returns the closest nodes to the given node id.
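    ///
    /// A minimal sketch of issuing a lookup from the handle; `discv4` and `peer` are placeholders
    /// for an existing handle and a known peer id:
    ///
    /// ```ignore
    /// # async fn example(discv4: reth_discv4::Discv4, peer: reth_network_peers::PeerId) {
    /// // resolves once the recursive lookup has finished
    /// let closest = discv4.lookup(peer).await.unwrap();
    /// # }
    /// ```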
308    pub async fn lookup(&self, node_id: PeerId) -> Result<Vec<NodeRecord>, Discv4Error> {
309        self.lookup_node(Some(node_id)).await
310    }
311
312    /// Performs a random lookup for node records.
313    pub async fn lookup_random(&self) -> Result<Vec<NodeRecord>, Discv4Error> {
314        let target = PeerId::random();
315        self.lookup_node(Some(target)).await
316    }
317
318    /// Sends a message to the service to lookup the closest nodes
319    pub fn send_lookup(&self, node_id: PeerId) {
320        let cmd = Discv4Command::Lookup { node_id: Some(node_id), tx: None };
321        self.send_to_service(cmd);
322    }
323
324    async fn lookup_node(&self, node_id: Option<PeerId>) -> Result<Vec<NodeRecord>, Discv4Error> {
325        let (tx, rx) = oneshot::channel();
326        let cmd = Discv4Command::Lookup { node_id, tx: Some(tx) };
327        self.to_service.send(cmd)?;
328        Ok(rx.await?)
329    }
330
331    /// Triggers a new self lookup without expecting a response
332    pub fn send_lookup_self(&self) {
333        let cmd = Discv4Command::Lookup { node_id: None, tx: None };
334        self.send_to_service(cmd);
335    }
336
337    /// Removes the peer from the table, if it exists.
338    pub fn remove_peer(&self, node_id: PeerId) {
339        let cmd = Discv4Command::Remove(node_id);
340        self.send_to_service(cmd);
341    }
342
343    /// Adds the node to the table, if it is not already present.
344    pub fn add_node(&self, node_record: NodeRecord) {
345        let cmd = Discv4Command::Add(node_record);
346        self.send_to_service(cmd);
347    }
348
349    /// Adds the peer and id to the ban list.
350    ///
351    /// This will prevent any future inclusion in the table
352    pub fn ban(&self, node_id: PeerId, ip: IpAddr) {
353        let cmd = Discv4Command::Ban(node_id, ip);
354        self.send_to_service(cmd);
355    }
356
357    /// Adds the ip to the ban list.
358    ///
359    /// This will prevent any future inclusion in the table
360    pub fn ban_ip(&self, ip: IpAddr) {
361        let cmd = Discv4Command::BanIp(ip);
362        self.send_to_service(cmd);
363    }
364
365    /// Adds the peer to the ban list.
366    ///
367    /// This will prevent any future inclusion in the table
368    pub fn ban_node(&self, node_id: PeerId) {
369        let cmd = Discv4Command::BanPeer(node_id);
370        self.send_to_service(cmd);
371    }
372
373    /// Sets the tcp port
374    ///
375    /// This will update our [`NodeRecord`]'s tcp port.
376    pub fn set_tcp_port(&self, port: u16) {
377        let cmd = Discv4Command::SetTcpPort(port);
378        self.send_to_service(cmd);
379    }
380
381    /// Sets the pair in the EIP-868 [`Enr`] of the node.
382    ///
383    /// If the key already exists, this will update it.
384    ///
385    /// CAUTION: The value **must** be rlp encoded
386    pub fn set_eip868_rlp_pair(&self, key: Vec<u8>, rlp: Bytes) {
387        let cmd = Discv4Command::SetEIP868RLPPair { key, rlp };
388        self.send_to_service(cmd);
389    }
390
391    /// Sets the pair in the EIP-868 [`Enr`] of the node.
392    ///
393    /// If the key already exists, this will update it.
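    ///
    /// A small usage sketch; the `"example"` key and `42u64` value are placeholders (any
    /// `alloy_rlp::Encodable` value works):
    ///
    /// ```ignore
    /// # fn example(discv4: &reth_discv4::Discv4) {
    /// // the value is RLP encoded before it is inserted into the local ENR
    /// discv4.set_eip868_rlp(b"example".to_vec(), 42u64);
    /// # }
    /// ```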
394    pub fn set_eip868_rlp(&self, key: Vec<u8>, value: impl alloy_rlp::Encodable) {
395        self.set_eip868_rlp_pair(key, Bytes::from(alloy_rlp::encode(&value)))
396    }
397
398    #[inline]
399    fn send_to_service(&self, cmd: Discv4Command) {
400        let _ = self.to_service.send(cmd).map_err(|err| {
401            debug!(
402                target: "discv4",
403                %err,
404                "channel capacity reached, dropping command",
405            )
406        });
407    }
408
409    /// Returns the receiver half of a new listener channel that streams [`DiscoveryUpdate`]s.
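    ///
    /// A minimal consumption sketch, assuming the service has already been spawned:
    ///
    /// ```ignore
    /// # async fn example(discv4: reth_discv4::Discv4) {
    /// use tokio_stream::StreamExt;
    ///
    /// let mut updates = discv4.update_stream().await.unwrap();
    /// while let Some(update) = updates.next().await {
    ///     // handle `DiscoveryUpdate::Added`, `DiscoveryUpdate::Removed`, ...
    /// }
    /// # }
    /// ```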
410    pub async fn update_stream(&self) -> Result<ReceiverStream<DiscoveryUpdate>, Discv4Error> {
411        let (tx, rx) = oneshot::channel();
412        let cmd = Discv4Command::Updates(tx);
413        self.to_service.send(cmd)?;
414        Ok(rx.await?)
415    }
416
417    /// Terminates the spawned [`Discv4Service`].
418    pub fn terminate(&self) {
419        self.send_to_service(Discv4Command::Terminated);
420    }
421}
422
423/// Manages discv4 peer discovery over UDP.
424///
425/// This is a [Stream] that handles incoming and outgoing discv4 messages and emits updates via
426/// [`Discv4Service::update_stream`].
427///
428/// This type maintains the discv4 Kademlia routing table and is responsible for performing lookups.
429///
430/// ## Lookups
431///
432/// See also [Recursive Lookups](https://github.com/ethereum/devp2p/blob/master/discv4.md#recursive-lookup).
433/// Lookups are either triggered periodically or performed on demand: [`Discv4::lookup`].
434/// Newly discovered nodes are emitted as a [`DiscoveryUpdate::Added`] event to all subscribers:
435/// [`Discv4Service::update_stream`].
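///
/// A minimal sketch of driving the service manually instead of spawning it (the service is a
/// stream and must be polled continuously):
///
/// ```ignore
/// # async fn example(mut service: reth_discv4::Discv4Service) {
/// use tokio_stream::StreamExt;
///
/// // subscribe to table updates before driving the service
/// let _updates = service.update_stream();
///
/// // poll the service; each item is an internal event produced while the state is driven
/// while let Some(_event) = service.next().await {}
/// # }
/// ```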
436#[must_use = "Stream does nothing unless polled"]
437pub struct Discv4Service {
438    /// Local address of the UDP socket.
439    local_address: SocketAddr,
440    /// The local ENR for EIP-868 <https://eips.ethereum.org/EIPS/eip-868>
441    local_eip_868_enr: Enr<SecretKey>,
442    /// Local node record of the server.
443    local_node_record: NodeRecord,
444    /// Keeps track of the node record of the local node.
445    shared_node_record: Arc<Mutex<NodeRecord>>,
446    /// The secret key used to sign payloads
447    secret_key: SecretKey,
448    /// The UDP socket for sending and receiving messages.
449    _socket: Arc<UdpSocket>,
450    /// The spawned UDP tasks.
451    ///
452    /// Note: If dropped, the spawned send+receive tasks are aborted.
453    _tasks: JoinSet<()>,
454    /// The routing table.
455    kbuckets: KBucketsTable<NodeKey, NodeEntry>,
456    /// Receiver for incoming messages
457    ///
458    /// Receives incoming messages from the UDP task.
459    ingress: IngressReceiver,
460    /// Sender for sending outgoing messages
461    ///
462    /// Sends outgoing messages to the UDP task.
463    egress: EgressSender,
464    /// Buffered pending pings to apply backpressure.
465    ///
466    /// Lookups behave like bursts of requests: Endpoint proof followed by `FindNode` request. [Recursive lookups](https://github.com/ethereum/devp2p/blob/master/discv4.md#recursive-lookup) can trigger multiple followup Pings+FindNode requests.
467    /// A cap on concurrent `Ping`s prevents an escalation where a large number of new nodes
468    /// discovered via `FindNode` in a recursive lookup triggers a large number of `Ping`s and
469    /// follow-up `FindNode` requests. Buffering them effectively prevents high `Ping` peaks.
470    queued_pings: VecDeque<(NodeRecord, PingReason)>,
471    /// Currently active pings to specific nodes.
472    pending_pings: HashMap<PeerId, PingRequest>,
473    /// Currently active endpoint proof verification lookups to specific nodes.
474    ///
475    /// Entries here mean we've proven the peer's endpoint but haven't yet completed our end of
476    /// the endpoint proof.
477    pending_lookup: HashMap<PeerId, (Instant, LookupContext)>,
478    /// Currently active `FindNode` requests
479    pending_find_nodes: HashMap<PeerId, FindNodeRequest>,
480    /// Currently active ENR requests
481    pending_enr_requests: HashMap<PeerId, EnrRequestState>,
482    /// Copy of the sender half of the commands channel for [Discv4]
483    to_service: mpsc::UnboundedSender<Discv4Command>,
484    /// Receiver half of the commands channel for [Discv4]
485    commands_rx: mpsc::UnboundedReceiver<Discv4Command>,
486    /// All subscribers for table updates
487    update_listeners: Vec<mpsc::Sender<DiscoveryUpdate>>,
488    /// The interval when to trigger random lookups
489    lookup_interval: Interval,
490    /// Used to rotate targets to lookup
491    lookup_rotator: LookupTargetRotator,
492    /// Interval when to recheck active requests
493    evict_expired_requests_interval: Interval,
494    /// Interval when to resend pings.
495    ping_interval: Interval,
496    /// The interval at which to attempt resolving external IP again.
497    resolve_external_ip_interval: Option<ResolveNatInterval>,
498    /// How this service is configured
499    config: Discv4Config,
500    /// Buffered events populated during poll.
501    queued_events: VecDeque<Discv4Event>,
502    /// Keeps track of nodes from which we have received a `Pong` message.
503    received_pongs: PongTable,
504    /// Interval used to expire additionally tracked nodes
505    expire_interval: Interval,
506}
507
508impl Discv4Service {
509    /// Create a new instance for a bound [`UdpSocket`].
510    pub(crate) fn new(
511        socket: UdpSocket,
512        local_address: SocketAddr,
513        local_node_record: NodeRecord,
514        secret_key: SecretKey,
515        config: Discv4Config,
516    ) -> Self {
517        let socket = Arc::new(socket);
518        let (ingress_tx, ingress_rx) = mpsc::channel(config.udp_ingress_message_buffer);
519        let (egress_tx, egress_rx) = mpsc::channel(config.udp_egress_message_buffer);
520        let mut tasks = JoinSet::<()>::new();
521
522        let udp = Arc::clone(&socket);
523        tasks.spawn(receive_loop(udp, ingress_tx, local_node_record.id));
524
525        let udp = Arc::clone(&socket);
526        tasks.spawn(send_loop(udp, egress_rx));
527
528        let kbuckets = KBucketsTable::new(
529            NodeKey::from(&local_node_record).into(),
530            Duration::from_secs(60),
531            MAX_NODES_PER_BUCKET,
532            None,
533            None,
534        );
535
536        let self_lookup_interval = tokio::time::interval(config.lookup_interval);
537
538        // Wait `ping_interval` before sending the first ping, then continue pinging every
539        // `ping_interval` thereafter
540        let ping_interval = tokio::time::interval_at(
541            tokio::time::Instant::now() + config.ping_interval,
542            config.ping_interval,
543        );
544
545        let evict_expired_requests_interval = tokio::time::interval_at(
546            tokio::time::Instant::now() + config.request_timeout,
547            config.request_timeout,
548        );
549
550        let lookup_rotator = if config.enable_dht_random_walk {
551            LookupTargetRotator::default()
552        } else {
553            LookupTargetRotator::local_only()
554        };
555
556        // for EIP-868 construct an ENR
557        let local_eip_868_enr = {
558            let mut builder = Enr::builder();
559            builder.ip(local_node_record.address);
560            if local_node_record.address.is_ipv4() {
561                builder.udp4(local_node_record.udp_port);
562                builder.tcp4(local_node_record.tcp_port);
563            } else {
564                builder.udp6(local_node_record.udp_port);
565                builder.tcp6(local_node_record.tcp_port);
566            }
567
568            for (key, val) in &config.additional_eip868_rlp_pairs {
569                builder.add_value_rlp(key, val.clone());
570            }
571            builder.build(&secret_key).expect("v4 is set")
572        };
573
574        let (to_service, commands_rx) = mpsc::unbounded_channel();
575
576        let shared_node_record = Arc::new(Mutex::new(local_node_record));
577
578        Self {
579            local_address,
580            local_eip_868_enr,
581            local_node_record,
582            shared_node_record,
583            _socket: socket,
584            kbuckets,
585            secret_key,
586            _tasks: tasks,
587            ingress: ingress_rx,
588            egress: egress_tx,
589            queued_pings: VecDeque::with_capacity(MAX_QUEUED_PINGS),
590            pending_pings: Default::default(),
591            pending_lookup: Default::default(),
592            pending_find_nodes: Default::default(),
593            pending_enr_requests: Default::default(),
594            commands_rx,
595            to_service,
596            update_listeners: Vec::with_capacity(1),
597            lookup_interval: self_lookup_interval,
598            ping_interval,
599            evict_expired_requests_interval,
600            lookup_rotator,
601            resolve_external_ip_interval: config.resolve_external_ip_interval(),
602            config,
603            queued_events: Default::default(),
604            received_pongs: Default::default(),
605            expire_interval: tokio::time::interval(EXPIRE_DURATION),
606        }
607    }
608
609    /// Returns the frontend handle that can communicate with the service via commands.
610    pub fn handle(&self) -> Discv4 {
611        Discv4 {
612            local_addr: self.local_address,
613            to_service: self.to_service.clone(),
614            node_record: self.shared_node_record.clone(),
615        }
616    }
617
618    /// Returns the current enr sequence of the local record.
619    fn enr_seq(&self) -> Option<u64> {
620        self.config.enable_eip868.then(|| self.local_eip_868_enr.seq())
621    }
622
623    /// Sets the [Interval] used for periodically looking up targets over the network
624    pub fn set_lookup_interval(&mut self, duration: Duration) {
625        self.lookup_interval = tokio::time::interval(duration);
626    }
627
628    /// Sets the external IP to the configured external IP if [`NatResolver::ExternalIp`] or
629    /// [`NatResolver::ExternalAddr`]. In the case of [`NatResolver::ExternalAddr`], it will return
630    /// the first IP address found for the domain associated with the discv4 UDP port.
631    fn resolve_external_ip(&mut self) {
632        if let Some(r) = &self.resolve_external_ip_interval &&
633            let Some(external_ip) =
634                r.resolver().clone().as_external_ip(self.local_node_record.udp_port)
635        {
636            self.set_external_ip_addr(external_ip);
637        }
638    }
639
640    /// Sets the given ip address as the node's external IP in the node record announced in
641    /// discovery
642    pub fn set_external_ip_addr(&mut self, external_ip: IpAddr) {
643        if self.local_node_record.address != external_ip {
644            debug!(target: "discv4", ?external_ip, "Updating external ip");
645            self.local_node_record.address = external_ip;
646            let _ = self.local_eip_868_enr.set_ip(external_ip, &self.secret_key);
647            let mut lock = self.shared_node_record.lock();
648            *lock = self.local_node_record;
649            debug!(target: "discv4", enr=?self.local_eip_868_enr, "Updated local ENR");
650        }
651    }
652
653    /// Returns the [`PeerId`] that identifies this node
654    pub const fn local_peer_id(&self) -> &PeerId {
655        &self.local_node_record.id
656    }
657
658    /// Returns the address of the UDP socket
659    pub const fn local_addr(&self) -> SocketAddr {
660        self.local_address
661    }
662
663    /// Returns the ENR of this service.
664    ///
665    /// Note: this will include the external address if resolved.
666    pub const fn local_enr(&self) -> NodeRecord {
667        self.local_node_record
668    }
669
670    /// Returns mutable reference to ENR for testing.
671    #[cfg(test)]
672    pub const fn local_enr_mut(&mut self) -> &mut NodeRecord {
673        &mut self.local_node_record
674    }
675
676    /// Returns true if the given `PeerId` is currently in the bucket
677    pub fn contains_node(&self, id: PeerId) -> bool {
678        let key = kad_key(id);
679        self.kbuckets.get_index(&key).is_some()
680    }
681
682    /// Bootstraps the local node to join the DHT.
683    ///
684    /// Bootstrapping is a multi-step operation that starts with a lookup of the local node's
685    /// own ID in the DHT. This introduces the local node to the other nodes
686    /// in the DHT and populates its routing table with the closest proven neighbours.
687    ///
688    /// This is similar to adding all bootnodes via [`Self::add_node`], but does not fire a
689    /// [`DiscoveryUpdate::Added`] event for the given bootnodes. So boot nodes don't appear in the
690    /// update stream, which is usually desirable, since bootnodes should not be connected to.
691    ///
692    /// If adding the configured bootnodes should result in a [`DiscoveryUpdate::Added`], see
693    /// [`Self::add_all_nodes`].
694    ///
695    /// **Note:** This is a noop if there are no bootnodes.
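    ///
    /// A small sketch of a typical bootstrap setup; `boot_nodes` is a placeholder list of
    /// [`NodeRecord`]s and the `add_boot_nodes` builder method name is an assumption here:
    ///
    /// ```ignore
    /// # async fn example(
    /// #     boot_nodes: Vec<reth_discv4::NodeRecord>,
    /// #     local_enr: reth_discv4::NodeRecord,
    /// #     secret_key: secp256k1::SecretKey,
    /// # ) {
    /// use reth_discv4::{Discv4, Discv4Config, DEFAULT_DISCOVERY_ADDRESS};
    ///
    /// // configure the bootstrap nodes up front...
    /// let config = Discv4Config::builder().add_boot_nodes(boot_nodes).build();
    /// let (_discv4, service) =
    ///     Discv4::bind(DEFAULT_DISCOVERY_ADDRESS, local_enr, secret_key, config).await.unwrap();
    /// // ...`spawn` calls `bootstrap` internally before polling the service
    /// let _handle = service.spawn();
    /// # }
    /// ```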
696    pub fn bootstrap(&mut self) {
697        for record in self.config.bootstrap_nodes.clone() {
698            debug!(target: "discv4", ?record, "pinging boot node");
699            let key = kad_key(record.id);
700            let entry = NodeEntry::new(record);
701
702            // insert the boot node in the table
703            match self.kbuckets.insert_or_update(
704                &key,
705                entry,
706                NodeStatus {
707                    state: ConnectionState::Disconnected,
708                    direction: ConnectionDirection::Outgoing,
709                },
710            ) {
711                InsertResult::Failed(_) => {}
712                _ => {
713                    self.try_ping(record, PingReason::InitialInsert);
714                }
715            }
716        }
717    }
718
719    /// Spawns this service onto a new task
720    ///
721    /// Note: requires a running tokio runtime
722    pub fn spawn(mut self) -> JoinHandle<()> {
723        tokio::task::spawn(async move {
724            self.bootstrap();
725
726            while let Some(event) = self.next().await {
727                trace!(target: "discv4", ?event, "processed");
728            }
729            trace!(target: "discv4", "service terminated");
730        })
731    }
732
733    /// Creates a new bounded channel for [`DiscoveryUpdate`]s.
734    pub fn update_stream(&mut self) -> ReceiverStream<DiscoveryUpdate> {
735        let (tx, rx) = mpsc::channel(512);
736        self.update_listeners.push(tx);
737        ReceiverStream::new(rx)
738    }
739
740    /// Looks up the local node in the DHT.
741    pub fn lookup_self(&mut self) {
742        self.lookup(self.local_node_record.id)
743    }
744
745    /// Looks up the given node in the DHT
746    ///
747    /// A `FindNode` packet requests information about nodes close to target. The target is a
748    /// 64-byte secp256k1 public key. When `FindNode` is received, the recipient should reply
749    /// with Neighbors packets containing the closest 16 nodes to target found in its local
750    /// table.
751    //
752    // To guard against traffic amplification attacks, Neighbors replies should only be sent if the
753    // sender of FindNode has been verified by the endpoint proof procedure.
754    pub fn lookup(&mut self, target: PeerId) {
755        self.lookup_with(target, None)
756    }
757
758    /// Starts the recursive lookup process for the given target, <https://github.com/ethereum/devp2p/blob/master/discv4.md#recursive-lookup>.
759    ///
760    /// At first the `ALPHA` (==3, defined concurrency factor) nodes that are closest to the target
761    /// in the underlying DHT are selected to seed the lookup via `FindNode` requests. In the
762    /// recursive step, the initiator resends `FindNode` to nodes it has learned about from previous
763    /// queries.
764    ///
765    /// This takes an optional Sender through which all successfully discovered nodes are sent once
766    /// the request has finished.
767    fn lookup_with(&mut self, target: PeerId, tx: Option<NodeRecordSender>) {
768        trace!(target: "discv4", ?target, "Starting lookup");
769        let target_key = kad_key(target);
770
771        // Start a lookup context with the 16 (MAX_NODES_PER_BUCKET) closest nodes to which we have
772        // a valid endpoint proof
773        let ctx = LookupContext::new(
774            target_key.clone(),
775            self.kbuckets
776                .closest_values(&target_key)
777                .filter(|node| {
778                    node.value.has_endpoint_proof &&
779                        !self.pending_find_nodes.contains_key(&node.key.preimage().0)
780                })
781                .take(MAX_NODES_PER_BUCKET)
782                .map(|n| (target_key.distance(&n.key), n.value.record)),
783            tx,
784        );
785
786        // From those 16, pick the 3 closest to start the concurrent lookup.
787        let closest = ctx.closest(ALPHA);
788
789        if closest.is_empty() && self.pending_find_nodes.is_empty() {
790            // no closest nodes, and no lookup in progress: table is empty.
791            // This could happen if all records were deleted from the table due to missed pongs
792            // (e.g. connectivity problems over a long period of time, or issues during initial
793            // bootstrapping) so we attempt to bootstrap again
794            self.bootstrap();
795            return
796        }
797
798        trace!(target: "discv4", ?target, num = closest.len(), "Start lookup closest nodes");
799
800        for node in closest {
801            // here we still want to check against previous request failures and if necessary
802            // re-establish a new endpoint proof because it can be the case that the other node lost
803            // our entry and no longer has an endpoint proof on their end
804            self.find_node_checked(&node, ctx.clone());
805        }
806    }
807
808    /// Sends a new `FindNode` packet to the node with `target` as the lookup target.
809    ///
810    /// CAUTION: This expects there's a valid endpoint proof for the given `node`.
811    fn find_node(&mut self, node: &NodeRecord, ctx: LookupContext) {
812        trace!(target: "discv4", ?node, lookup=?ctx.target(), "Sending FindNode");
813        ctx.mark_queried(node.id);
814        let id = ctx.target();
815        let msg = Message::FindNode(FindNode { id, expire: self.find_node_expiration() });
816        self.send_packet(msg, node.udp_addr());
817        self.pending_find_nodes.insert(node.id, FindNodeRequest::new(ctx));
818    }
819
820    /// Sends a new `FindNode` packet to the node with `target` as the lookup target but checks
821    /// whether we should send a new ping first to renew the endpoint proof by checking the
822    /// previously failed findNode requests. It could be that the node is no longer reachable or
823    /// lost our entry.
824    fn find_node_checked(&mut self, node: &NodeRecord, ctx: LookupContext) {
825        let max_failures = self.config.max_find_node_failures;
826        let needs_ping = self
827            .on_entry(node.id, |entry| entry.exceeds_find_node_failures(max_failures))
828            .unwrap_or(true);
829        if needs_ping {
830            self.try_ping(*node, PingReason::Lookup(*node, ctx))
831        } else {
832            self.find_node(node, ctx)
833        }
834    }
835
836    /// Notifies all listeners.
837    ///
838    /// Removes all listeners that are closed.
839    fn notify(&mut self, update: DiscoveryUpdate) {
840        self.update_listeners.retain_mut(|listener| match listener.try_send(update.clone()) {
841            Ok(()) => true,
842            Err(err) => match err {
843                TrySendError::Full(_) => true,
844                TrySendError::Closed(_) => false,
845            },
846        });
847    }
848
849    /// Adds the ip to the ban list indefinitely
850    pub fn ban_ip(&mut self, ip: IpAddr) {
851        self.config.ban_list.ban_ip(ip);
852    }
853
854    /// Adds the peer to the ban list indefinitely.
855    pub fn ban_node(&mut self, node_id: PeerId) {
856        self.remove_node(node_id);
857        self.config.ban_list.ban_peer(node_id);
858    }
859
860    /// Adds the ip to the ban list until the given timestamp.
861    pub fn ban_ip_until(&mut self, ip: IpAddr, until: Instant) {
862        self.config.ban_list.ban_ip_until(ip, until);
863    }
864
865    /// Adds the peer to the ban list and bans it until the given timestamp
866    pub fn ban_node_until(&mut self, node_id: PeerId, until: Instant) {
867        self.remove_node(node_id);
868        self.config.ban_list.ban_peer_until(node_id, until);
869    }
870
871    /// Removes a `node_id` from the routing table.
872    ///
873    /// This allows applications, for whatever reason, to remove nodes from the local routing
874    /// table. Returns `true` if the node was in the table and `false` otherwise.
875    pub fn remove_node(&mut self, node_id: PeerId) -> bool {
876        let key = kad_key(node_id);
877        self.remove_key(node_id, key)
878    }
879
880    /// Removes a `node_id` from the routing table but only if there are enough other nodes in the
881    /// bucket (bucket must be at least half full)
882    ///
883    /// Returns `true` if the node was removed
884    pub fn soft_remove_node(&mut self, node_id: PeerId) -> bool {
885        let key = kad_key(node_id);
886        let Some(bucket) = self.kbuckets.get_bucket(&key) else { return false };
887        if bucket.num_entries() < MAX_NODES_PER_BUCKET / 2 {
888            // skip half empty bucket
889            return false
890        }
891        self.remove_key(node_id, key)
892    }
893
894    fn remove_key(&mut self, node_id: PeerId, key: discv5::Key<NodeKey>) -> bool {
895        let removed = self.kbuckets.remove(&key);
896        if removed {
897            trace!(target: "discv4", ?node_id, "removed node");
898            self.notify(DiscoveryUpdate::Removed(node_id));
899        }
900        removed
901    }
902
903    /// Gets the number of entries that are considered connected.
904    pub fn num_connected(&self) -> usize {
905        self.kbuckets.buckets_iter().fold(0, |count, bucket| count + bucket.num_connected())
906    }
907
908    /// Check if the peer has an active bond.
909    fn has_bond(&self, remote_id: PeerId, remote_ip: IpAddr) -> bool {
910        if let Some(timestamp) = self.received_pongs.last_pong(remote_id, remote_ip) &&
911            timestamp.elapsed() < self.config.bond_expiration
912        {
913            return true
914        }
915        false
916    }
917
918    /// Applies a closure on the pending or present [`NodeEntry`].
919    fn on_entry<F, R>(&mut self, peer_id: PeerId, f: F) -> Option<R>
920    where
921        F: FnOnce(&NodeEntry) -> R,
922    {
923        let key = kad_key(peer_id);
924        match self.kbuckets.entry(&key) {
925            BucketEntry::Present(entry, _) => Some(f(entry.value())),
926            BucketEntry::Pending(mut entry, _) => Some(f(entry.value())),
927            _ => None,
928        }
929    }
930
931    /// Update the entry on RE-ping.
932    ///
933    /// Invoked when we received the Pong to our [`PingReason::RePing`] ping.
934    ///
935    /// On re-ping we check for a changed `enr_seq` if EIP-868 is enabled, and if it changed we send
936    /// a follow-up request to retrieve the updated ENR
937    fn update_on_reping(&mut self, record: NodeRecord, mut last_enr_seq: Option<u64>) {
938        if record.id == self.local_node_record.id {
939            return
940        }
941
942        // If EIP868 extension is disabled then we want to ignore this
943        if !self.config.enable_eip868 {
944            last_enr_seq = None;
945        }
946
947        let key = kad_key(record.id);
948        let old_enr = match self.kbuckets.entry(&key) {
949            kbucket::Entry::Present(mut entry, _) => {
950                entry.value_mut().update_with_enr(last_enr_seq)
951            }
952            kbucket::Entry::Pending(mut entry, _) => entry.value().update_with_enr(last_enr_seq),
953            _ => return,
954        };
955
956        // Check if ENR was updated
957        match (last_enr_seq, old_enr) {
958            (Some(new), Some(old)) => {
959                if new > old {
960                    self.send_enr_request(record);
961                }
962            }
963            (Some(_), None) => {
964                // got an ENR
965                self.send_enr_request(record);
966            }
967            _ => {}
968        };
969    }
970
971    /// Callback invoked when we receive a pong from the peer.
972    fn update_on_pong(&mut self, record: NodeRecord, mut last_enr_seq: Option<u64>) {
973        if record.id == *self.local_peer_id() {
974            return
975        }
976
977        // If EIP868 extension is disabled then we want to ignore this
978        if !self.config.enable_eip868 {
979            last_enr_seq = None;
980        }
981
982        // if the peer included an enr seq in the pong then we can try to request the ENR of that
983        // node
984        let has_enr_seq = last_enr_seq.is_some();
985
986        let key = kad_key(record.id);
987        match self.kbuckets.entry(&key) {
988            kbucket::Entry::Present(mut entry, old_status) => {
989                // endpoint is now proven
990                entry.value_mut().establish_proof();
991                entry.value_mut().update_with_enr(last_enr_seq);
992
993                if !old_status.is_connected() {
994                    let _ = entry.update(ConnectionState::Connected, Some(old_status.direction));
995                    trace!(target: "discv4", ?record, "added after successful endpoint proof");
996                    self.notify(DiscoveryUpdate::Added(record));
997
998                    if has_enr_seq {
999                        // request the ENR of the node
1000                        self.send_enr_request(record);
1001                    }
1002                }
1003            }
1004            kbucket::Entry::Pending(mut entry, mut status) => {
1005                // endpoint is now proven
1006                entry.value().establish_proof();
1007                entry.value().update_with_enr(last_enr_seq);
1008
1009                if !status.is_connected() {
1010                    status.state = ConnectionState::Connected;
1011                    let _ = entry.update(status);
1012                    trace!(target: "discv4", ?record, "added after successful endpoint proof");
1013                    self.notify(DiscoveryUpdate::Added(record));
1014
1015                    if has_enr_seq {
1016                        // request the ENR of the node
1017                        self.send_enr_request(record);
1018                    }
1019                }
1020            }
1021            _ => {}
1022        };
1023    }
1024
1025    /// Adds all nodes
1026    ///
1027    /// See [`Self::add_node`]
1028    pub fn add_all_nodes(&mut self, records: impl IntoIterator<Item = NodeRecord>) {
1029        for record in records {
1030            self.add_node(record);
1031        }
1032    }
1033
1034    /// If the node's not in the table yet, this will add it to the table and start the endpoint
1035    /// proof by sending a ping to the node.
1036    ///
1037    /// Returns `true` if the record was added successfully, and `false` if the node is either
1038    /// already in the table or the record's bucket is full.
1039    pub fn add_node(&mut self, record: NodeRecord) -> bool {
1040        let key = kad_key(record.id);
1041        match self.kbuckets.entry(&key) {
1042            kbucket::Entry::Absent(entry) => {
1043                let node = NodeEntry::new(record);
1044                match entry.insert(
1045                    node,
1046                    NodeStatus {
1047                        direction: ConnectionDirection::Outgoing,
1048                        state: ConnectionState::Disconnected,
1049                    },
1050                ) {
1051                    BucketInsertResult::Inserted | BucketInsertResult::Pending { .. } => {
1052                        trace!(target: "discv4", ?record, "inserted new record");
1053                    }
1054                    _ => return false,
1055                }
1056            }
1057            _ => return false,
1058        }
1059
1060        // send the initial ping to the _new_ node
1061        self.try_ping(record, PingReason::InitialInsert);
1062        true
1063    }
1064
1065    /// Encodes the packet, sends it and returns the hash.
1066    pub(crate) fn send_packet(&self, msg: Message, to: SocketAddr) -> B256 {
1067        let (payload, hash) = msg.encode(&self.secret_key);
1068        trace!(target: "discv4", r#type=?msg.msg_type(), ?to, ?hash, "sending packet");
1069        let _ = self.egress.try_send((payload, to)).map_err(|err| {
1070            debug!(
1071                target: "discv4",
1072                %err,
1073                "dropped outgoing packet",
1074            );
1075        });
1076        hash
1077    }
1078
1079    /// Message handler for an incoming `Ping`
1080    fn on_ping(&mut self, ping: Ping, remote_addr: SocketAddr, remote_id: PeerId, hash: B256) {
1081        if self.is_expired(ping.expire) {
1082            // ping's expiration timestamp is in the past
1083            return
1084        }
1085
1086        // create the record
1087        let record = NodeRecord {
1088            address: remote_addr.ip(),
1089            udp_port: remote_addr.port(),
1090            tcp_port: ping.from.tcp_port,
1091            id: remote_id,
1092        }
1093        .into_ipv4_mapped();
1094
1095        let key = kad_key(record.id);
1096
1097        // See also <https://github.com/ethereum/devp2p/blob/master/discv4.md#ping-packet-0x01>:
1098        // > If no communication with the sender of this ping has occurred within the last 12h, a
1099        // > ping should be sent in addition to pong in order to receive an endpoint proof.
1100        //
1101        // Note: we only mark if the node is absent because the `last 12h` condition is handled by
1102        // the ping interval
1103        let mut is_new_insert = false;
1104        let mut needs_bond = false;
1105        let mut is_proven = false;
1106
1107        let old_enr = match self.kbuckets.entry(&key) {
1108            kbucket::Entry::Present(mut entry, _) => {
1109                if entry.value().is_expired() {
1110                    // If no communication with the sender has occurred within the last 12h, a ping
1111                    // should be sent in addition to pong in order to receive an endpoint proof.
1112                    needs_bond = true;
1113                } else {
1114                    is_proven = entry.value().has_endpoint_proof;
1115                }
1116                entry.value_mut().update_with_enr(ping.enr_sq)
1117            }
1118            kbucket::Entry::Pending(mut entry, _) => {
1119                if entry.value().is_expired() {
1120                    // If no communication with the sender has occurred within the last 12h, a ping
1121                    // should be sent in addition to pong in order to receive an endpoint proof.
1122                    needs_bond = true;
1123                } else {
1124                    is_proven = entry.value().has_endpoint_proof;
1125                }
1126                entry.value().update_with_enr(ping.enr_sq)
1127            }
1128            kbucket::Entry::Absent(entry) => {
1129                let mut node = NodeEntry::new(record);
1130                node.last_enr_seq = ping.enr_sq;
1131
1132                match entry.insert(
1133                    node,
1134                    NodeStatus {
1135                        direction: ConnectionDirection::Incoming,
1136                        // mark as disconnected until endpoint proof established on pong
1137                        state: ConnectionState::Disconnected,
1138                    },
1139                ) {
1140                    BucketInsertResult::Inserted | BucketInsertResult::Pending { .. } => {
1141                        // mark as new insert if insert was successful
1142                        is_new_insert = true;
1143                    }
1144                    BucketInsertResult::Full => {
1145                        // we received a ping but the corresponding bucket for the peer is already
1146                        // full, we can't add any additional peers to that bucket, but we still want
1147                        // to emit an event that we discovered the node
1148                        trace!(target: "discv4", ?record, "discovered new record but bucket is full");
1149                        self.notify(DiscoveryUpdate::DiscoveredAtCapacity(record));
1150                        needs_bond = true;
1151                    }
1152                    BucketInsertResult::TooManyIncoming | BucketInsertResult::NodeExists => {
1153                        needs_bond = true;
1154                        // insert unsuccessful but we still want to send the pong
1155                    }
1156                    BucketInsertResult::FailedFilter => return,
1157                }
1158
1159                None
1160            }
1161            kbucket::Entry::SelfEntry => return,
1162        };
1163
1164        // send the pong first, but the PONG and optional PING don't need to be sent in a
1165        // particular order
1166        let pong = Message::Pong(Pong {
1167            // we use the actual address of the peer
1168            to: record.into(),
1169            echo: hash,
1170            expire: ping.expire,
1171            enr_sq: self.enr_seq(),
1172        });
1173        self.send_packet(pong, remote_addr);
1174
1175        // if node was absent also send a ping to establish the endpoint proof from our end
1176        if is_new_insert {
1177            self.try_ping(record, PingReason::InitialInsert);
1178        } else if needs_bond {
1179            self.try_ping(record, PingReason::EstablishBond);
1180        } else if is_proven {
1181            // if node has been proven, this means we've received a pong and verified its endpoint
1182            // proof. We've also sent a pong above to verify our endpoint proof, so we can now
1183            // send our find_nodes request if PingReason::Lookup
1184            if let Some((_, ctx)) = self.pending_lookup.remove(&record.id) {
1185                if self.pending_find_nodes.contains_key(&record.id) {
1186                    // there's already another pending request, unmark it so the next round can
1187                    // try to send it
1188                    ctx.unmark_queried(record.id);
1189                } else {
1190                    // we just received a ping from that peer so we can send a find node request
1191                    // directly
1192                    self.find_node(&record, ctx);
1193                }
1194            }
1195        } else {
1196            // Request ENR if included in the ping
1197            match (ping.enr_sq, old_enr) {
1198                (Some(new), Some(old)) => {
1199                    if new > old {
1200                        self.send_enr_request(record);
1201                    }
1202                }
1203                (Some(_), None) => {
1204                    self.send_enr_request(record);
1205                }
1206                _ => {}
1207            };
1208        }
1209    }
1210
1211    // Guarding function for [`Self::send_ping`] that applies pre-checks
1212    fn try_ping(&mut self, node: NodeRecord, reason: PingReason) {
1213        if node.id == *self.local_peer_id() {
1214            // don't ping ourselves
1215            return
1216        }
1217
1218        if self.pending_pings.contains_key(&node.id) ||
1219            self.pending_find_nodes.contains_key(&node.id)
1220        {
1221            return
1222        }
1223
1224        if self.queued_pings.iter().any(|(n, _)| n.id == node.id) {
1225            return
1226        }
1227
1228        if self.pending_pings.len() < MAX_NODES_PING {
1229            self.send_ping(node, reason);
1230        } else if self.queued_pings.len() < MAX_QUEUED_PINGS {
1231            self.queued_pings.push_back((node, reason));
1232        }
1233    }
1234
1235    /// Sends a ping message to the node's UDP address.
1236    ///
1237    /// Returns the echo hash of the ping message.
1238    pub(crate) fn send_ping(&mut self, node: NodeRecord, reason: PingReason) -> B256 {
1239        let remote_addr = node.udp_addr();
1240        let id = node.id;
1241        let ping = Ping {
1242            from: self.local_node_record.into(),
1243            to: node.into(),
1244            expire: self.ping_expiration(),
1245            enr_sq: self.enr_seq(),
1246        };
1247        trace!(target: "discv4", ?ping, "sending ping");
1248        let echo_hash = self.send_packet(Message::Ping(ping), remote_addr);
1249
1250        self.pending_pings
1251            .insert(id, PingRequest { sent_at: Instant::now(), node, echo_hash, reason });
1252        echo_hash
1253    }
1254
1255    /// Sends an enr request message to the node's UDP address.
1256    ///
1257    /// This is a noop if the EIP-868 extension is disabled.
1258    pub(crate) fn send_enr_request(&mut self, node: NodeRecord) {
1259        if !self.config.enable_eip868 {
1260            return
1261        }
1262        let remote_addr = node.udp_addr();
1263        let enr_request = EnrRequest { expire: self.enr_request_expiration() };
1264
1265        trace!(target: "discv4", ?enr_request, "sending enr request");
1266        let echo_hash = self.send_packet(Message::EnrRequest(enr_request), remote_addr);
1267
1268        self.pending_enr_requests
1269            .insert(node.id, EnrRequestState { sent_at: Instant::now(), echo_hash });
1270    }
1271
1272    /// Message handler for an incoming `Pong`.
1273    fn on_pong(&mut self, pong: Pong, remote_addr: SocketAddr, remote_id: PeerId) {
1274        if self.is_expired(pong.expire) {
1275            return
1276        }
1277
1278        let PingRequest { node, reason, .. } = match self.pending_pings.entry(remote_id) {
1279            Entry::Occupied(entry) => {
1280                {
1281                    let request = entry.get();
1282                    if request.echo_hash != pong.echo {
1283                        trace!(target: "discv4", from=?remote_addr, expected=?request.echo_hash, echo_hash=?pong.echo,"Got unexpected Pong");
1284                        return
1285                    }
1286                }
1287                entry.remove()
1288            }
1289            Entry::Vacant(_) => return,
1290        };
1291
1292        // keep track of the pong
1293        self.received_pongs.on_pong(remote_id, remote_addr.ip());
1294
1295        match reason {
1296            PingReason::InitialInsert => {
1297                self.update_on_pong(node, pong.enr_sq);
1298            }
1299            PingReason::EstablishBond => {
1300                // same as `InitialInsert` which renews the bond if the peer is in the table
1301                self.update_on_pong(node, pong.enr_sq);
1302            }
1303            PingReason::RePing => {
1304                self.update_on_reping(node, pong.enr_sq);
1305            }
1306            PingReason::Lookup(node, ctx) => {
1307                self.update_on_pong(node, pong.enr_sq);
1308                // insert node and assoc. lookup_context into the pending_lookup table to complete
1309                // our side of the endpoint proof verification.
1310                // Start the lookup timer here - and evict accordingly. Note that this is a separate
1311        // timer from the ping_request timer.
1312                self.pending_lookup.insert(node.id, (Instant::now(), ctx));
1313            }
1314        }
1315    }
1316
1317    /// Handler for an incoming `FindNode` message
1318    fn on_find_node(&mut self, msg: FindNode, remote_addr: SocketAddr, node_id: PeerId) {
1319        if self.is_expired(msg.expire) {
1320            // expiration timestamp is in the past
1321            return
1322        }
1323        if node_id == *self.local_peer_id() {
1324            // ignore find node requests to ourselves
1325            return
1326        }
1327
1328        if self.has_bond(node_id, remote_addr.ip()) {
1329            self.respond_closest(msg.id, remote_addr)
1330        }
1331    }
1332
1333    /// Handler for incoming `EnrResponse` message
1334    fn on_enr_response(&mut self, msg: EnrResponse, remote_addr: SocketAddr, id: PeerId) {
1335        trace!(target: "discv4", ?remote_addr, ?msg, "received ENR response");
1336        if let Some(resp) = self.pending_enr_requests.remove(&id) {
1337            // ensure the ENR's public key matches the expected node id
1338            let enr_id = pk2id(&msg.enr.public_key());
1339            if id != enr_id {
1340                return
1341            }
1342
1343            if resp.echo_hash == msg.request_hash {
1344                let key = kad_key(id);
1345                let fork_id = msg.eth_fork_id();
1346                let (record, old_fork_id) = match self.kbuckets.entry(&key) {
1347                    kbucket::Entry::Present(mut entry, _) => {
1348                        let id = entry.value_mut().update_with_fork_id(fork_id);
1349                        (entry.value().record, id)
1350                    }
1351                    kbucket::Entry::Pending(mut entry, _) => {
1352                        let id = entry.value().update_with_fork_id(fork_id);
1353                        (entry.value().record, id)
1354                    }
1355                    _ => return,
1356                };
1357                match (fork_id, old_fork_id) {
1358                    (Some(new), Some(old)) => {
1359                        if new != old {
1360                            self.notify(DiscoveryUpdate::EnrForkId(record, new))
1361                        }
1362                    }
1363                    (Some(new), None) => self.notify(DiscoveryUpdate::EnrForkId(record, new)),
1364                    _ => {}
1365                }
1366            }
1367        }
1368    }
1369
1370    /// Handler for incoming `EnrRequest` message
1371    fn on_enr_request(
1372        &self,
1373        msg: EnrRequest,
1374        remote_addr: SocketAddr,
1375        id: PeerId,
1376        request_hash: B256,
1377    ) {
1378        if !self.config.enable_eip868 || self.is_expired(msg.expire) {
1379            return
1380        }
1381
1382        if self.has_bond(id, remote_addr.ip()) {
1383            self.send_packet(
1384                Message::EnrResponse(EnrResponse {
1385                    request_hash,
1386                    enr: self.local_eip_868_enr.clone(),
1387                }),
1388                remote_addr,
1389            );
1390        }
1391    }
1392
1393    /// Handler for incoming `Neighbours` messages that are handled if they're responses to
1394    /// `FindNode` requests.
1395    fn on_neighbours(&mut self, msg: Neighbours, remote_addr: SocketAddr, node_id: PeerId) {
1396        if self.is_expired(msg.expire) {
1397            // response is expired
1398            return
1399        }
1400        // check if this request was expected
1401        let ctx = match self.pending_find_nodes.entry(node_id) {
1402            Entry::Occupied(mut entry) => {
1403                {
1404                    let request = entry.get_mut();
1405                    // Mark the request as answered
1406                    request.answered = true;
1407                    let total = request.response_count + msg.nodes.len();
1408
1409                    // A FindNode response is at most 1 bucket (16 entries), possibly split over several Neighbours packets.
1410                    if total <= MAX_NODES_PER_BUCKET {
1411                        request.response_count = total;
1412                    } else {
1413                        trace!(target: "discv4", total, from=?remote_addr, "Received neighbours packet whose entries exceed max nodes per bucket");
1414                        return
1415                    }
1416                };
1417
1418                if entry.get().response_count == MAX_NODES_PER_BUCKET {
1419                    // the node responded with a full bucket of records; the request is complete
1420                    let ctx = entry.remove().lookup_context;
1421                    ctx.mark_responded(node_id);
1422                    ctx
1423                } else {
1424                    entry.get().lookup_context.clone()
1425                }
1426            }
1427            Entry::Vacant(_) => {
1428                // received neighbours response without requesting it
1429                trace!(target: "discv4", from=?remote_addr, "Received unsolicited Neighbours");
1430                return
1431            }
1432        };
1433
1434        // log the peers we discovered
1435        trace!(target: "discv4",
1436            target=format!("{:#?}", node_id),
1437            peers_count=msg.nodes.len(),
1438            peers=format!("[{:#}]", msg.nodes.iter()
1439                .map(|node_rec| node_rec.id
1440            ).format(", ")),
1441            "Received peers from Neighbours packet"
1442        );
1443
1444        // This is the recursive lookup step where we initiate new FindNode requests for new nodes
1445        // that were discovered.
1446        for node in msg.nodes.into_iter().map(NodeRecord::into_ipv4_mapped) {
1447            // prevent banned peers from being added to the context
1448            if self.config.ban_list.is_banned(&node.id, &node.address) {
1449                trace!(target: "discv4", peer_id=?node.id, ip=?node.address, "ignoring banned record");
1450                continue
1451            }
1452
1453            ctx.add_node(node);
1454        }
1455
1456        // get the next closest, not-yet-queried nodes and start over.
1457        let closest =
1458            ctx.filter_closest(ALPHA, |node| !self.pending_find_nodes.contains_key(&node.id));
1459
1460        for closest in closest {
1461            let key = kad_key(closest.id);
1462            match self.kbuckets.entry(&key) {
1463                BucketEntry::Absent(entry) => {
1464                    // the node's endpoint is not proven yet, so we need to ping it first; on
1465                    // success, we will add the node to the pending_lookup table and wait to send
1466                    // back a Pong before initiating a FindNode request.
1467                    // To prevent this node from being selected again in subsequent responses
1468                    // while the ping is still active, we always mark it as queried.
1469                    ctx.mark_queried(closest.id);
1470                    let node = NodeEntry::new(closest);
1471                    match entry.insert(
1472                        node,
1473                        NodeStatus {
1474                            direction: ConnectionDirection::Outgoing,
1475                            state: ConnectionState::Disconnected,
1476                        },
1477                    ) {
1478                        BucketInsertResult::Inserted | BucketInsertResult::Pending { .. } => {
1479                            // only ping if the node was added to the table
1480                            self.try_ping(closest, PingReason::Lookup(closest, ctx.clone()))
1481                        }
1482                        BucketInsertResult::Full => {
1483                            // new node but the node's bucket is already full
1484                            self.notify(DiscoveryUpdate::DiscoveredAtCapacity(closest))
1485                        }
1486                        _ => {}
1487                    }
1488                }
1489                BucketEntry::SelfEntry => {
1490                    // we received our own node entry
1491                }
1492                BucketEntry::Present(entry, _) => {
1493                    if entry.value().has_endpoint_proof {
1494                        if entry
1495                            .value()
1496                            .exceeds_find_node_failures(self.config.max_find_node_failures)
1497                        {
1498                            self.try_ping(closest, PingReason::Lookup(closest, ctx.clone()))
1499                        } else {
1500                            self.find_node(&closest, ctx.clone());
1501                        }
1502                    }
1503                }
1504                BucketEntry::Pending(mut entry, _) => {
1505                    if entry.value().has_endpoint_proof {
1506                        if entry
1507                            .value()
1508                            .exceeds_find_node_failures(self.config.max_find_node_failures)
1509                        {
1510                            self.try_ping(closest, PingReason::Lookup(closest, ctx.clone()))
1511                        } else {
1512                            self.find_node(&closest, ctx.clone());
1513                        }
1514                    }
1515                }
1516            }
1517        }
1518    }
1519
1520    /// Sends a Neighbours packet for `target` to the given addr
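    ///
    /// The closest nodes are split into datagram-sized chunks before sending, e.g. (an
    /// illustrative sketch, assuming `SAFE_MAX_DATAGRAM_NEIGHBOUR_RECORDS` allows 12 records per
    /// datagram):
    ///
    /// ```text
    /// 16 closest nodes -> Neighbours { 12 records } + Neighbours { 4 records }
    /// ```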
1521    fn respond_closest(&mut self, target: PeerId, to: SocketAddr) {
1522        let key = kad_key(target);
1523        let expire = self.send_neighbours_expiration();
1524
1525        // get the MAX_NODES_PER_BUCKET closest nodes to the target
1526        let closest_nodes =
1527            self.kbuckets.closest_values(&key).take(MAX_NODES_PER_BUCKET).collect::<Vec<_>>();
1528
1529        for nodes in closest_nodes.chunks(SAFE_MAX_DATAGRAM_NEIGHBOUR_RECORDS) {
1530            let nodes = nodes.iter().map(|node| node.value.record).collect::<Vec<NodeRecord>>();
1531            trace!(target: "discv4", len = nodes.len(), to=?to,"Sent neighbours packet");
1532            let msg = Message::Neighbours(Neighbours { nodes, expire });
1533            self.send_packet(msg, to);
1534        }
1535    }
1536
1537    fn evict_expired_requests(&mut self, now: Instant) {
1538        self.pending_enr_requests.retain(|_node_id, enr_request| {
1539            now.duration_since(enr_request.sent_at) < self.config.enr_expiration
1540        });
1541
1542        let mut failed_pings = Vec::new();
1543        self.pending_pings.retain(|node_id, ping_request| {
1544            if now.duration_since(ping_request.sent_at) > self.config.ping_expiration {
1545                failed_pings.push(*node_id);
1546                return false
1547            }
1548            true
1549        });
1550
1551        if !failed_pings.is_empty() {
1552            // remove nodes that failed to pong
1553            trace!(target: "discv4", num=%failed_pings.len(), "evicting nodes due to failed pong");
1554            for node_id in failed_pings {
1555                self.remove_node(node_id);
1556            }
1557        }
1558
1559        let mut failed_lookups = Vec::new();
1560        self.pending_lookup.retain(|node_id, (lookup_sent_at, _)| {
1561            if now.duration_since(*lookup_sent_at) > self.config.request_timeout {
1562                failed_lookups.push(*node_id);
1563                return false
1564            }
1565            true
1566        });
1567
1568        if !failed_lookups.is_empty() {
1569            // remove nodes that failed the e2e lookup process, so we can restart it
1570            trace!(target: "discv4", num=%failed_lookups.len(), "evicting nodes due to failed lookup");
1571            for node_id in failed_lookups {
1572                self.remove_node(node_id);
1573            }
1574        }
1575
1576        self.evict_failed_find_nodes(now);
1577    }
1578
1579    /// Handles failed responses to `FindNode`
1580    fn evict_failed_find_nodes(&mut self, now: Instant) {
1581        let mut failed_find_nodes = Vec::new();
1582        self.pending_find_nodes.retain(|node_id, find_node_request| {
1583            if now.duration_since(find_node_request.sent_at) > self.config.neighbours_expiration {
1584                if !find_node_request.answered {
1585                    // the node never answered the FindNode request, so count it as a failure;
1586                    // an answered request with fewer entries than expected is not treated as a hard error.
1587                    failed_find_nodes.push(*node_id);
1588                }
1589                return false
1590            }
1591            true
1592        });
1593
1594        if failed_find_nodes.is_empty() {
1595            return
1596        }
1597
1598        trace!(target: "discv4", num=%failed_find_nodes.len(), "processing failed find nodes");
1599
1600        for node_id in failed_find_nodes {
1601            let key = kad_key(node_id);
1602            let failures = match self.kbuckets.entry(&key) {
1603                kbucket::Entry::Present(mut entry, _) => {
1604                    entry.value_mut().inc_failed_request();
1605                    entry.value().find_node_failures
1606                }
1607                kbucket::Entry::Pending(mut entry, _) => {
1608                    entry.value().inc_failed_request();
1609                    entry.value().find_node_failures
1610                }
1611                _ => continue,
1612            };
1613
1614            // if the node repeatedly failed to respond with anything useful, remove it from
1615            // the table, but only if there are enough other nodes in the bucket (bucket must be at
1616            // least half full)
1617            if failures > self.config.max_find_node_failures {
1618                self.soft_remove_node(node_id);
1619            }
1620        }
1621    }
1622
1623    /// Re-pings all nodes whose endpoint proofs are considered expired: [`NodeEntry::is_expired`]
1624    ///
1625    /// This will send a `Ping` to the nodes; if a node fails to respond with a `Pong` to renew the
1626    /// endpoint proof, it will be removed from the table.
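    ///
    /// Illustrative timeline (a sketch; the exact window is `ENDPOINT_PROOF_EXPIRATION`):
    ///
    /// ```text
    /// last_seen elapsed > ENDPOINT_PROOF_EXPIRATION / 2  -> node is re-pinged
    ///   Pong received in time                            -> proof renewed, node kept
    ///   no Pong before ping_expiration                   -> node removed from the table
    /// ```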
1627    fn re_ping_oldest(&mut self) {
1628        let mut nodes = self
1629            .kbuckets
1630            .iter_ref()
1631            .filter(|entry| entry.node.value.is_expired())
1632            .map(|n| n.node.value)
1633            .collect::<Vec<_>>();
1634        nodes.sort_by(|a, b| a.last_seen.cmp(&b.last_seen));
1635        let to_ping = nodes.into_iter().map(|n| n.record).take(MAX_NODES_PING).collect::<Vec<_>>();
1636        for node in to_ping {
1637            self.try_ping(node, PingReason::RePing)
1638        }
1639    }
1640
1641    /// Returns true if the expiration timestamp is in the past.
1642    fn is_expired(&self, expiration: u64) -> bool {
1643        self.ensure_not_expired(expiration).is_err()
1644    }
1645
1646    /// Validates that the given timestamp is not expired.
1647    ///
1648    /// Note: this accepts the timestamp as u64 because this is used by the wire protocol, but the
1649    /// UNIX timestamp (number of non-leap seconds since January 1, 1970 0:00:00 UTC) is supposed to
1650    /// be an i64.
1651    ///
1652    /// Returns an error if:
1653    ///  - invalid UNIX timestamp (larger than `i64::MAX`)
1654    ///  - timestamp is expired (lower than current local UNIX timestamp)
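    ///
    /// Illustrative examples (a sketch, assuming `enforce_expiration_timestamps` is enabled and
    /// the current UNIX time is `1_700_000_000`):
    ///
    /// ```text
    /// ensure_not_expired(u64::MAX)       -> Err(())  // not a valid i64 timestamp
    /// ensure_not_expired(1_699_999_000)  -> Err(())  // already expired
    /// ensure_not_expired(1_700_000_060)  -> Ok(())   // expires 60s in the future
    /// ```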
1655    fn ensure_not_expired(&self, timestamp: u64) -> Result<(), ()> {
1656        // ensure the timestamp is a valid UNIX timestamp
1657        let _ = i64::try_from(timestamp).map_err(drop)?;
1658
1659        let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_secs();
1660        if self.config.enforce_expiration_timestamps && timestamp < now {
1661            trace!(target: "discv4", "Expired packet");
1662            return Err(())
1663        }
1664        Ok(())
1665    }
1666
1667    /// Pops buffered ping requests and sends them.
1668    fn ping_buffered(&mut self) {
1669        while self.pending_pings.len() < MAX_NODES_PING {
1670            match self.queued_pings.pop_front() {
1671                Some((next, reason)) => self.try_ping(next, reason),
1672                None => break,
1673            }
1674        }
1675    }
1676
1677    fn ping_expiration(&self) -> u64 {
1678        (SystemTime::now().duration_since(UNIX_EPOCH).unwrap() + self.config.ping_expiration)
1679            .as_secs()
1680    }
1681
1682    fn find_node_expiration(&self) -> u64 {
1683        (SystemTime::now().duration_since(UNIX_EPOCH).unwrap() + self.config.request_timeout)
1684            .as_secs()
1685    }
1686
1687    fn enr_request_expiration(&self) -> u64 {
1688        (SystemTime::now().duration_since(UNIX_EPOCH).unwrap() + self.config.enr_expiration)
1689            .as_secs()
1690    }
1691
1692    fn send_neighbours_expiration(&self) -> u64 {
1693        (SystemTime::now().duration_since(UNIX_EPOCH).unwrap() + self.config.neighbours_expiration)
1694            .as_secs()
1695    }
1696
1697    /// Polls the socket and advances the state.
1698    ///
1699    /// To prevent traffic amplification attacks, implementations must verify that the sender of a
1700    /// query participates in the discovery protocol. The sender of a packet is considered verified
1701    /// if it has sent a valid Pong response with matching ping hash within the last 12 hours.
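    ///
    /// Sketch of the bond check applied to incoming queries (see `has_bond`):
    ///
    /// ```text
    /// FindNode / EnrRequest from peer
    ///   has_bond(peer, ip)?   // a valid Pong was received within the last 12 hours
    ///     yes -> respond with Neighbours / EnrResponse
    ///     no  -> ignore the request
    /// ```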
1702    pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<Discv4Event> {
1703        loop {
1704            // drain buffered events first
1705            if let Some(event) = self.queued_events.pop_front() {
1706                return Poll::Ready(event)
1707            }
1708
1709            // trigger self lookup
1710            if self.config.enable_lookup {
1711                while self.lookup_interval.poll_tick(cx).is_ready() {
1712                    let target = self.lookup_rotator.next(&self.local_node_record.id);
1713                    self.lookup_with(target, None);
1714                }
1715            }
1716
1717            // re-ping some peers
1718            while self.ping_interval.poll_tick(cx).is_ready() {
1719                self.re_ping_oldest();
1720            }
1721
1722            if let Some(Poll::Ready(Some(ip))) =
1723                self.resolve_external_ip_interval.as_mut().map(|r| r.poll_tick(cx))
1724            {
1725                self.set_external_ip_addr(ip);
1726            }
1727
1728            // drain all incoming `Discv4` commands; this channel can never close
1729            while let Poll::Ready(Some(cmd)) = self.commands_rx.poll_recv(cx) {
1730                match cmd {
1731                    Discv4Command::Add(enr) => {
1732                        self.add_node(enr);
1733                    }
1734                    Discv4Command::Lookup { node_id, tx } => {
1735                        let node_id = node_id.unwrap_or(self.local_node_record.id);
1736                        self.lookup_with(node_id, tx);
1737                    }
1738                    Discv4Command::SetLookupInterval(duration) => {
1739                        self.set_lookup_interval(duration);
1740                    }
1741                    Discv4Command::Updates(tx) => {
1742                        let rx = self.update_stream();
1743                        let _ = tx.send(rx);
1744                    }
1745                    Discv4Command::BanPeer(node_id) => self.ban_node(node_id),
1746                    Discv4Command::Remove(node_id) => {
1747                        self.remove_node(node_id);
1748                    }
1749                    Discv4Command::Ban(node_id, ip) => {
1750                        self.ban_node(node_id);
1751                        self.ban_ip(ip);
1752                    }
1753                    Discv4Command::BanIp(ip) => {
1754                        self.ban_ip(ip);
1755                    }
1756                    Discv4Command::SetEIP868RLPPair { key, rlp } => {
1757                        debug!(target: "discv4", key=%String::from_utf8_lossy(&key), "Update EIP-868 extension pair");
1758
1759                        let _ = self.local_eip_868_enr.insert_raw_rlp(key, rlp, &self.secret_key);
1760                    }
1761                    Discv4Command::SetTcpPort(port) => {
1762                        debug!(target: "discv4", %port, "Update tcp port");
1763                        self.local_node_record.tcp_port = port;
1764                        if self.local_node_record.address.is_ipv4() {
1765                            let _ = self.local_eip_868_enr.set_tcp4(port, &self.secret_key);
1766                        } else {
1767                            let _ = self.local_eip_868_enr.set_tcp6(port, &self.secret_key);
1768                        }
1769                    }
1770
1771                    Discv4Command::Terminated => {
1772                        // terminate the service
1773                        self.queued_events.push_back(Discv4Event::Terminated);
1774                    }
1775                }
1776            }
1777
1778            // restricts how many messages we process in a single poll before yielding back control
1779            let mut udp_message_budget = UDP_MESSAGE_POLL_LOOP_BUDGET;
1780
1781            // process all incoming datagrams
1782            while let Poll::Ready(Some(event)) = self.ingress.poll_recv(cx) {
1783                match event {
1784                    IngressEvent::RecvError(err) => {
1785                        debug!(target: "discv4", %err, "failed to read datagram");
1786                    }
1787                    IngressEvent::BadPacket(from, err, data) => {
1788                        trace!(target: "discv4", ?from, %err, packet=?hex::encode(&data), "bad packet");
1789                    }
1790                    IngressEvent::Packet(remote_addr, Packet { msg, node_id, hash }) => {
1791                        trace!(target: "discv4", r#type=?msg.msg_type(), from=?remote_addr,"received packet");
1792                        let event = match msg {
1793                            Message::Ping(ping) => {
1794                                self.on_ping(ping, remote_addr, node_id, hash);
1795                                Discv4Event::Ping
1796                            }
1797                            Message::Pong(pong) => {
1798                                self.on_pong(pong, remote_addr, node_id);
1799                                Discv4Event::Pong
1800                            }
1801                            Message::FindNode(msg) => {
1802                                self.on_find_node(msg, remote_addr, node_id);
1803                                Discv4Event::FindNode
1804                            }
1805                            Message::Neighbours(msg) => {
1806                                self.on_neighbours(msg, remote_addr, node_id);
1807                                Discv4Event::Neighbours
1808                            }
1809                            Message::EnrRequest(msg) => {
1810                                self.on_enr_request(msg, remote_addr, node_id, hash);
1811                                Discv4Event::EnrRequest
1812                            }
1813                            Message::EnrResponse(msg) => {
1814                                self.on_enr_response(msg, remote_addr, node_id);
1815                                Discv4Event::EnrResponse
1816                            }
1817                        };
1818
1819                        self.queued_events.push_back(event);
1820                    }
1821                }
1822
1823                udp_message_budget -= 1;
1824                if udp_message_budget < 0 {
1825                    trace!(target: "discv4", budget=UDP_MESSAGE_POLL_LOOP_BUDGET, "exhausted message poll budget");
1826                    if self.queued_events.is_empty() {
1827                        // we've exceeded the message budget and have no events to process
1828                        // this will make sure we're woken up again
1829                        cx.waker().wake_by_ref();
1830                    }
1831                    break
1832                }
1833            }
1834
1835            // try resending buffered pings
1836            self.ping_buffered();
1837
1838            // evict expired requests
1839            while self.evict_expired_requests_interval.poll_tick(cx).is_ready() {
1840                self.evict_expired_requests(Instant::now());
1841            }
1842
1843            // evict expired pong records
1844            while self.expire_interval.poll_tick(cx).is_ready() {
1845                self.received_pongs.evict_expired(Instant::now(), EXPIRE_DURATION);
1846            }
1847
1848            if self.queued_events.is_empty() {
1849                return Poll::Pending
1850            }
1851        }
1852    }
1853}
1854
1855/// Endless stream impl
1856impl Stream for Discv4Service {
1857    type Item = Discv4Event;
1858
1859    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1860        // delegate to the internal poll method
1861        match ready!(self.get_mut().poll(cx)) {
1862            // if the service is terminated, return None to terminate the stream
1863            Discv4Event::Terminated => Poll::Ready(None),
1864            // For any other event, return Poll::Ready(Some(event))
1865            ev => Poll::Ready(Some(ev)),
1866        }
1867    }
1868}
1869
1870impl fmt::Debug for Discv4Service {
1871    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1872        f.debug_struct("Discv4Service")
1873            .field("local_address", &self.local_address)
1874            .field("local_peer_id", &self.local_peer_id())
1875            .field("local_node_record", &self.local_node_record)
1876            .field("queued_pings", &self.queued_pings)
1877            .field("pending_lookup", &self.pending_lookup)
1878            .field("pending_find_nodes", &self.pending_find_nodes)
1879            .field("lookup_interval", &self.lookup_interval)
1880            .finish_non_exhaustive()
1881    }
1882}
1883
1884/// The Event type the Service stream produces.
1885///
1886/// This is mainly used for testing purposes and represents messages the service processed
1887#[derive(Debug, Eq, PartialEq)]
1888pub enum Discv4Event {
1889    /// A `Ping` message was handled.
1890    Ping,
1891    /// A `Pong` message was handled.
1892    Pong,
1893    /// A `FindNode` message was handled.
1894    FindNode,
1895    /// A `Neighbours` message was handled.
1896    Neighbours,
1897    /// An `EnrRequest` message was handled.
1898    EnrRequest,
1899    /// An `EnrResponse` message was handled.
1900    EnrResponse,
1901    /// Service is being terminated
1902    Terminated,
1903}
1904
1905/// Continuously reads new messages from the channel and writes them to the socket
1906pub(crate) async fn send_loop(udp: Arc<UdpSocket>, rx: EgressReceiver) {
1907    let mut stream = ReceiverStream::new(rx);
1908    while let Some((payload, to)) = stream.next().await {
1909        match udp.send_to(&payload, to).await {
1910            Ok(size) => {
1911                trace!(target: "discv4", ?to, ?size,"sent payload");
1912            }
1913            Err(err) => {
1914                debug!(target: "discv4", ?to, %err,"Failed to send datagram.");
1915            }
1916        }
1917    }
1918}
1919
1920/// Rate limits incoming packets from individual IPs to an average of 1 packet/second (60 per minute)
1921const MAX_INCOMING_PACKETS_PER_MINUTE_BY_IP: usize = 60usize;
1922
1923/// Continuously awaits new incoming messages and sends them back through the channel.
1924///
1925/// The receive loop enforces primitive per-IP rate limiting to prevent message spam from
1926/// individual IPs
1927pub(crate) async fn receive_loop(udp: Arc<UdpSocket>, tx: IngressSender, local_id: PeerId) {
1928    let send = |event: IngressEvent| async {
1929        let _ = tx.send(event).await.map_err(|err| {
1930            debug!(
1931                target: "discv4",
1932                 %err,
1933                "failed send incoming packet",
1934            )
1935        });
1936    };
1937
1938    let mut cache = ReceiveCache::default();
1939
1940    // tick at half the limit: every `tick` seconds, decrement each per-IP counter by `tick`
1941    let tick = MAX_INCOMING_PACKETS_PER_MINUTE_BY_IP / 2;
1942    let mut interval = tokio::time::interval(Duration::from_secs(tick as u64));
1943
1944    let mut buf = [0; MAX_PACKET_SIZE];
1945    loop {
1946        let res = udp.recv_from(&mut buf).await;
1947        match res {
1948            Err(err) => {
1949                debug!(target: "discv4", %err, "Failed to read datagram.");
1950                send(IngressEvent::RecvError(err)).await;
1951            }
1952            Ok((read, remote_addr)) => {
1953                // rate limit incoming packets by IP
1954                if cache.inc_ip(remote_addr.ip()) > MAX_INCOMING_PACKETS_PER_MINUTE_BY_IP {
1955                    trace!(target: "discv4", ?remote_addr, "Too many incoming packets from IP.");
1956                    continue
1957                }
1958
1959                let packet = &buf[..read];
1960                match Message::decode(packet) {
1961                    Ok(packet) => {
1962                        if packet.node_id == local_id {
1963                            // received our own message
1964                            debug!(target: "discv4", ?remote_addr, "Received own packet.");
1965                            continue
1966                        }
1967
1968                        // skip if we've already received the same packet
1969                        if cache.contains_packet(packet.hash) {
1970                            debug!(target: "discv4", ?remote_addr, "Received duplicate packet.");
1971                            continue
1972                        }
1973
1974                        send(IngressEvent::Packet(remote_addr, packet)).await;
1975                    }
1976                    Err(err) => {
1977                        trace!(target: "discv4", %err,"Failed to decode packet");
1978                        send(IngressEvent::BadPacket(remote_addr, err, packet.to_vec())).await
1979                    }
1980                }
1981            }
1982        }
1983
1984        // decay the tracked per-IP counters if the interval has elapsed
1985        if poll_fn(|cx| match interval.poll_tick(cx) {
1986            Poll::Ready(_) => Poll::Ready(true),
1987            Poll::Pending => Poll::Ready(false),
1988        })
1989        .await
1990        {
1991            cache.tick_ips(tick);
1992        }
1993    }
1994}
1995
1996/// A cache for received packets and their source address.
1997///
1998/// This is used to discard duplicated packets and rate limit messages from the same source.
1999struct ReceiveCache {
2000    /// Keeps track of how many messages we've received from a given IP address since the last
2001    /// tick.
2002    ///
2003    /// This is used to count the number of messages received from a given IP address within an
2004    /// interval.
2005    ip_messages: HashMap<IpAddr, usize>,
2006    /// Keeps track of unique packet hashes
2007    unique_packets: schnellru::LruMap<B256, ()>,
2008}
2009
2010impl ReceiveCache {
2011    /// Decays the per-IP message counters.
2012    ///
2013    /// This will decrement each IP's counter by `tick` and remove entries whose counter would drop below zero.
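    ///
    /// Worked example (a sketch, using the tick of `30` that `receive_loop` derives from the
    /// per-minute limit):
    ///
    /// ```text
    /// counter 45 -> kept, decremented to 15
    /// counter 20 -> removed (20 < 30)
    /// ```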
2014    fn tick_ips(&mut self, tick: usize) {
2015        self.ip_messages.retain(|_, count| {
2016            if let Some(reset) = count.checked_sub(tick) {
2017                *count = reset;
2018                true
2019            } else {
2020                false
2021            }
2022        });
2023    }
2024
2025    /// Increases the counter for the given IP address and returns the new count.
2026    fn inc_ip(&mut self, ip: IpAddr) -> usize {
2027        let ctn = self.ip_messages.entry(ip).or_default();
2028        *ctn = ctn.saturating_add(1);
2029        *ctn
2030    }
2031
2032    /// Returns true if we previously received the packet
2033    fn contains_packet(&mut self, hash: B256) -> bool {
2034        !self.unique_packets.insert(hash, ())
2035    }
2036}
2037
2038impl Default for ReceiveCache {
2039    fn default() -> Self {
2040        Self {
2041            ip_messages: Default::default(),
2042            unique_packets: schnellru::LruMap::new(schnellru::ByLength::new(32)),
2043        }
2044    }
2045}
2046
2047/// The commands sent from the frontend [`Discv4`] to the service [`Discv4Service`].
2048enum Discv4Command {
2049    Add(NodeRecord),
2050    SetTcpPort(u16),
2051    SetEIP868RLPPair { key: Vec<u8>, rlp: Bytes },
2052    Ban(PeerId, IpAddr),
2053    BanPeer(PeerId),
2054    BanIp(IpAddr),
2055    Remove(PeerId),
2056    Lookup { node_id: Option<PeerId>, tx: Option<NodeRecordSender> },
2057    SetLookupInterval(Duration),
2058    Updates(OneshotSender<ReceiverStream<DiscoveryUpdate>>),
2059    Terminated,
2060}
2061
2062/// Event type the receiver produces
2063#[derive(Debug)]
2064pub(crate) enum IngressEvent {
2065    /// Encountered an error when reading a datagram message.
2066    RecvError(io::Error),
2067    /// Received a bad message
2068    BadPacket(SocketAddr, DecodePacketError, Vec<u8>),
2069    /// Received a datagram from an address.
2070    Packet(SocketAddr, Packet),
2071}
2072
2073/// Tracks a sent ping
2074#[derive(Debug)]
2075struct PingRequest {
2076    // Timestamp when the request was sent.
2077    sent_at: Instant,
2078    // Node to which the request was sent.
2079    node: NodeRecord,
2080    // Hash sent in the Ping request
2081    echo_hash: B256,
2082    /// Why this ping was sent.
2083    reason: PingReason,
2084}
2085
2086/// Rotates the `PeerId` that is periodically looked up.
2087///
2088/// By selecting different targets, the lookups will be seeded with different ALPHA seed nodes.
2089#[derive(Debug)]
2090struct LookupTargetRotator {
2091    interval: usize,
2092    counter: usize,
2093}
2094
2095// === impl LookupTargetRotator ===
2096
2097impl LookupTargetRotator {
2098    /// Returns a rotator that always returns the local target.
2099    const fn local_only() -> Self {
2100        Self { interval: 1, counter: 0 }
2101    }
2102}
2103
2104impl Default for LookupTargetRotator {
2105    fn default() -> Self {
2106        Self {
2107            // every 4th lookup is our own node
2108            interval: 4,
2109            counter: 3,
2110        }
2111    }
2112}
2113
2114impl LookupTargetRotator {
2115    /// Returns the next node id to look up.
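    ///
    /// With the default rotator every fourth target is the local node id (see `test_rotator`
    /// below); an illustrative sketch, not a doctest:
    ///
    /// ```ignore
    /// let local = PeerId::random();
    /// let mut rotator = LookupTargetRotator::default();
    /// assert_eq!(rotator.next(&local), local);
    /// assert_ne!(rotator.next(&local), local);
    /// ```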
2116    fn next(&mut self, local: &PeerId) -> PeerId {
2117        self.counter += 1;
2118        self.counter %= self.interval;
2119        if self.counter == 0 {
2120            return *local
2121        }
2122        PeerId::random()
2123    }
2124}
2125
2126/// Tracks lookups across multiple `FindNode` requests.
2127///
2128/// Once all clones of this type have been dropped, it will send all the discovered nodes to the
2129/// listener, if one is present.
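///
/// Illustrative usage within the lookup step (a sketch of how the service drives it, not a
/// doctest; `target_key`, `nearest` and `tx` are hypothetical local variables):
///
/// ```ignore
/// let ctx = LookupContext::new(target_key, nearest, Some(tx));
/// ctx.add_node(record);          // track a newly discovered node
/// ctx.mark_queried(record.id);   // don't select it again while a request is in flight
/// ctx.mark_responded(record.id); // include it in the final result set
/// // when the last clone is dropped, all responded records are sent through `tx`
/// ```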
2130#[derive(Clone, Debug)]
2131struct LookupContext {
2132    inner: Rc<LookupContextInner>,
2133}
2134
2135impl LookupContext {
2136    /// Creates a new context for a recursive lookup
2137    fn new(
2138        target: discv5::Key<NodeKey>,
2139        nearest_nodes: impl IntoIterator<Item = (Distance, NodeRecord)>,
2140        listener: Option<NodeRecordSender>,
2141    ) -> Self {
2142        let closest_nodes = nearest_nodes
2143            .into_iter()
2144            .map(|(distance, record)| {
2145                (distance, QueryNode { record, queried: false, responded: false })
2146            })
2147            .collect();
2148
2149        let inner = Rc::new(LookupContextInner {
2150            target,
2151            closest_nodes: RefCell::new(closest_nodes),
2152            listener,
2153        });
2154        Self { inner }
2155    }
2156
2157    /// Returns the target of this lookup
2158    fn target(&self) -> PeerId {
2159        self.inner.target.preimage().0
2160    }
2161
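    /// Returns up to `num` of the closest nodes that have not been queried yet.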
2162    fn closest(&self, num: usize) -> Vec<NodeRecord> {
2163        self.inner
2164            .closest_nodes
2165            .borrow()
2166            .iter()
2167            .filter(|(_, node)| !node.queried)
2168            .map(|(_, n)| n.record)
2169            .take(num)
2170            .collect()
2171    }
2172
2173    /// Returns up to `num` of the closest not-yet-queried nodes that also pass the given filter.
2174    fn filter_closest<P>(&self, num: usize, filter: P) -> Vec<NodeRecord>
2175    where
2176        P: FnMut(&NodeRecord) -> bool,
2177    {
2178        self.inner
2179            .closest_nodes
2180            .borrow()
2181            .iter()
2182            .filter(|(_, node)| !node.queried)
2183            .map(|(_, n)| n.record)
2184            .filter(filter)
2185            .take(num)
2186            .collect()
2187    }
2188
2189    /// Inserts the node if it's missing
2190    fn add_node(&self, record: NodeRecord) {
2191        let distance = self.inner.target.distance(&kad_key(record.id));
2192        let mut closest = self.inner.closest_nodes.borrow_mut();
2193        if let btree_map::Entry::Vacant(entry) = closest.entry(distance) {
2194            entry.insert(QueryNode { record, queried: false, responded: false });
2195        }
2196    }
2197
2198    fn set_queried(&self, id: PeerId, val: bool) {
2199        if let Some((_, node)) =
2200            self.inner.closest_nodes.borrow_mut().iter_mut().find(|(_, node)| node.record.id == id)
2201        {
2202            node.queried = val;
2203        }
2204    }
2205
2206    /// Marks the node as queried
2207    fn mark_queried(&self, id: PeerId) {
2208        self.set_queried(id, true)
2209    }
2210
2211    /// Marks the node as not queried
2212    fn unmark_queried(&self, id: PeerId) {
2213        self.set_queried(id, false)
2214    }
2215
2216    /// Marks the node as responded
2217    fn mark_responded(&self, id: PeerId) {
2218        if let Some((_, node)) =
2219            self.inner.closest_nodes.borrow_mut().iter_mut().find(|(_, node)| node.record.id == id)
2220        {
2221            node.responded = true;
2222        }
2223    }
2224}
2225
2226// SAFETY: The [`Discv4Service`] is intended to be spawned as a task, which requires `Send`.
2227// The `LookupContext` is shared by all active `FindNode` requests that are part of the lookup step
2228// and that can modify the context. The shared context is only ever accessed mutably when a
2229// `Neighbours` response is processed, and all clones are stored inside [`Discv4Service`]; in other
2230// words, it is guaranteed that there's only 1 owner ([`Discv4Service`]) of all possible [`Rc`]
2231// clones of [`LookupContext`].
2232unsafe impl Send for LookupContext {}
2233#[derive(Debug)]
2234struct LookupContextInner {
2235    /// The target to lookup.
2236    target: discv5::Key<NodeKey>,
2237    /// The closest nodes
2238    closest_nodes: RefCell<BTreeMap<Distance, QueryNode>>,
2239    /// A listener for all the nodes retrieved in this lookup
2240    ///
2241    /// This is present if the lookup was triggered manually via [`Discv4`] and we want to return all
2242    /// the nodes once the lookup finishes.
2243    listener: Option<NodeRecordSender>,
2244}
2245
2246impl Drop for LookupContextInner {
2247    fn drop(&mut self) {
2248        if let Some(tx) = self.listener.take() {
2249            // there's only 1 instance shared across `FindNode` requests; if this is dropped, then
2250            // all requests have finished, and we can send all results back
2251            let nodes = self
2252                .closest_nodes
2253                .take()
2254                .into_values()
2255                .filter(|node| node.responded)
2256                .map(|node| node.record)
2257                .collect();
2258            let _ = tx.send(nodes);
2259        }
2260    }
2261}
2262
2263/// Tracks the state of a recursive lookup step
2264#[derive(Debug, Clone, Copy)]
2265struct QueryNode {
2266    record: NodeRecord,
2267    queried: bool,
2268    responded: bool,
2269}
2270
2271#[derive(Debug)]
2272struct FindNodeRequest {
2273    // Timestamp when the request was sent.
2274    sent_at: Instant,
2275    // Number of items sent by the node
2276    response_count: usize,
2277    // Whether the request has been answered yet.
2278    answered: bool,
2279    /// The lookup context this request is part of; collects the discovered nodes
2280    lookup_context: LookupContext,
2281}
2282
2283// === impl FindNodeRequest ===
2284
2285impl FindNodeRequest {
2286    fn new(resp: LookupContext) -> Self {
2287        Self { sent_at: Instant::now(), response_count: 0, answered: false, lookup_context: resp }
2288    }
2289}
2290
2291#[derive(Debug)]
2292struct EnrRequestState {
2293    // Timestamp when the request was sent.
2294    sent_at: Instant,
2295    // Hash sent in the Ping request
2296    echo_hash: B256,
2297}
2298
2299/// Stored node info.
2300#[derive(Debug, Clone, Eq, PartialEq)]
2301struct NodeEntry {
2302    /// Node record info.
2303    record: NodeRecord,
2304    /// Timestamp of last pong.
2305    last_seen: Instant,
2306    /// Last enr seq we retrieved via an ENR request.
2307    last_enr_seq: Option<u64>,
2308    /// `ForkId` if retrieved via ENR requests.
2309    fork_id: Option<ForkId>,
2310    /// Counter for failed _consecutive_ findNode requests.
2311    find_node_failures: u8,
2312    /// Whether the endpoint of the peer is proven.
2313    has_endpoint_proof: bool,
2314}
2315
2316// === impl NodeEntry ===
2317
2318impl NodeEntry {
2319    /// Creates a new, unpopulated entry
2320    fn new(record: NodeRecord) -> Self {
2321        Self {
2322            record,
2323            last_seen: Instant::now(),
2324            last_enr_seq: None,
2325            fork_id: None,
2326            find_node_failures: 0,
2327            has_endpoint_proof: false,
2328        }
2329    }
2330
2331    #[cfg(test)]
2332    fn new_proven(record: NodeRecord) -> Self {
2333        let mut node = Self::new(record);
2334        node.has_endpoint_proof = true;
2335        node
2336    }
2337
2338    /// Marks the entry with an established proof and resets the consecutive failure counter.
2339    const fn establish_proof(&mut self) {
2340        self.has_endpoint_proof = true;
2341        self.find_node_failures = 0;
2342    }
2343
2344    /// Returns true if the tracked find node failures exceed the max amount
2345    const fn exceeds_find_node_failures(&self, max_failures: u8) -> bool {
2346        self.find_node_failures >= max_failures
2347    }
2348
2349    /// Updates the last timestamp and sets the enr seq
2350    fn update_with_enr(&mut self, last_enr_seq: Option<u64>) -> Option<u64> {
2351        self.update_now(|s| std::mem::replace(&mut s.last_enr_seq, last_enr_seq))
2352    }
2353
2354    /// Increases the failed request counter
2355    const fn inc_failed_request(&mut self) {
2356        self.find_node_failures += 1;
2357    }
2358
2359    /// Updates the last timestamp and sets the fork id
2360    fn update_with_fork_id(&mut self, fork_id: Option<ForkId>) -> Option<ForkId> {
2361        self.update_now(|s| std::mem::replace(&mut s.fork_id, fork_id))
2362    }
2363
2364    /// Updates the `last_seen` timestamp and calls the closure
2365    fn update_now<F, R>(&mut self, f: F) -> R
2366    where
2367        F: FnOnce(&mut Self) -> R,
2368    {
2369        self.last_seen = Instant::now();
2370        f(self)
2371    }
2372}
2373
2374// === impl NodeEntry ===
2375
2376impl NodeEntry {
2377    /// Returns true if the node should be re-pinged.
2378    fn is_expired(&self) -> bool {
2379        self.last_seen.elapsed() > (ENDPOINT_PROOF_EXPIRATION / 2)
2380    }
2381}
2382
2383/// Represents why a ping is issued
2384#[derive(Debug)]
2385enum PingReason {
2386    /// Initial ping to a previously unknown peer that was inserted into the table.
2387    InitialInsert,
2388    /// A ping to a peer to establish a bond (endpoint proof).
2389    EstablishBond,
2390    /// Re-ping a peer.
2391    RePing,
2392    /// Part of a lookup to ensure endpoint is proven before we can send a `FindNode` request.
2393    Lookup(NodeRecord, LookupContext),
2394}
2395
2396/// Represents updates to the state of nodes in the underlying node table
2397#[derive(Debug, Clone)]
2398pub enum DiscoveryUpdate {
2399    /// A new node was discovered _and_ added to the table.
2400    Added(NodeRecord),
2401    /// A new node was discovered but _not_ added to the table because it is currently full.
2402    DiscoveredAtCapacity(NodeRecord),
2403    /// Received a [`ForkId`] via EIP-868 for the given [`NodeRecord`].
2404    EnrForkId(NodeRecord, ForkId),
2405    /// Node that was removed from the table
2406    Removed(PeerId),
2407    /// A series of updates
2408    Batch(Vec<Self>),
2409}
2410
2411#[cfg(test)]
2412mod tests {
2413    use super::*;
2414    use crate::test_utils::{create_discv4, create_discv4_with_config, rng_endpoint, rng_record};
2415    use alloy_primitives::hex;
2416    use alloy_rlp::{Decodable, Encodable};
2417    use rand_08::Rng;
2418    use reth_ethereum_forks::{EnrForkIdEntry, ForkHash};
2419    use reth_network_peers::mainnet_nodes;
2420    use std::future::poll_fn;
2421
2422    #[tokio::test]
2423    async fn test_configured_enr_forkid_entry() {
2424        let fork: ForkId = ForkId { hash: ForkHash([220, 233, 108, 45]), next: 0u64 };
2425        let mut disc_conf = Discv4Config::default();
2426        disc_conf.add_eip868_pair("eth", EnrForkIdEntry::from(fork));
2427        let (_discv4, service) = create_discv4_with_config(disc_conf).await;
2428        let eth = service.local_eip_868_enr.get_raw_rlp(b"eth").unwrap();
2429        let fork_entry_id = EnrForkIdEntry::decode(&mut &eth[..]).unwrap();
2430
2431        let raw: [u8; 8] = [0xc7, 0xc6, 0x84, 0xdc, 0xe9, 0x6c, 0x2d, 0x80];
2432        let decoded = EnrForkIdEntry::decode(&mut &raw[..]).unwrap();
2433        let expected = EnrForkIdEntry {
2434            fork_id: ForkId { hash: ForkHash([0xdc, 0xe9, 0x6c, 0x2d]), next: 0 },
2435        };
2436        assert_eq!(expected, fork_entry_id);
2437        assert_eq!(expected, decoded);
2438    }
2439
2440    #[test]
2441    fn test_enr_forkid_entry_decode() {
2442        let raw: [u8; 8] = [0xc7, 0xc6, 0x84, 0xdc, 0xe9, 0x6c, 0x2d, 0x80];
2443        let decoded = EnrForkIdEntry::decode(&mut &raw[..]).unwrap();
2444        let expected = EnrForkIdEntry {
2445            fork_id: ForkId { hash: ForkHash([0xdc, 0xe9, 0x6c, 0x2d]), next: 0 },
2446        };
2447        assert_eq!(expected, decoded);
2448    }
2449
2450    #[test]
2451    fn test_enr_forkid_entry_encode() {
2452        let original = EnrForkIdEntry {
2453            fork_id: ForkId { hash: ForkHash([0xdc, 0xe9, 0x6c, 0x2d]), next: 0 },
2454        };
2455        let expected: [u8; 8] = [0xc7, 0xc6, 0x84, 0xdc, 0xe9, 0x6c, 0x2d, 0x80];
2456        let mut encoded = Vec::with_capacity(expected.len());
2457        original.encode(&mut encoded);
2458        assert_eq!(&expected[..], encoded.as_slice());
2459    }
2460
2461    #[test]
2462    fn test_local_rotator() {
2463        let id = PeerId::random();
2464        let mut rotator = LookupTargetRotator::local_only();
2465        assert_eq!(rotator.next(&id), id);
2466        assert_eq!(rotator.next(&id), id);
2467    }
2468
2469    #[test]
2470    fn test_rotator() {
2471        let id = PeerId::random();
2472        let mut rotator = LookupTargetRotator::default();
2473        assert_eq!(rotator.next(&id), id);
2474        assert_ne!(rotator.next(&id), id);
2475        assert_ne!(rotator.next(&id), id);
2476        assert_ne!(rotator.next(&id), id);
2477        assert_eq!(rotator.next(&id), id);
2478    }
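
    // A minimal sketch exercising the per-IP rate-limit accounting in `ReceiveCache`; the tick
    // and counter values below are arbitrary illustrative numbers, not protocol constants.
    #[test]
    fn test_receive_cache_rate_limit() {
        let mut cache = ReceiveCache::default();
        let ip = IpAddr::V4(Ipv4Addr::LOCALHOST);

        // three packets from the same IP increment its counter
        assert_eq!(cache.inc_ip(ip), 1);
        assert_eq!(cache.inc_ip(ip), 2);
        assert_eq!(cache.inc_ip(ip), 3);

        // a tick of 2 decays the counter but keeps the entry (3 - 2 = 1)
        cache.tick_ips(2);
        assert_eq!(cache.inc_ip(ip), 2);

        // a tick larger than the remaining counter removes the entry entirely
        cache.tick_ips(3);
        assert_eq!(cache.inc_ip(ip), 1);

        // duplicate packet hashes are detected
        let hash = B256::random();
        assert!(!cache.contains_packet(hash));
        assert!(cache.contains_packet(hash));
    }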
2479
2480    #[tokio::test]
2481    async fn test_pending_ping() {
2482        let (_, mut service) = create_discv4().await;
2483
2484        let local_addr = service.local_addr();
2485
2486        let mut num_inserted = 0;
2487        loop {
2488            let node = NodeRecord::new(local_addr, PeerId::random());
2489            if service.add_node(node) {
2490                num_inserted += 1;
2491                assert!(service.pending_pings.contains_key(&node.id));
2492                assert_eq!(service.pending_pings.len(), num_inserted);
2493                if num_inserted == MAX_NODES_PING {
2494                    break
2495                }
2496            }
2497        }
2498
2499        // `pending_pings` is full, insert into `queued_pings`.
2500        num_inserted = 0;
2501        for _ in 0..MAX_NODES_PING {
2502            let node = NodeRecord::new(local_addr, PeerId::random());
2503            if service.add_node(node) {
2504                num_inserted += 1;
2505                assert!(!service.pending_pings.contains_key(&node.id));
2506                assert_eq!(service.pending_pings.len(), MAX_NODES_PING);
2507                assert_eq!(service.queued_pings.len(), num_inserted);
2508            }
2509        }
2510    }
2511
2512    // Bootstraps with mainnet boot nodes
2513    #[tokio::test(flavor = "multi_thread")]
2514    #[ignore]
2515    async fn test_mainnet_lookup() {
2516        reth_tracing::init_test_tracing();
2517        let fork_id = ForkId { hash: ForkHash(hex!("743f3d89")), next: 16191202 };
2518
2519        let all_nodes = mainnet_nodes();
2520        let config = Discv4Config::builder()
2521            .add_boot_nodes(all_nodes)
2522            .lookup_interval(Duration::from_secs(1))
2523            .add_eip868_pair("eth", fork_id)
2524            .build();
2525        let (_discv4, mut service) = create_discv4_with_config(config).await;
2526
2527        let mut updates = service.update_stream();
2528
2529        let _handle = service.spawn();
2530
2531        let mut table = HashMap::new();
2532        while let Some(update) = updates.next().await {
2533            match update {
2534                DiscoveryUpdate::EnrForkId(record, fork_id) => {
2535                    println!("{record:?}, {fork_id:?}");
2536                }
2537                DiscoveryUpdate::Added(record) => {
2538                    table.insert(record.id, record);
2539                }
2540                DiscoveryUpdate::Removed(id) => {
2541                    table.remove(&id);
2542                }
2543                _ => {}
2544            }
2545            println!("total peers {}", table.len());
2546        }
2547    }
2548
2549    #[tokio::test]
2550    async fn test_mapped_ipv4() {
2551        reth_tracing::init_test_tracing();
2552        let mut rng = rand_08::thread_rng();
2553        let config = Discv4Config::builder().build();
2554        let (_discv4, mut service) = create_discv4_with_config(config).await;
2555
2556        let v4: Ipv4Addr = "0.0.0.0".parse().unwrap();
2557        let v6 = v4.to_ipv6_mapped();
2558        let addr: SocketAddr = (v6, DEFAULT_DISCOVERY_PORT).into();
2559
2560        let ping = Ping {
2561            from: rng_endpoint(&mut rng),
2562            to: rng_endpoint(&mut rng),
2563            expire: service.ping_expiration(),
2564            enr_sq: Some(rng.r#gen()),
2565        };
2566
2567        let id = PeerId::random();
2568        service.on_ping(ping, addr, id, B256::random());
2569
2570        let key = kad_key(id);
2571        match service.kbuckets.entry(&key) {
2572            kbucket::Entry::Present(entry, _) => {
2573                let node_addr = entry.value().record.address;
2574                assert!(node_addr.is_ipv4());
2575                assert_eq!(node_addr, IpAddr::from(v4));
2576            }
2577            _ => unreachable!(),
2578        };
2579    }
2580
2581    #[tokio::test]
2582    async fn test_respect_ping_expiration() {
2583        reth_tracing::init_test_tracing();
2584        let mut rng = rand_08::thread_rng();
2585        let config = Discv4Config::builder().build();
2586        let (_discv4, mut service) = create_discv4_with_config(config).await;
2587
2588        let v4: Ipv4Addr = "0.0.0.0".parse().unwrap();
2589        let v6 = v4.to_ipv6_mapped();
2590        let addr: SocketAddr = (v6, DEFAULT_DISCOVERY_PORT).into();
2591
2592        let ping = Ping {
2593            from: rng_endpoint(&mut rng),
2594            to: rng_endpoint(&mut rng),
2595            expire: SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs() - 1,
2596            enr_sq: Some(rng.r#gen()),
2597        };
2598
2599        let id = PeerId::random();
2600        service.on_ping(ping, addr, id, B256::random());
2601
2602        let key = kad_key(id);
2603        match service.kbuckets.entry(&key) {
2604            kbucket::Entry::Absent(_) => {}
2605            _ => unreachable!(),
2606        };
2607    }
2608
2609    #[tokio::test]
2610    async fn test_single_lookups() {
2611        reth_tracing::init_test_tracing();
2612
2613        let config = Discv4Config::builder().build();
2614        let (_discv4, mut service) = create_discv4_with_config(config.clone()).await;
2615
2616        let id = PeerId::random();
2617        let key = kad_key(id);
2618        let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), id);
2619
2620        let _ = service.kbuckets.insert_or_update(
2621            &key,
2622            NodeEntry::new_proven(record),
2623            NodeStatus {
2624                direction: ConnectionDirection::Incoming,
2625                state: ConnectionState::Connected,
2626            },
2627        );
2628
2629        service.lookup_self();
2630        assert_eq!(service.pending_find_nodes.len(), 1);
2631
2632        poll_fn(|cx| {
2633            let _ = service.poll(cx);
2634            assert_eq!(service.pending_find_nodes.len(), 1);
2635
2636            Poll::Ready(())
2637        })
2638        .await;
2639    }
2640
2641    #[tokio::test]
2642    async fn test_on_neighbours_recursive_lookup() {
2643        reth_tracing::init_test_tracing();
2644
2645        let config = Discv4Config::builder().build();
2646        let (_discv4, mut service) = create_discv4_with_config(config.clone()).await;
2647        let (_discv4, mut service2) = create_discv4_with_config(config).await;
2648
2649        let id = PeerId::random();
2650        let key = kad_key(id);
2651        let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), id);
2652
2653        let _ = service.kbuckets.insert_or_update(
2654            &key,
2655            NodeEntry::new_proven(record),
2656            NodeStatus {
2657                direction: ConnectionDirection::Incoming,
2658                state: ConnectionState::Connected,
2659            },
2660        );
2661        // Needed in this test to populate self.pending_find_nodes as a prerequisite for a valid
2662        // on_neighbours request
2663        service.lookup_self();
2664        assert_eq!(service.pending_find_nodes.len(), 1);
2665
2666        poll_fn(|cx| {
2667            let _ = service.poll(cx);
2668            assert_eq!(service.pending_find_nodes.len(), 1);
2669
2670            Poll::Ready(())
2671        })
2672        .await;
2673
2674        let expiry = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_secs() +
2675            10000000000000;
2676        let msg = Neighbours { nodes: vec![service2.local_node_record], expire: expiry };
2677        service.on_neighbours(msg, record.tcp_addr(), id);
2678        // wait for the processed ping
2679        let event = poll_fn(|cx| service2.poll(cx)).await;
2680        assert_eq!(event, Discv4Event::Ping);
2681        // assert that no find_node req has been added here on top of the initial one, since both
2682        // sides of the endpoint proof are not yet completed
2683        assert_eq!(service.pending_find_nodes.len(), 1);
2684        // we now wait for PONG
2685        let event = poll_fn(|cx| service.poll(cx)).await;
2686        assert_eq!(event, Discv4Event::Pong);
2687        // Ideally we want to assert against service.pending_lookup.len() here - but because
2688        // service2 sends Pong and Ping consecutively in on_ping(), the pending_lookup table gets
2689        // drained almost immediately - and there is no way to grab a handle to its intermediate
2690        // state here :(
2691        let event = poll_fn(|cx| service.poll(cx)).await;
2692        assert_eq!(event, Discv4Event::Ping);
2693        // assert that we've added the find_node req here after both sides of the endpoint proof are
2694        // done
2695        assert_eq!(service.pending_find_nodes.len(), 2);
2696    }
2697
2698    #[tokio::test]
2699    async fn test_no_local_in_closest() {
2700        reth_tracing::init_test_tracing();
2701
2702        let config = Discv4Config::builder().build();
2703        let (_discv4, mut service) = create_discv4_with_config(config).await;
2704
2705        let target_key = kad_key(PeerId::random());
2706
2707        let id = PeerId::random();
2708        let key = kad_key(id);
2709        let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), id);
2710
2711        let _ = service.kbuckets.insert_or_update(
2712            &key,
2713            NodeEntry::new(record),
2714            NodeStatus {
2715                direction: ConnectionDirection::Incoming,
2716                state: ConnectionState::Connected,
2717            },
2718        );
2719
2720        let closest = service
2721            .kbuckets
2722            .closest_values(&target_key)
2723            .map(|n| n.value.record)
2724            .take(MAX_NODES_PER_BUCKET)
2725            .collect::<Vec<_>>();
2726
2727        assert_eq!(closest.len(), 1);
2728        assert!(!closest.iter().any(|r| r.id == *service.local_peer_id()));
2729    }
2730
2731    #[tokio::test]
2732    async fn test_random_lookup() {
2733        reth_tracing::init_test_tracing();
2734
2735        let config = Discv4Config::builder().build();
2736        let (_discv4, mut service) = create_discv4_with_config(config).await;
2737
2738        let target = PeerId::random();
2739
2740        let id = PeerId::random();
2741        let key = kad_key(id);
2742        let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), id);
2743
2744        let _ = service.kbuckets.insert_or_update(
2745            &key,
2746            NodeEntry::new_proven(record),
2747            NodeStatus {
2748                direction: ConnectionDirection::Incoming,
2749                state: ConnectionState::Connected,
2750            },
2751        );
2752
2753        service.lookup(target);
2754        assert_eq!(service.pending_find_nodes.len(), 1);
2755
2756        let ctx = service.pending_find_nodes.values().next().unwrap().lookup_context.clone();
2757
2758        assert_eq!(ctx.target(), target);
2759        assert_eq!(ctx.inner.closest_nodes.borrow().len(), 1);
2760
2761        ctx.add_node(record);
2762        assert_eq!(ctx.inner.closest_nodes.borrow().len(), 1);
2763    }
2764
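    // A node that has reached the maximum find_node failure count is re-pinged by a lookup instead
    // of being sent another FindNode; a subsequent pong resets the failure counter and keeps the
    // endpoint proof.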
2765    #[tokio::test]
2766    async fn test_reping_on_find_node_failures() {
2767        reth_tracing::init_test_tracing();
2768
2769        let config = Discv4Config::builder().build();
2770        let (_discv4, mut service) = create_discv4_with_config(config).await;
2771
2772        let target = PeerId::random();
2773
2774        let id = PeerId::random();
2775        let key = kad_key(id);
2776        let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), id);
2777
2778        let mut entry = NodeEntry::new_proven(record);
2779        entry.find_node_failures = u8::MAX;
2780        let _ = service.kbuckets.insert_or_update(
2781            &key,
2782            entry,
2783            NodeStatus {
2784                direction: ConnectionDirection::Incoming,
2785                state: ConnectionState::Connected,
2786            },
2787        );
2788
2789        service.lookup(target);
2790        assert_eq!(service.pending_find_nodes.len(), 0);
2791        assert_eq!(service.pending_pings.len(), 1);
2792
2793        service.update_on_pong(record, None);
2794
2795        service
2796            .on_entry(record.id, |entry| {
2797                // reset on pong
2798                assert_eq!(entry.find_node_failures, 0);
2799                assert!(entry.has_endpoint_proof);
2800            })
2801            .unwrap();
2802    }
2803
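    // Exercises the `Discv4` frontend: a lookup-self command can be sent over the channel to the
    // spawned service and awaited.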
2804    #[tokio::test]
2805    async fn test_service_commands() {
2806        reth_tracing::init_test_tracing();
2807
2808        let config = Discv4Config::builder().build();
2809        let (discv4, mut service) = create_discv4_with_config(config).await;
2810
2811        service.lookup_self();
2812
2813        let _handle = service.spawn();
2814        discv4.send_lookup_self();
2815        let _ = discv4.lookup_self().await;
2816    }
2817
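    // With short request, ping and neighbours expirations configured, pending find_node requests,
    // pending lookups and pending pings are all evicted once the service is polled after the
    // timeouts have elapsed.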
2818    #[tokio::test]
2819    async fn test_requests_timeout() {
2820        reth_tracing::init_test_tracing();
2821        let fork_id = ForkId { hash: ForkHash(hex!("743f3d89")), next: 16191202 };
2822
2823        let config = Discv4Config::builder()
2824            .request_timeout(Duration::from_millis(200))
2825            .ping_expiration(Duration::from_millis(200))
2826            .lookup_neighbours_expiration(Duration::from_millis(200))
2827            .add_eip868_pair("eth", fork_id)
2828            .build();
2829        let (_discv4, mut service) = create_discv4_with_config(config).await;
2830
2831        let id = PeerId::random();
2832        let key = kad_key(id);
2833        let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), id);
2834
2835        let _ = service.kbuckets.insert_or_update(
2836            &key,
2837            NodeEntry::new_proven(record),
2838            NodeStatus {
2839                direction: ConnectionDirection::Incoming,
2840                state: ConnectionState::Connected,
2841            },
2842        );
2843
2844        service.lookup_self();
2845        assert_eq!(service.pending_find_nodes.len(), 1);
2846
2847        let ctx = service.pending_find_nodes.values().next().unwrap().lookup_context.clone();
2848
2849        service.pending_lookup.insert(record.id, (Instant::now(), ctx));
2850
2851        assert_eq!(service.pending_lookup.len(), 1);
2852
2853        let ping = Ping {
2854            from: service.local_node_record.into(),
2855            to: record.into(),
2856            expire: service.ping_expiration(),
2857            enr_sq: service.enr_seq(),
2858        };
2859        let echo_hash = service.send_packet(Message::Ping(ping), record.udp_addr());
2860        let ping_request = PingRequest {
2861            sent_at: Instant::now(),
2862            node: record,
2863            echo_hash,
2864            reason: PingReason::InitialInsert,
2865        };
2866        service.pending_pings.insert(record.id, ping_request);
2867
2868        assert_eq!(service.pending_pings.len(), 1);
2869
2870        tokio::time::sleep(Duration::from_secs(1)).await;
2871
2872        poll_fn(|cx| {
2873            let _ = service.poll(cx);
2874
2875            assert_eq!(service.pending_find_nodes.len(), 0);
2876            assert_eq!(service.pending_lookup.len(), 0);
2877            assert_eq!(service.pending_pings.len(), 0);
2878
2879            Poll::Ready(())
2880        })
2881        .await;
2882    }
2883
2884    // sends a PING packet with a wrong 'to' field and expects a PONG response.
2885    #[tokio::test(flavor = "multi_thread")]
2886    async fn test_check_wrong_to() {
2887        reth_tracing::init_test_tracing();
2888
2889        let config = Discv4Config::builder().external_ip_resolver(None).build();
2890        let (_discv4, mut service_1) = create_discv4_with_config(config.clone()).await;
2891        let (_discv4, mut service_2) = create_discv4_with_config(config).await;
2892
2893        // ping node 2 with wrong to field
2894        let mut ping = Ping {
2895            from: service_1.local_node_record.into(),
2896            to: service_2.local_node_record.into(),
2897            expire: service_1.ping_expiration(),
2898            enr_sq: service_1.enr_seq(),
2899        };
2900        ping.to.address = "192.0.2.0".parse().unwrap();
2901
2902        let echo_hash = service_1.send_packet(Message::Ping(ping), service_2.local_addr());
2903        let ping_request = PingRequest {
2904            sent_at: Instant::now(),
2905            node: service_2.local_node_record,
2906            echo_hash,
2907            reason: PingReason::InitialInsert,
2908        };
2909        service_1.pending_pings.insert(*service_2.local_peer_id(), ping_request);
2910
2911        // wait for the processed ping
2912        let event = poll_fn(|cx| service_2.poll(cx)).await;
2913        assert_eq!(event, Discv4Event::Ping);
2914
2915        // we now wait for PONG
2916        let event = poll_fn(|cx| service_1.poll(cx)).await;
2917        assert_eq!(event, Discv4Event::Pong);
2918        // followed by a ping
2919        let event = poll_fn(|cx| service_1.poll(cx)).await;
2920        assert_eq!(event, Discv4Event::Ping);
2921    }
2922
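    // Performs a full PING/PONG handshake between two services and verifies that each side marks
    // the other as connected once its endpoint proof is complete.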
2923    #[tokio::test(flavor = "multi_thread")]
2924    async fn test_check_ping_pong() {
2925        reth_tracing::init_test_tracing();
2926
2927        let config = Discv4Config::builder().external_ip_resolver(None).build();
2928        let (_discv4, mut service_1) = create_discv4_with_config(config.clone()).await;
2929        let (_discv4, mut service_2) = create_discv4_with_config(config).await;
2930
2931        // send ping from 1 -> 2
2932        service_1.add_node(service_2.local_node_record);
2933
2934        // wait for the processed ping
2935        let event = poll_fn(|cx| service_2.poll(cx)).await;
2936        assert_eq!(event, Discv4Event::Ping);
2937
2938        // node is now in the table but not connected yet
2939        let key1 = kad_key(*service_1.local_peer_id());
2940        match service_2.kbuckets.entry(&key1) {
2941            kbucket::Entry::Present(_entry, status) => {
2942                assert!(!status.is_connected());
2943            }
2944            _ => unreachable!(),
2945        }
2946
2947        // we now wait for PONG
2948        let event = poll_fn(|cx| service_1.poll(cx)).await;
2949        assert_eq!(event, Discv4Event::Pong);
2950
2951        // endpoint is proven
2952        let key2 = kad_key(*service_2.local_peer_id());
2953        match service_1.kbuckets.entry(&key2) {
2954            kbucket::Entry::Present(_entry, status) => {
2955                assert!(status.is_connected());
2956            }
2957            _ => unreachable!(),
2958        }
2959
2960        // we now wait for the PING initiated by 2
2961        let event = poll_fn(|cx| service_1.poll(cx)).await;
2962        assert_eq!(event, Discv4Event::Ping);
2963
2964        // we now wait for the PONG; an EnrRequest may arrive first and is handled below
2965        let event = poll_fn(|cx| service_2.poll(cx)).await;
2966
2967        match event {
2968            Discv4Event::EnrRequest => {
2969                // since we support ENR in the ping, the peer may also request the ENR
2970                let event = poll_fn(|cx| service_2.poll(cx)).await;
2971                match event {
2972                    Discv4Event::EnrRequest => {
2973                        let event = poll_fn(|cx| service_2.poll(cx)).await;
2974                        assert_eq!(event, Discv4Event::Pong);
2975                    }
2976                    Discv4Event::Pong => {}
2977                    _ => {
2978                        unreachable!()
2979                    }
2980                }
2981            }
2982            Discv4Event::Pong => {}
2983            ev => unreachable!("{ev:?}"),
2984        }
2985
2986        // endpoint is proven
2987        match service_2.kbuckets.entry(&key1) {
2988            kbucket::Entry::Present(_entry, status) => {
2989                assert!(status.is_connected());
2990            }
2991            ev => unreachable!("{ev:?}"),
2992        }
2993    }
2994
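    // Inserts a fresh record into an empty kbuckets table via the vacant (`Absent`) entry and
    // verifies it is subsequently `Present`.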
2995    #[test]
2996    fn test_insert() {
2997        let local_node_record = rng_record(&mut rand_08::thread_rng());
2998        let mut kbuckets: KBucketsTable<NodeKey, NodeEntry> = KBucketsTable::new(
2999            NodeKey::from(&local_node_record).into(),
3000            Duration::from_secs(60),
3001            MAX_NODES_PER_BUCKET,
3002            None,
3003            None,
3004        );
3005
3006        let new_record = rng_record(&mut rand_08::thread_rng());
3007        let key = kad_key(new_record.id);
3008        match kbuckets.entry(&key) {
3009            kbucket::Entry::Absent(entry) => {
3010                let node = NodeEntry::new(new_record);
3011                let _ = entry.insert(
3012                    node,
3013                    NodeStatus {
3014                        direction: ConnectionDirection::Outgoing,
3015                        state: ConnectionState::Disconnected,
3016                    },
3017                );
3018            }
3019            _ => {
3020                unreachable!()
3021            }
3022        };
3023        match kbuckets.entry(&key) {
3024            kbucket::Entry::Present(_, _) => {}
3025            _ => {
3026                unreachable!()
3027            }
3028        }
3029    }
3030
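    // Boot nodes configured via `add_boot_node` must not be emitted as `DiscoveryUpdate::Added`
    // on the update stream.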
3031    #[tokio::test]
3032    async fn test_bootnode_not_in_update_stream() {
3033        reth_tracing::init_test_tracing();
3034        let (_, service_1) = create_discv4().await;
3035        let peerid_1 = *service_1.local_peer_id();
3036
3037        let config = Discv4Config::builder().add_boot_node(service_1.local_node_record).build();
3038        service_1.spawn();
3039
3040        let (_, mut service_2) = create_discv4_with_config(config).await;
3041
3042        let mut updates = service_2.update_stream();
3043
3044        service_2.spawn();
3045
3046        // Poll for events for a reasonable time
3047        let mut bootnode_appeared = false;
3048        let timeout = tokio::time::sleep(Duration::from_secs(1));
3049        tokio::pin!(timeout);
3050
3051        loop {
3052            tokio::select! {
3053                Some(update) = updates.next() => {
3054                    if let DiscoveryUpdate::Added(record) = update
3055                        && record.id == peerid_1 {
3056                        bootnode_appeared = true;
3057                        break;
3058                    }
3059                }
3060                _ = &mut timeout => break,
3061            }
3062        }
3063
3064        // Assert bootnode did not appear in update stream
3065        assert!(!bootnode_appeared, "Bootnode should not appear in update stream");
3066    }
3067}