reth_discv4/
lib.rs

1//! Discovery v4 implementation: <https://github.com/ethereum/devp2p/blob/master/discv4.md>
2//!
3//! Discv4 employs a Kademlia-like routing table to store and manage discovered peers.
4//! The protocol allows for external IP discovery in NAT environments through regular PING/PONG
5//! exchanges with discovered nodes. Nodes return the external IP address they have observed for us,
6//! and a simple majority is chosen as our external IP address. If the external IP address changes,
7//! this is produced as an event to notify the swarm (if one is used for this behaviour).
8//!
9//! This implementation consists of a [`Discv4`] and [`Discv4Service`] pair. The service manages the
10//! state and drives the UDP socket. The (optional) [`Discv4`] serves as the frontend to interact
11//! with the service via a channel. Whenever the underlying table changes, the service produces a
12//! [`DiscoveryUpdate`] that listeners will receive.
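//!
//! A minimal, compile-only sketch of wiring the pair together and consuming table updates,
//! modeled on the [`Discv4::bind`] example below:
//!
//! ```
//! # use std::io;
//! use rand::thread_rng;
//! use reth_discv4::{Discv4, Discv4Config};
//! use reth_network_peers::{pk2id, NodeRecord};
//! use secp256k1::SECP256K1;
//! use std::{net::SocketAddr, str::FromStr};
//! use tokio_stream::StreamExt;
//! # async fn example() -> io::Result<()> {
//! // generate a (random) keypair
//! let mut rng = thread_rng();
//! let (secret_key, pk) = SECP256K1.generate_keypair(&mut rng);
//! let id = pk2id(&pk);
//!
//! let socket = SocketAddr::from_str("0.0.0.0:0").unwrap();
//! let local_enr =
//!     NodeRecord { address: socket.ip(), tcp_port: socket.port(), udp_port: socket.port(), id };
//!
//! // spawn the service onto a new task and keep the cloneable frontend
//! let discv4 = Discv4::spawn(socket, local_enr, secret_key, Discv4Config::default()).await?;
//!
//! // subscribe to `DiscoveryUpdate`s produced by the service
//! let mut updates = discv4.update_stream().await.expect("service is running");
//! while let Some(update) = updates.next().await {
//!     // react to added/removed nodes
//!     let _ = update;
//! }
//! # Ok(())
//! # }
//! ```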
13//!
14//! ## Feature Flags
15//!
16//! - `serde` (default): Enable serde support
17//! - `test-utils`: Export utilities for testing
18
19#![doc(
20    html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
21    html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
22    issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/"
23)]
24#![cfg_attr(not(test), warn(unused_crate_dependencies))]
25#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
26
27use crate::{
28    error::{DecodePacketError, Discv4Error},
29    proto::{FindNode, Message, Neighbours, Packet, Ping, Pong},
30};
31use alloy_primitives::{bytes::Bytes, hex, B256};
32use discv5::{
33    kbucket,
34    kbucket::{
35        BucketInsertResult, Distance, Entry as BucketEntry, InsertResult, KBucketsTable,
36        NodeStatus, MAX_NODES_PER_BUCKET,
37    },
38    ConnectionDirection, ConnectionState,
39};
40use enr::Enr;
41use itertools::Itertools;
42use parking_lot::Mutex;
43use proto::{EnrRequest, EnrResponse};
44use reth_ethereum_forks::ForkId;
45use reth_network_peers::{pk2id, PeerId};
46use secp256k1::SecretKey;
47use std::{
48    cell::RefCell,
49    collections::{btree_map, hash_map::Entry, BTreeMap, HashMap, VecDeque},
50    fmt,
51    future::poll_fn,
52    io,
53    net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4},
54    pin::Pin,
55    rc::Rc,
56    sync::Arc,
57    task::{ready, Context, Poll},
58    time::{Duration, Instant, SystemTime, UNIX_EPOCH},
59};
60use tokio::{
61    net::UdpSocket,
62    sync::{mpsc, mpsc::error::TrySendError, oneshot, oneshot::Sender as OneshotSender},
63    task::{JoinHandle, JoinSet},
64    time::Interval,
65};
66use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt};
67use tracing::{debug, trace};
68
69pub mod error;
70pub mod proto;
71
72mod config;
73pub use config::{Discv4Config, Discv4ConfigBuilder};
74
75mod node;
76use node::{kad_key, NodeKey};
77
78mod table;
79
80// reexport NodeRecord primitive
81pub use reth_network_peers::NodeRecord;
82
83#[cfg(any(test, feature = "test-utils"))]
84pub mod test_utils;
85
86use crate::table::PongTable;
87use reth_net_nat::ResolveNatInterval;
88/// Reexport to get the public IP.
89pub use reth_net_nat::{external_ip, NatResolver};
90
91/// The default address for discv4 via UDP
92///
93/// Note: the default TCP address is the same.
94pub const DEFAULT_DISCOVERY_ADDR: IpAddr = IpAddr::V4(Ipv4Addr::UNSPECIFIED);
95
96/// The default port for discv4 via UDP
97///
98/// Note: the default TCP port is the same.
99pub const DEFAULT_DISCOVERY_PORT: u16 = 30303;
100
101/// The default address for discv4 via UDP: "0.0.0.0:30303"
102///
103/// Note: The default TCP address is the same.
104pub const DEFAULT_DISCOVERY_ADDRESS: SocketAddr =
105    SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, DEFAULT_DISCOVERY_PORT));
106
107/// The maximum size of any packet is 1280 bytes.
108const MAX_PACKET_SIZE: usize = 1280;
109
110/// Length of the UDP datagram packet-header: Hash(32b) + Signature(65b) + Packet Type(1b)
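/// i.e. `32 + 65 + 1 = 98` bytes.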
111const MIN_PACKET_SIZE: usize = 32 + 65 + 1;
112
113/// Concurrency factor for `FindNode` requests to pick `ALPHA` closest nodes, <https://github.com/ethereum/devp2p/blob/master/discv4.md#recursive-lookup>
114const ALPHA: usize = 3;
115
116/// Maximum number of nodes to ping concurrently.
117///
118/// This corresponds to 2 full `Neighbours` responses with 16 _new_ nodes. This will apply some
119/// backpressure in recursive lookups.
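/// With `MAX_NODES_PER_BUCKET` = 16, this caps the number of concurrent pings at `2 * 16 = 32`.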
120const MAX_NODES_PING: usize = 2 * MAX_NODES_PER_BUCKET;
121
122/// Maximum number of pings to keep queued.
123///
124/// If we are currently sending too many pings, any new pings will be queued. To prevent unbounded
125/// growth of the queue, the queue has a maximum capacity, after which any additional pings will be
126/// discarded.
127///
128/// This corresponds to 2 full `Neighbours` responses with 16 new nodes.
129const MAX_QUEUED_PINGS: usize = 2 * MAX_NODES_PER_BUCKET;
130
131/// The size of the datagram is limited to [`MAX_PACKET_SIZE`]; 16 nodes, as the discv4 spec
132/// specifies, don't fit in one datagram. The safe number of nodes that always fits in a datagram
133/// is 12, with the worst case being all of them IPv6 nodes. This is calculated by
134/// `(MAX_PACKET_SIZE - (header + expire + rlp overhead)) / size(rlp(Node_IPv6))`.
135/// Even in the best case where all nodes are IPv4, only 14 nodes fit into one packet.
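/// With the values used below this works out to `(1280 - 109) / 91 = 12` (integer division).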
136const SAFE_MAX_DATAGRAM_NEIGHBOUR_RECORDS: usize = (MAX_PACKET_SIZE - 109) / 91;
137
138/// The timeout used to identify expired nodes, 24h
139///
140/// Mirrors geth's `bondExpiration` of 24h
141const ENDPOINT_PROOF_EXPIRATION: Duration = Duration::from_secs(24 * 60 * 60);
142
143/// Duration used to expire nodes from the routing table 1hr
144const EXPIRE_DURATION: Duration = Duration::from_secs(60 * 60);
145
146// Restricts how many udp messages can be processed in a single [Discv4Service::poll] call.
147//
148// This will act as a manual yield point when draining the socket messages where the most CPU
149// expensive part is handling outgoing messages: encoding and hashing the packet
150const UDP_MESSAGE_POLL_LOOP_BUDGET: i32 = 4;
151
152type EgressSender = mpsc::Sender<(Bytes, SocketAddr)>;
153type EgressReceiver = mpsc::Receiver<(Bytes, SocketAddr)>;
154
155pub(crate) type IngressSender = mpsc::Sender<IngressEvent>;
156pub(crate) type IngressReceiver = mpsc::Receiver<IngressEvent>;
157
158type NodeRecordSender = OneshotSender<Vec<NodeRecord>>;
159
160/// The Discv4 frontend.
161///
162/// This is a cloneable type that communicates with the [`Discv4Service`] by sending commands over a
163/// shared channel.
164///
165/// See also [`Discv4::spawn`]
166#[derive(Debug, Clone)]
167pub struct Discv4 {
168    /// The address of the udp socket
169    local_addr: SocketAddr,
170    /// channel to send commands over to the service
171    to_service: mpsc::UnboundedSender<Discv4Command>,
172    /// Tracks the local node record.
173    ///
174    /// This includes the currently tracked external IP address of the node.
175    node_record: Arc<Mutex<NodeRecord>>,
176}
177
178impl Discv4 {
179    /// Same as [`Self::bind`] but also spawns the service onto a new task.
180    ///
181    /// See also: [`Discv4Service::spawn()`]
182    pub async fn spawn(
183        local_address: SocketAddr,
184        local_enr: NodeRecord,
185        secret_key: SecretKey,
186        config: Discv4Config,
187    ) -> io::Result<Self> {
188        let (discv4, service) = Self::bind(local_address, local_enr, secret_key, config).await?;
189
190        service.spawn();
191
192        Ok(discv4)
193    }
194
195    /// Returns a new noop instance whose command channel is not connected to a running service.
196    ///
197    /// NOTE: this is only intended for test setups.
198    #[cfg(feature = "test-utils")]
199    pub fn noop() -> Self {
200        let (to_service, _rx) = mpsc::unbounded_channel();
201        let local_addr =
202            (IpAddr::from(std::net::Ipv4Addr::UNSPECIFIED), DEFAULT_DISCOVERY_PORT).into();
203        Self {
204            local_addr,
205            to_service,
206            node_record: Arc::new(Mutex::new(NodeRecord::new(
207                "127.0.0.1:3030".parse().unwrap(),
208                PeerId::random(),
209            ))),
210        }
211    }
212
213    /// Binds a new `UdpSocket` and creates the service
214    ///
215    /// ```
216    /// # use std::io;
217    /// use rand::thread_rng;
218    /// use reth_discv4::{Discv4, Discv4Config};
219    /// use reth_network_peers::{pk2id, NodeRecord, PeerId};
220    /// use secp256k1::SECP256K1;
221    /// use std::{net::SocketAddr, str::FromStr};
222    /// # async fn t() -> io::Result<()> {
223    /// // generate a (random) keypair
224    /// let mut rng = thread_rng();
225    /// let (secret_key, pk) = SECP256K1.generate_keypair(&mut rng);
226    /// let id = pk2id(&pk);
227    ///
228    /// let socket = SocketAddr::from_str("0.0.0.0:0").unwrap();
229    /// let local_enr =
230    ///     NodeRecord { address: socket.ip(), tcp_port: socket.port(), udp_port: socket.port(), id };
231    /// let config = Discv4Config::default();
232    ///
233    /// let (discv4, mut service) = Discv4::bind(socket, local_enr, secret_key, config).await.unwrap();
234    ///
235    /// // get an update stream
236    /// let updates = service.update_stream();
237    ///
238    /// let _handle = service.spawn();
239    ///
240    /// // lookup the local node in the DHT
241    /// let _discovered = discv4.lookup_self().await.unwrap();
242    ///
243    /// # Ok(())
244    /// # }
245    /// ```
246    pub async fn bind(
247        local_address: SocketAddr,
248        mut local_node_record: NodeRecord,
249        secret_key: SecretKey,
250        config: Discv4Config,
251    ) -> io::Result<(Self, Discv4Service)> {
252        let socket = UdpSocket::bind(local_address).await?;
253        let local_addr = socket.local_addr()?;
254        local_node_record.udp_port = local_addr.port();
255        trace!(target: "discv4", ?local_addr,"opened UDP socket");
256
257        let service = Discv4Service::new(socket, local_addr, local_node_record, secret_key, config);
258        let discv4 = service.handle();
259        Ok((discv4, service))
260    }
261
262    /// Returns the address of the UDP socket.
263    pub const fn local_addr(&self) -> SocketAddr {
264        self.local_addr
265    }
266
267    /// Returns the [`NodeRecord`] of the local node.
268    ///
269    /// This includes the currently tracked external IP address of the node.
270    pub fn node_record(&self) -> NodeRecord {
271        *self.node_record.lock()
272    }
273
274    /// Returns the currently tracked external IP of the node.
275    pub fn external_ip(&self) -> IpAddr {
276        self.node_record.lock().address
277    }
278
279    /// Sets the [Interval] used for periodically looking up targets over the network
280    pub fn set_lookup_interval(&self, duration: Duration) {
281        self.send_to_service(Discv4Command::SetLookupInterval(duration))
282    }
283
284    /// Starts a `FindNode` recursive lookup that locates the closest nodes to the given node id. See also: <https://github.com/ethereum/devp2p/blob/master/discv4.md#recursive-lookup>
285    ///
286    /// The lookup initiator starts by picking α closest nodes to the target it knows of. The
287    /// initiator then sends concurrent `FindNode` packets to those nodes. α is a system-wide
288    /// concurrency parameter, such as 3. In the recursive step, the initiator resends `FindNode` to
289    /// nodes it has learned about from previous queries. Of the k nodes the initiator has heard of
290    /// closest to the target, it picks α that it has not yet queried and resends `FindNode` to
291    /// them. Nodes that fail to respond quickly are removed from consideration until and unless
292    /// they do respond.
293    //
294    // If a round of FindNode queries fails to return a node any closer than the closest already
295    // seen, the initiator resends the find node to all of the k closest nodes it has not already
296    // queried. The lookup terminates when the initiator has queried and gotten responses from the k
297    // closest nodes it has seen.
298    pub async fn lookup_self(&self) -> Result<Vec<NodeRecord>, Discv4Error> {
299        self.lookup_node(None).await
300    }
301
302    /// Looks up the given node id.
303    ///
304    /// Returning the closest nodes to the given node id.
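    ///
    /// A compile-only usage sketch, assuming an already-spawned [`Discv4`] handle:
    ///
    /// ```
    /// # async fn t(discv4: reth_discv4::Discv4) {
    /// let target = reth_network_peers::PeerId::random();
    /// let closest = discv4.lookup(target).await.expect("service is running");
    /// # let _ = closest;
    /// # }
    /// ```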
305    pub async fn lookup(&self, node_id: PeerId) -> Result<Vec<NodeRecord>, Discv4Error> {
306        self.lookup_node(Some(node_id)).await
307    }
308
309    /// Performs a random lookup for node records.
310    pub async fn lookup_random(&self) -> Result<Vec<NodeRecord>, Discv4Error> {
311        let target = PeerId::random();
312        self.lookup_node(Some(target)).await
313    }
314
315    /// Sends a message to the service to lookup the closest nodes
316    pub fn send_lookup(&self, node_id: PeerId) {
317        let cmd = Discv4Command::Lookup { node_id: Some(node_id), tx: None };
318        self.send_to_service(cmd);
319    }
320
321    async fn lookup_node(&self, node_id: Option<PeerId>) -> Result<Vec<NodeRecord>, Discv4Error> {
322        let (tx, rx) = oneshot::channel();
323        let cmd = Discv4Command::Lookup { node_id, tx: Some(tx) };
324        self.to_service.send(cmd)?;
325        Ok(rx.await?)
326    }
327
328    /// Triggers a new self lookup without expecting a response
329    pub fn send_lookup_self(&self) {
330        let cmd = Discv4Command::Lookup { node_id: None, tx: None };
331        self.send_to_service(cmd);
332    }
333
334    /// Removes the peer from the table, if it exists.
335    pub fn remove_peer(&self, node_id: PeerId) {
336        let cmd = Discv4Command::Remove(node_id);
337        self.send_to_service(cmd);
338    }
339
340    /// Adds the node to the table, if it is not already present.
341    pub fn add_node(&self, node_record: NodeRecord) {
342        let cmd = Discv4Command::Add(node_record);
343        self.send_to_service(cmd);
344    }
345
346    /// Adds the peer and id to the ban list.
347    ///
348    /// This will prevent any future inclusion in the table
349    pub fn ban(&self, node_id: PeerId, ip: IpAddr) {
350        let cmd = Discv4Command::Ban(node_id, ip);
351        self.send_to_service(cmd);
352    }
353
354    /// Adds the ip to the ban list.
355    ///
356    /// This will prevent any future inclusion in the table
357    pub fn ban_ip(&self, ip: IpAddr) {
358        let cmd = Discv4Command::BanIp(ip);
359        self.send_to_service(cmd);
360    }
361
362    /// Adds the peer to the ban list.
363    ///
364    /// This will prevent any future inclusion in the table
365    pub fn ban_node(&self, node_id: PeerId) {
366        let cmd = Discv4Command::BanPeer(node_id);
367        self.send_to_service(cmd);
368    }
369
370    /// Sets the tcp port
371    ///
372    /// This will update our [`NodeRecord`]'s tcp port.
373    pub fn set_tcp_port(&self, port: u16) {
374        let cmd = Discv4Command::SetTcpPort(port);
375        self.send_to_service(cmd);
376    }
377
378    /// Sets the pair in the EIP-868 [`Enr`] of the node.
379    ///
380    /// If the key already exists, this will update it.
381    ///
382    /// CAUTION: The value **must** be rlp encoded
383    pub fn set_eip868_rlp_pair(&self, key: Vec<u8>, rlp: Bytes) {
384        let cmd = Discv4Command::SetEIP868RLPPair { key, rlp };
385        self.send_to_service(cmd);
386    }
387
388    /// Sets the pair in the EIP-868 [`Enr`] of the node.
389    ///
390    /// If the key already exists, this will update it.
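    ///
    /// A compile-only sketch; the key used here is purely illustrative:
    ///
    /// ```
    /// # fn t(discv4: &reth_discv4::Discv4) {
    /// // `u64` implements `alloy_rlp::Encodable`; the value is rlp encoded before it is
    /// // inserted into the local ENR
    /// discv4.set_eip868_rlp(b"example".to_vec(), 42u64);
    /// # }
    /// ```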
391    pub fn set_eip868_rlp(&self, key: Vec<u8>, value: impl alloy_rlp::Encodable) {
392        self.set_eip868_rlp_pair(key, Bytes::from(alloy_rlp::encode(&value)))
393    }
394
395    #[inline]
396    fn send_to_service(&self, cmd: Discv4Command) {
397        let _ = self.to_service.send(cmd).map_err(|err| {
398            debug!(
399                target: "discv4",
400                %err,
401                "channel capacity reached, dropping command",
402            )
403        });
404    }
405
406    /// Returns the receiver half of a new listener channel that streams [`DiscoveryUpdate`]s.
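    ///
    /// A compile-only sketch of consuming the stream:
    ///
    /// ```
    /// # async fn t(discv4: reth_discv4::Discv4) {
    /// use tokio_stream::StreamExt;
    /// let mut updates = discv4.update_stream().await.expect("service is running");
    /// while let Some(update) = updates.next().await {
    ///     // handle added/removed nodes, ENR fork id updates, ...
    ///     let _ = update;
    /// }
    /// # }
    /// ```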
407    pub async fn update_stream(&self) -> Result<ReceiverStream<DiscoveryUpdate>, Discv4Error> {
408        let (tx, rx) = oneshot::channel();
409        let cmd = Discv4Command::Updates(tx);
410        self.to_service.send(cmd)?;
411        Ok(rx.await?)
412    }
413
414    /// Terminates the spawned [`Discv4Service`].
415    pub fn terminate(&self) {
416        self.send_to_service(Discv4Command::Terminated);
417    }
418}
419
420/// Manages discv4 peer discovery over UDP.
421///
422/// This is a [Stream] that handles incoming and outgoing discv4 messages and emits updates via
423/// [`Discv4Service::update_stream`].
424///
425/// This type maintains the discv4 Kademlia routing table and is responsible for performing lookups.
426///
427/// ## Lookups
428///
429/// See also [Recursive Lookups](https://github.com/ethereum/devp2p/blob/master/discv4.md#recursive-lookup).
430/// Lookups are either triggered periodically or performed on demand: [`Discv4::lookup`].
431/// Newly discovered nodes are emitted as [`DiscoveryUpdate::Added`] events to all subscribers:
432/// [`Discv4Service::update_stream`].
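///
/// The service can either be spawned onto its own task via [`Discv4Service::spawn`] or driven
/// manually as a [Stream]; the latter is what `spawn` does internally. A compile-only sketch:
///
/// ```
/// # async fn drive(mut service: reth_discv4::Discv4Service) {
/// use tokio_stream::StreamExt;
/// // subscribe to table updates before driving the service
/// let _updates = service.update_stream();
/// // poll the service; each item is an internal event that has been processed
/// while let Some(_event) = service.next().await {}
/// # }
/// ```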
433#[must_use = "Stream does nothing unless polled"]
434pub struct Discv4Service {
435    /// Local address of the UDP socket.
436    local_address: SocketAddr,
437    /// The local ENR for EIP-868 <https://eips.ethereum.org/EIPS/eip-868>
438    local_eip_868_enr: Enr<SecretKey>,
439    /// Local ENR of the server.
440    local_node_record: NodeRecord,
441    /// Keeps track of the node record of the local node.
442    shared_node_record: Arc<Mutex<NodeRecord>>,
443    /// The secret key used to sign payloads
444    secret_key: SecretKey,
445    /// The UDP socket for sending and receiving messages.
446    _socket: Arc<UdpSocket>,
447    /// The spawned UDP tasks.
448    ///
449    /// Note: If dropped, the spawned send+receive tasks are aborted.
450    _tasks: JoinSet<()>,
451    /// The routing table.
452    kbuckets: KBucketsTable<NodeKey, NodeEntry>,
453    /// Receiver for incoming messages
454    ///
455    /// Receives incoming messages from the UDP task.
456    ingress: IngressReceiver,
457    /// Sender for sending outgoing messages
458    ///
459    /// Sends outgoing messages to the UDP task.
460    egress: EgressSender,
461    /// Buffered pending pings to apply backpressure.
462    ///
463    /// Lookups behave like bursts of requests: an endpoint proof followed by a `FindNode` request. [Recursive lookups](https://github.com/ethereum/devp2p/blob/master/discv4.md#recursive-lookup) can trigger multiple follow-up Ping+FindNode requests.
464    /// A cap on concurrent `Ping`s prevents escalation where a large number of new nodes
465    /// discovered via `FindNode` in a recursive lookup triggers a large number of `Ping`s and
466    /// follow-up `FindNode` requests. Buffering them effectively prevents high `Ping` peaks.
467    queued_pings: VecDeque<(NodeRecord, PingReason)>,
468    /// Currently active pings to specific nodes.
469    pending_pings: HashMap<PeerId, PingRequest>,
470    /// Currently active endpoint proof verification lookups to specific nodes.
471    ///
472    /// Entries here mean we've proven the peer's endpoint but haven't yet completed our end of
473    /// the endpoint proof.
474    pending_lookup: HashMap<PeerId, (Instant, LookupContext)>,
475    /// Currently active `FindNode` requests
476    pending_find_nodes: HashMap<PeerId, FindNodeRequest>,
477    /// Currently active ENR requests
478    pending_enr_requests: HashMap<PeerId, EnrRequestState>,
479    /// Copy of the sender half of the commands channel for [Discv4]
480    to_service: mpsc::UnboundedSender<Discv4Command>,
481    /// Receiver half of the commands channel for [Discv4]
482    commands_rx: mpsc::UnboundedReceiver<Discv4Command>,
483    /// All subscribers for table updates
484    update_listeners: Vec<mpsc::Sender<DiscoveryUpdate>>,
485    /// The interval when to trigger random lookups
486    lookup_interval: Interval,
487    /// Used to rotate targets to lookup
488    lookup_rotator: LookupTargetRotator,
489    /// Interval when to recheck active requests
490    evict_expired_requests_interval: Interval,
491    /// Interval when to resend pings.
492    ping_interval: Interval,
493    /// The interval at which to attempt resolving external IP again.
494    resolve_external_ip_interval: Option<ResolveNatInterval>,
495    /// How this service is configured
496    config: Discv4Config,
497    /// Buffered events populated during poll.
498    queued_events: VecDeque<Discv4Event>,
499    /// Keeps track of nodes from which we have received a `Pong` message.
500    received_pongs: PongTable,
501    /// Interval used to expire additionally tracked nodes
502    expire_interval: Interval,
503}
504
505impl Discv4Service {
506    /// Create a new instance for a bound [`UdpSocket`].
507    pub(crate) fn new(
508        socket: UdpSocket,
509        local_address: SocketAddr,
510        local_node_record: NodeRecord,
511        secret_key: SecretKey,
512        config: Discv4Config,
513    ) -> Self {
514        let socket = Arc::new(socket);
515        let (ingress_tx, ingress_rx) = mpsc::channel(config.udp_ingress_message_buffer);
516        let (egress_tx, egress_rx) = mpsc::channel(config.udp_egress_message_buffer);
517        let mut tasks = JoinSet::<()>::new();
518
519        let udp = Arc::clone(&socket);
520        tasks.spawn(receive_loop(udp, ingress_tx, local_node_record.id));
521
522        let udp = Arc::clone(&socket);
523        tasks.spawn(send_loop(udp, egress_rx));
524
525        let kbuckets = KBucketsTable::new(
526            NodeKey::from(&local_node_record).into(),
527            Duration::from_secs(60),
528            MAX_NODES_PER_BUCKET,
529            None,
530            None,
531        );
532
533        let self_lookup_interval = tokio::time::interval(config.lookup_interval);
534
535        // Wait `ping_interval` before the first ping and then ping every `ping_interval`, so we
536        // don't start re-pinging the table immediately on startup
537        let ping_interval = tokio::time::interval_at(
538            tokio::time::Instant::now() + config.ping_interval,
539            config.ping_interval,
540        );
541
542        let evict_expired_requests_interval = tokio::time::interval_at(
543            tokio::time::Instant::now() + config.request_timeout,
544            config.request_timeout,
545        );
546
547        let lookup_rotator = if config.enable_dht_random_walk {
548            LookupTargetRotator::default()
549        } else {
550            LookupTargetRotator::local_only()
551        };
552
553        // for EIP-868 construct an ENR
554        let local_eip_868_enr = {
555            let mut builder = Enr::builder();
556            builder.ip(local_node_record.address);
557            if local_node_record.address.is_ipv4() {
558                builder.udp4(local_node_record.udp_port);
559                builder.tcp4(local_node_record.tcp_port);
560            } else {
561                builder.udp6(local_node_record.udp_port);
562                builder.tcp6(local_node_record.tcp_port);
563            }
564
565            for (key, val) in &config.additional_eip868_rlp_pairs {
566                builder.add_value_rlp(key, val.clone());
567            }
568            builder.build(&secret_key).expect("v4 is set")
569        };
570
571        let (to_service, commands_rx) = mpsc::unbounded_channel();
572
573        let shared_node_record = Arc::new(Mutex::new(local_node_record));
574
575        Self {
576            local_address,
577            local_eip_868_enr,
578            local_node_record,
579            shared_node_record,
580            _socket: socket,
581            kbuckets,
582            secret_key,
583            _tasks: tasks,
584            ingress: ingress_rx,
585            egress: egress_tx,
586            queued_pings: VecDeque::with_capacity(MAX_QUEUED_PINGS),
587            pending_pings: Default::default(),
588            pending_lookup: Default::default(),
589            pending_find_nodes: Default::default(),
590            pending_enr_requests: Default::default(),
591            commands_rx,
592            to_service,
593            update_listeners: Vec::with_capacity(1),
594            lookup_interval: self_lookup_interval,
595            ping_interval,
596            evict_expired_requests_interval,
597            lookup_rotator,
598            resolve_external_ip_interval: config.resolve_external_ip_interval(),
599            config,
600            queued_events: Default::default(),
601            received_pongs: Default::default(),
602            expire_interval: tokio::time::interval(EXPIRE_DURATION),
603        }
604    }
605
606    /// Returns the frontend handle that can communicate with the service via commands.
607    pub fn handle(&self) -> Discv4 {
608        Discv4 {
609            local_addr: self.local_address,
610            to_service: self.to_service.clone(),
611            node_record: self.shared_node_record.clone(),
612        }
613    }
614
615    /// Returns the current enr sequence of the local record.
616    fn enr_seq(&self) -> Option<u64> {
617        self.config.enable_eip868.then(|| self.local_eip_868_enr.seq())
618    }
619
620    /// Sets the [Interval] used for periodically looking up targets over the network
621    pub fn set_lookup_interval(&mut self, duration: Duration) {
622        self.lookup_interval = tokio::time::interval(duration);
623    }
624
625    /// Sets the given ip address as the node's external IP in the node record announced in
626    /// discovery
627    pub fn set_external_ip_addr(&mut self, external_ip: IpAddr) {
628        if self.local_node_record.address != external_ip {
629            debug!(target: "discv4", ?external_ip, "Updating external ip");
630            self.local_node_record.address = external_ip;
631            let _ = self.local_eip_868_enr.set_ip(external_ip, &self.secret_key);
632            let mut lock = self.shared_node_record.lock();
633            *lock = self.local_node_record;
634            debug!(target: "discv4", enr=?self.local_eip_868_enr, "Updated local ENR");
635        }
636    }
637
638    /// Returns the [`PeerId`] that identifies this node
639    pub const fn local_peer_id(&self) -> &PeerId {
640        &self.local_node_record.id
641    }
642
643    /// Returns the address of the UDP socket
644    pub const fn local_addr(&self) -> SocketAddr {
645        self.local_address
646    }
647
648    /// Returns the ENR of this service.
649    ///
650    /// Note: this will include the external address if resolved.
651    pub const fn local_enr(&self) -> NodeRecord {
652        self.local_node_record
653    }
654
655    /// Returns a mutable reference to the ENR for testing.
656    #[cfg(test)]
657    pub fn local_enr_mut(&mut self) -> &mut NodeRecord {
658        &mut self.local_node_record
659    }
660
661    /// Returns true if the given `PeerId` is currently in the bucket
662    pub fn contains_node(&self, id: PeerId) -> bool {
663        let key = kad_key(id);
664        self.kbuckets.get_index(&key).is_some()
665    }
666
667    /// Bootstraps the local node to join the DHT.
668    ///
669    /// Bootstrapping is a multi-step operation that starts with a lookup of the local node's
670    /// own ID in the DHT. This introduces the local node to the other nodes
671    /// in the DHT and populates its routing table with the closest proven neighbours.
672    ///
673    /// This is similar to adding all bootnodes via [`Self::add_node`], but does not fire a
674    /// [`DiscoveryUpdate::Added`] event for the given bootnodes. So boot nodes don't appear in the
675    /// update stream, which is usually desirable, since bootnodes should not be connected to.
676    ///
677    /// If adding the configured bootnodes should result in a [`DiscoveryUpdate::Added`], see
678    /// [`Self::add_all_nodes`].
679    ///
680    /// **Note:** This is a noop if there are no bootnodes.
681    pub fn bootstrap(&mut self) {
682        for record in self.config.bootstrap_nodes.clone() {
683            debug!(target: "discv4", ?record, "pinging boot node");
684            let key = kad_key(record.id);
685            let entry = NodeEntry::new(record);
686
687            // insert the boot node in the table
688            match self.kbuckets.insert_or_update(
689                &key,
690                entry,
691                NodeStatus {
692                    state: ConnectionState::Disconnected,
693                    direction: ConnectionDirection::Outgoing,
694                },
695            ) {
696                InsertResult::Failed(_) => {}
697                _ => {
698                    self.try_ping(record, PingReason::InitialInsert);
699                }
700            }
701        }
702    }
703
704    /// Spawns this service onto a new task
705    ///
706    /// Note: requires a running tokio runtime
707    pub fn spawn(mut self) -> JoinHandle<()> {
708        tokio::task::spawn(async move {
709            self.bootstrap();
710
711            while let Some(event) = self.next().await {
712                trace!(target: "discv4", ?event, "processed");
713            }
714            trace!(target: "discv4", "service terminated");
715        })
716    }
717
718    /// Creates a new bounded channel for [`DiscoveryUpdate`]s.
719    pub fn update_stream(&mut self) -> ReceiverStream<DiscoveryUpdate> {
720        let (tx, rx) = mpsc::channel(512);
721        self.update_listeners.push(tx);
722        ReceiverStream::new(rx)
723    }
724
725    /// Looks up the local node in the DHT.
726    pub fn lookup_self(&mut self) {
727        self.lookup(self.local_node_record.id)
728    }
729
730    /// Looks up the given node in the DHT
731    ///
732    /// A `FindNode` packet requests information about nodes close to target. The target is a
733    /// 64-byte secp256k1 public key. When `FindNode` is received, the recipient should reply
734    /// with Neighbors packets containing the closest 16 nodes to target found in its local
735    /// table.
736    //
737    // To guard against traffic amplification attacks, Neighbors replies should only be sent if the
738    // sender of FindNode has been verified by the endpoint proof procedure.
739    pub fn lookup(&mut self, target: PeerId) {
740        self.lookup_with(target, None)
741    }
742
743    /// Starts the recursive lookup process for the given target, <https://github.com/ethereum/devp2p/blob/master/discv4.md#recursive-lookup>.
744    ///
745    /// At first the `ALPHA` (==3, defined concurrency factor) nodes that are closest to the target
746    /// in the underlying DHT are selected to seed the lookup via `FindNode` requests. In the
747    /// recursive step, the initiator resends `FindNode` to nodes it has learned about from previous
748    /// queries.
749    ///
750    /// This takes an optional Sender through which all successfully discovered nodes are sent once
751    /// the request has finished.
752    fn lookup_with(&mut self, target: PeerId, tx: Option<NodeRecordSender>) {
753        trace!(target: "discv4", ?target, "Starting lookup");
754        let target_key = kad_key(target);
755
756        // Start a lookup context with the 16 (MAX_NODES_PER_BUCKET) closest nodes to which we have
757        // a valid endpoint proof
758        let ctx = LookupContext::new(
759            target_key.clone(),
760            self.kbuckets
761                .closest_values(&target_key)
762                .filter(|node| {
763                    node.value.has_endpoint_proof &&
764                        !self.pending_find_nodes.contains_key(&node.key.preimage().0)
765                })
766                .take(MAX_NODES_PER_BUCKET)
767                .map(|n| (target_key.distance(&n.key), n.value.record)),
768            tx,
769        );
770
771        // From those 16, pick the 3 closest to start the concurrent lookup.
772        let closest = ctx.closest(ALPHA);
773
774        if closest.is_empty() && self.pending_find_nodes.is_empty() {
775            // no closest nodes, and no lookup in progress: table is empty.
776            // This could happen if all records were deleted from the table due to missed pongs
777            // (e.g. connectivity problems over a long period of time, or issues during initial
778            // bootstrapping) so we attempt to bootstrap again
779            self.bootstrap();
780            return
781        }
782
783        trace!(target: "discv4", ?target, num = closest.len(), "Start lookup closest nodes");
784
785        for node in closest {
786            // here we still want to check against previous request failures and if necessary
787            // re-establish a new endpoint proof because it can be the case that the other node lost
788            // our entry and no longer has an endpoint proof on their end
789            self.find_node_checked(&node, ctx.clone());
790        }
791    }
792
793    /// Sends a new `FindNode` packet to the node with `target` as the lookup target.
794    ///
795    /// CAUTION: This expects there's a valid Endpoint proof to the given `node`.
796    fn find_node(&mut self, node: &NodeRecord, ctx: LookupContext) {
797        trace!(target: "discv4", ?node, lookup=?ctx.target(), "Sending FindNode");
798        ctx.mark_queried(node.id);
799        let id = ctx.target();
800        let msg = Message::FindNode(FindNode { id, expire: self.find_node_expiration() });
801        self.send_packet(msg, node.udp_addr());
802        self.pending_find_nodes.insert(node.id, FindNodeRequest::new(ctx));
803    }
804
805    /// Sends a new `FindNode` packet to the node with `target` as the lookup target, but first
806    /// checks whether we should send a new ping to renew the endpoint proof, based on previously
807    /// failed `FindNode` requests. It could be that the node is no longer reachable or has lost
808    /// our entry.
809    fn find_node_checked(&mut self, node: &NodeRecord, ctx: LookupContext) {
810        let max_failures = self.config.max_find_node_failures;
811        let needs_ping = self
812            .on_entry(node.id, |entry| entry.exceeds_find_node_failures(max_failures))
813            .unwrap_or(true);
814        if needs_ping {
815            self.try_ping(*node, PingReason::Lookup(*node, ctx))
816        } else {
817            self.find_node(node, ctx)
818        }
819    }
820
821    /// Notifies all listeners.
822    ///
823    /// Removes all listeners that are closed.
824    fn notify(&mut self, update: DiscoveryUpdate) {
825        self.update_listeners.retain_mut(|listener| match listener.try_send(update.clone()) {
826            Ok(()) => true,
827            Err(err) => match err {
828                TrySendError::Full(_) => true,
829                TrySendError::Closed(_) => false,
830            },
831        });
832    }
833
834    /// Adds the ip to the ban list indefinitely
835    pub fn ban_ip(&mut self, ip: IpAddr) {
836        self.config.ban_list.ban_ip(ip);
837    }
838
839    /// Adds the peer to the ban list indefinitely.
840    pub fn ban_node(&mut self, node_id: PeerId) {
841        self.remove_node(node_id);
842        self.config.ban_list.ban_peer(node_id);
843    }
844
845    /// Adds the ip to the ban list until the given timestamp.
846    pub fn ban_ip_until(&mut self, ip: IpAddr, until: Instant) {
847        self.config.ban_list.ban_ip_until(ip, until);
848    }
849
850    /// Adds the peer to the ban list and bans it until the given timestamp
851    pub fn ban_node_until(&mut self, node_id: PeerId, until: Instant) {
852        self.remove_node(node_id);
853        self.config.ban_list.ban_peer_until(node_id, until);
854    }
855
856    /// Removes a `node_id` from the routing table.
857    ///
858    /// This allows applications, for whatever reason, to remove nodes from the local routing
859    /// table. Returns `true` if the node was in the table and `false` otherwise.
860    pub fn remove_node(&mut self, node_id: PeerId) -> bool {
861        let key = kad_key(node_id);
862        self.remove_key(node_id, key)
863    }
864
865    /// Removes a `node_id` from the routing table but only if there are enough other nodes in the
866    /// bucket (bucket must be at least half full)
867    ///
868    /// Returns `true` if the node was removed
869    pub fn soft_remove_node(&mut self, node_id: PeerId) -> bool {
870        let key = kad_key(node_id);
871        let Some(bucket) = self.kbuckets.get_bucket(&key) else { return false };
872        if bucket.num_entries() < MAX_NODES_PER_BUCKET / 2 {
873            // skip half empty bucket
874            return false
875        }
876        self.remove_key(node_id, key)
877    }
878
879    fn remove_key(&mut self, node_id: PeerId, key: discv5::Key<NodeKey>) -> bool {
880        let removed = self.kbuckets.remove(&key);
881        if removed {
882            trace!(target: "discv4", ?node_id, "removed node");
883            self.notify(DiscoveryUpdate::Removed(node_id));
884        }
885        removed
886    }
887
888    /// Gets the number of entries that are considered connected.
889    pub fn num_connected(&self) -> usize {
890        self.kbuckets.buckets_iter().fold(0, |count, bucket| count + bucket.num_connected())
891    }
892
893    /// Check if the peer has an active bond.
894    fn has_bond(&self, remote_id: PeerId, remote_ip: IpAddr) -> bool {
895        if let Some(timestamp) = self.received_pongs.last_pong(remote_id, remote_ip) {
896            if timestamp.elapsed() < self.config.bond_expiration {
897                return true
898            }
899        }
900        false
901    }
902
903    /// Applies a closure on the pending or present [`NodeEntry`].
904    fn on_entry<F, R>(&mut self, peer_id: PeerId, f: F) -> Option<R>
905    where
906        F: FnOnce(&NodeEntry) -> R,
907    {
908        let key = kad_key(peer_id);
909        match self.kbuckets.entry(&key) {
910            BucketEntry::Present(entry, _) => Some(f(entry.value())),
911            BucketEntry::Pending(mut entry, _) => Some(f(entry.value())),
912            _ => None,
913        }
914    }
915
916    /// Update the entry on RE-ping.
917    ///
918    /// Invoked when we received the Pong to our [`PingReason::RePing`] ping.
919    ///
920    /// On re-ping we check for a changed `enr_seq` if eip868 is enabled, and if it changed we send
921    /// a follow-up request to retrieve the updated ENR
922    fn update_on_reping(&mut self, record: NodeRecord, mut last_enr_seq: Option<u64>) {
923        if record.id == self.local_node_record.id {
924            return
925        }
926
927        // If EIP868 extension is disabled then we want to ignore this
928        if !self.config.enable_eip868 {
929            last_enr_seq = None;
930        }
931
932        let key = kad_key(record.id);
933        let old_enr = match self.kbuckets.entry(&key) {
934            kbucket::Entry::Present(mut entry, _) => {
935                entry.value_mut().update_with_enr(last_enr_seq)
936            }
937            kbucket::Entry::Pending(mut entry, _) => entry.value().update_with_enr(last_enr_seq),
938            _ => return,
939        };
940
941        // Check if ENR was updated
942        match (last_enr_seq, old_enr) {
943            (Some(new), Some(old)) => {
944                if new > old {
945                    self.send_enr_request(record);
946                }
947            }
948            (Some(_), None) => {
949                // got an ENR
950                self.send_enr_request(record);
951            }
952            _ => {}
953        };
954    }
955
956    /// Callback invoked when we receive a pong from the peer.
957    fn update_on_pong(&mut self, record: NodeRecord, mut last_enr_seq: Option<u64>) {
958        if record.id == *self.local_peer_id() {
959            return
960        }
961
962        // If EIP868 extension is disabled then we want to ignore this
963        if !self.config.enable_eip868 {
964            last_enr_seq = None;
965        }
966
967        // if the peer included an enr seq in the pong then we can try to request the ENR of that
968        // node
969        let has_enr_seq = last_enr_seq.is_some();
970
971        let key = kad_key(record.id);
972        match self.kbuckets.entry(&key) {
973            kbucket::Entry::Present(mut entry, old_status) => {
974                // endpoint is now proven
975                entry.value_mut().establish_proof();
976                entry.value_mut().update_with_enr(last_enr_seq);
977
978                if !old_status.is_connected() {
979                    let _ = entry.update(ConnectionState::Connected, Some(old_status.direction));
980                    trace!(target: "discv4", ?record, "added after successful endpoint proof");
981                    self.notify(DiscoveryUpdate::Added(record));
982
983                    if has_enr_seq {
984                        // request the ENR of the node
985                        self.send_enr_request(record);
986                    }
987                }
988            }
989            kbucket::Entry::Pending(mut entry, mut status) => {
990                // endpoint is now proven
991                entry.value().establish_proof();
992                entry.value().update_with_enr(last_enr_seq);
993
994                if !status.is_connected() {
995                    status.state = ConnectionState::Connected;
996                    let _ = entry.update(status);
997                    trace!(target: "discv4", ?record, "added after successful endpoint proof");
998                    self.notify(DiscoveryUpdate::Added(record));
999
1000                    if has_enr_seq {
1001                        // request the ENR of the node
1002                        self.send_enr_request(record);
1003                    }
1004                }
1005            }
1006            _ => {}
1007        };
1008    }
1009
1010    /// Adds all nodes
1011    ///
1012    /// See [`Self::add_node`]
1013    pub fn add_all_nodes(&mut self, records: impl IntoIterator<Item = NodeRecord>) {
1014        for record in records {
1015            self.add_node(record);
1016        }
1017    }
1018
1019    /// If the node's not in the table yet, this will add it to the table and start the endpoint
1020    /// proof by sending a ping to the node.
1021    ///
1022    /// Returns `true` if the record was added successfully, and `false` if the node is either
1023    /// already in the table or the record's bucket is full.
1024    pub fn add_node(&mut self, record: NodeRecord) -> bool {
1025        let key = kad_key(record.id);
1026        match self.kbuckets.entry(&key) {
1027            kbucket::Entry::Absent(entry) => {
1028                let node = NodeEntry::new(record);
1029                match entry.insert(
1030                    node,
1031                    NodeStatus {
1032                        direction: ConnectionDirection::Outgoing,
1033                        state: ConnectionState::Disconnected,
1034                    },
1035                ) {
1036                    BucketInsertResult::Inserted | BucketInsertResult::Pending { .. } => {
1037                        trace!(target: "discv4", ?record, "inserted new record");
1038                    }
1039                    _ => return false,
1040                }
1041            }
1042            _ => return false,
1043        }
1044
1045        // send the initial ping to the _new_ node
1046        self.try_ping(record, PingReason::InitialInsert);
1047        true
1048    }
1049
1050    /// Encodes the packet, sends it and returns the hash.
1051    pub(crate) fn send_packet(&self, msg: Message, to: SocketAddr) -> B256 {
1052        let (payload, hash) = msg.encode(&self.secret_key);
1053        trace!(target: "discv4", r#type=?msg.msg_type(), ?to, ?hash, "sending packet");
1054        let _ = self.egress.try_send((payload, to)).map_err(|err| {
1055            debug!(
1056                target: "discv4",
1057                %err,
1058                "dropped outgoing packet",
1059            );
1060        });
1061        hash
1062    }
1063
1064    /// Message handler for an incoming `Ping`
1065    fn on_ping(&mut self, ping: Ping, remote_addr: SocketAddr, remote_id: PeerId, hash: B256) {
1066        if self.is_expired(ping.expire) {
1067            // ping's expiration timestamp is in the past
1068            return
1069        }
1070
1071        // create the record
1072        let record = NodeRecord {
1073            address: remote_addr.ip(),
1074            udp_port: remote_addr.port(),
1075            tcp_port: ping.from.tcp_port,
1076            id: remote_id,
1077        }
1078        .into_ipv4_mapped();
1079
1080        let key = kad_key(record.id);
1081
1082        // See also <https://github.com/ethereum/devp2p/blob/master/discv4.md#ping-packet-0x01>:
1083        // > If no communication with the sender of this ping has occurred within the last 12h, a
1084        // > ping should be sent in addition to pong in order to receive an endpoint proof.
1085        //
1086        // Note: we only mark if the node is absent because the `last 12h` condition is handled by
1087        // the ping interval
1088        let mut is_new_insert = false;
1089        let mut needs_bond = false;
1090        let mut is_proven = false;
1091
1092        let old_enr = match self.kbuckets.entry(&key) {
1093            kbucket::Entry::Present(mut entry, _) => {
1094                if entry.value().is_expired() {
1095                    // If no communication with the sender has occurred within the last 12h, a ping
1096                    // should be sent in addition to pong in order to receive an endpoint proof.
1097                    needs_bond = true;
1098                } else {
1099                    is_proven = entry.value().has_endpoint_proof;
1100                }
1101                entry.value_mut().update_with_enr(ping.enr_sq)
1102            }
1103            kbucket::Entry::Pending(mut entry, _) => {
1104                if entry.value().is_expired() {
1105                    // If no communication with the sender has occurred within the last 12h, a ping
1106                    // should be sent in addition to pong in order to receive an endpoint proof.
1107                    needs_bond = true;
1108                } else {
1109                    is_proven = entry.value().has_endpoint_proof;
1110                }
1111                entry.value().update_with_enr(ping.enr_sq)
1112            }
1113            kbucket::Entry::Absent(entry) => {
1114                let mut node = NodeEntry::new(record);
1115                node.last_enr_seq = ping.enr_sq;
1116
1117                match entry.insert(
1118                    node,
1119                    NodeStatus {
1120                        direction: ConnectionDirection::Incoming,
1121                        // mark as disconnected until endpoint proof established on pong
1122                        state: ConnectionState::Disconnected,
1123                    },
1124                ) {
1125                    BucketInsertResult::Inserted | BucketInsertResult::Pending { .. } => {
1126                        // mark as new insert if insert was successful
1127                        is_new_insert = true;
1128                    }
1129                    BucketInsertResult::Full => {
1130                        // we received a ping but the corresponding bucket for the peer is already
1131                        // full, we can't add any additional peers to that bucket, but we still want
1132                        // to emit an event that we discovered the node
1133                        trace!(target: "discv4", ?record, "discovered new record but bucket is full");
1134                        self.notify(DiscoveryUpdate::DiscoveredAtCapacity(record));
1135                        needs_bond = true;
1136                    }
1137                    BucketInsertResult::TooManyIncoming | BucketInsertResult::NodeExists => {
1138                        needs_bond = true;
1139                        // insert unsuccessful but we still want to send the pong
1140                    }
1141                    BucketInsertResult::FailedFilter => return,
1142                }
1143
1144                None
1145            }
1146            kbucket::Entry::SelfEntry => return,
1147        };
1148
1149        // send the pong first, but the PONG and optionally PING don't need to be sent in a
1150        // particular order
1151        let pong = Message::Pong(Pong {
1152            // we use the actual address of the peer
1153            to: record.into(),
1154            echo: hash,
1155            expire: ping.expire,
1156            enr_sq: self.enr_seq(),
1157        });
1158        self.send_packet(pong, remote_addr);
1159
1160        // if node was absent also send a ping to establish the endpoint proof from our end
1161        if is_new_insert {
1162            self.try_ping(record, PingReason::InitialInsert);
1163        } else if needs_bond {
1164            self.try_ping(record, PingReason::EstablishBond);
1165        } else if is_proven {
1166            // if the node has been proven, this means we've received a pong and verified its
1167            // endpoint proof. We've also sent a pong above so the remote can verify our endpoint,
1168            // which means we can now send our find_nodes request if PingReason::Lookup
1169            if let Some((_, ctx)) = self.pending_lookup.remove(&record.id) {
1170                if self.pending_find_nodes.contains_key(&record.id) {
1171                    // there's already another pending request, unmark it so the next round can
1172                    // try to send it
1173                    ctx.unmark_queried(record.id);
1174                } else {
1175                    // we just received a ping from that peer so we can send a find node request
1176                    // directly
1177                    self.find_node(&record, ctx);
1178                }
1179            }
1180        } else {
1181            // Request ENR if included in the ping
1182            match (ping.enr_sq, old_enr) {
1183                (Some(new), Some(old)) => {
1184                    if new > old {
1185                        self.send_enr_request(record);
1186                    }
1187                }
1188                (Some(_), None) => {
1189                    self.send_enr_request(record);
1190                }
1191                _ => {}
1192            };
1193        }
1194    }
1195
1196    // Guarding function for [`Self::send_ping`] that applies pre-checks
1197    fn try_ping(&mut self, node: NodeRecord, reason: PingReason) {
1198        if node.id == *self.local_peer_id() {
1199            // don't ping ourselves
1200            return
1201        }
1202
1203        if self.pending_pings.contains_key(&node.id) ||
1204            self.pending_find_nodes.contains_key(&node.id)
1205        {
1206            return
1207        }
1208
1209        if self.queued_pings.iter().any(|(n, _)| n.id == node.id) {
1210            return
1211        }
1212
1213        if self.pending_pings.len() < MAX_NODES_PING {
1214            self.send_ping(node, reason);
1215        } else if self.queued_pings.len() < MAX_QUEUED_PINGS {
1216            self.queued_pings.push_back((node, reason));
1217        }
1218    }
1219
1220    /// Sends a ping message to the node's UDP address.
1221    ///
1222    /// Returns the echo hash of the ping message.
1223    pub(crate) fn send_ping(&mut self, node: NodeRecord, reason: PingReason) -> B256 {
1224        let remote_addr = node.udp_addr();
1225        let id = node.id;
1226        let ping = Ping {
1227            from: self.local_node_record.into(),
1228            to: node.into(),
1229            expire: self.ping_expiration(),
1230            enr_sq: self.enr_seq(),
1231        };
1232        trace!(target: "discv4", ?ping, "sending ping");
1233        let echo_hash = self.send_packet(Message::Ping(ping), remote_addr);
1234
1235        self.pending_pings
1236            .insert(id, PingRequest { sent_at: Instant::now(), node, echo_hash, reason });
1237        echo_hash
1238    }
1239
1240    /// Sends an enr request message to the node's UDP address.
1241    ///
1242    /// This is a noop if the EIP-868 extension is disabled in the config.
1243    pub(crate) fn send_enr_request(&mut self, node: NodeRecord) {
1244        if !self.config.enable_eip868 {
1245            return
1246        }
1247        let remote_addr = node.udp_addr();
1248        let enr_request = EnrRequest { expire: self.enr_request_expiration() };
1249
1250        trace!(target: "discv4", ?enr_request, "sending enr request");
1251        let echo_hash = self.send_packet(Message::EnrRequest(enr_request), remote_addr);
1252
1253        self.pending_enr_requests
1254            .insert(node.id, EnrRequestState { sent_at: Instant::now(), echo_hash });
1255    }
1256
1257    /// Message handler for an incoming `Pong`.
1258    fn on_pong(&mut self, pong: Pong, remote_addr: SocketAddr, remote_id: PeerId) {
1259        if self.is_expired(pong.expire) {
1260            return
1261        }
1262
1263        let PingRequest { node, reason, .. } = match self.pending_pings.entry(remote_id) {
1264            Entry::Occupied(entry) => {
1265                {
1266                    let request = entry.get();
1267                    if request.echo_hash != pong.echo {
1268                        trace!(target: "discv4", from=?remote_addr, expected=?request.echo_hash, echo_hash=?pong.echo,"Got unexpected Pong");
1269                        return
1270                    }
1271                }
1272                entry.remove()
1273            }
1274            Entry::Vacant(_) => return,
1275        };
1276
1277        // keep track of the pong
1278        self.received_pongs.on_pong(remote_id, remote_addr.ip());
1279
1280        match reason {
1281            PingReason::InitialInsert => {
1282                self.update_on_pong(node, pong.enr_sq);
1283            }
1284            PingReason::EstablishBond => {
1285                // same as `InitialInsert` which renews the bond if the peer is in the table
1286                self.update_on_pong(node, pong.enr_sq);
1287            }
1288            PingReason::RePing => {
1289                self.update_on_reping(node, pong.enr_sq);
1290            }
1291            PingReason::Lookup(node, ctx) => {
1292                self.update_on_pong(node, pong.enr_sq);
1293                // insert node and assoc. lookup_context into the pending_lookup table to complete
1294                // our side of the endpoint proof verification.
1295                // Start the lookup timer here - and evict accordingly. Note that this is a
1296                // separate timer from the ping_request timer.
1297                self.pending_lookup.insert(node.id, (Instant::now(), ctx));
1298            }
1299        }
1300    }
1301
1302    /// Handler for an incoming `FindNode` message
1303    fn on_find_node(&mut self, msg: FindNode, remote_addr: SocketAddr, node_id: PeerId) {
1304        if self.is_expired(msg.expire) {
1305            // expiration timestamp is in the past
1306            return
1307        }
1308        if node_id == *self.local_peer_id() {
1309            // ignore find node requests to ourselves
1310            return
1311        }
1312
1313        if self.has_bond(node_id, remote_addr.ip()) {
1314            self.respond_closest(msg.id, remote_addr)
1315        }
1316    }
1317
1318    /// Handler for incoming `EnrResponse` message
1319    fn on_enr_response(&mut self, msg: EnrResponse, remote_addr: SocketAddr, id: PeerId) {
1320        trace!(target: "discv4", ?remote_addr, ?msg, "received ENR response");
1321        if let Some(resp) = self.pending_enr_requests.remove(&id) {
1322            // ensure the ENR's public key matches the expected node id
1323            let enr_id = pk2id(&msg.enr.public_key());
1324            if id != enr_id {
1325                return
1326            }
1327
1328            if resp.echo_hash == msg.request_hash {
1329                let key = kad_key(id);
1330                let fork_id = msg.eth_fork_id();
1331                let (record, old_fork_id) = match self.kbuckets.entry(&key) {
1332                    kbucket::Entry::Present(mut entry, _) => {
1333                        let id = entry.value_mut().update_with_fork_id(fork_id);
1334                        (entry.value().record, id)
1335                    }
1336                    kbucket::Entry::Pending(mut entry, _) => {
1337                        let id = entry.value().update_with_fork_id(fork_id);
1338                        (entry.value().record, id)
1339                    }
1340                    _ => return,
1341                };
1342                match (fork_id, old_fork_id) {
1343                    (Some(new), Some(old)) => {
1344                        if new != old {
1345                            self.notify(DiscoveryUpdate::EnrForkId(record, new))
1346                        }
1347                    }
1348                    (Some(new), None) => self.notify(DiscoveryUpdate::EnrForkId(record, new)),
1349                    _ => {}
1350                }
1351            }
1352        }
1353    }
1354
1355    /// Handler for incoming `EnrRequest` message
1356    fn on_enr_request(
1357        &self,
1358        msg: EnrRequest,
1359        remote_addr: SocketAddr,
1360        id: PeerId,
1361        request_hash: B256,
1362    ) {
1363        if !self.config.enable_eip868 || self.is_expired(msg.expire) {
1364            return
1365        }
1366
1367        if self.has_bond(id, remote_addr.ip()) {
1368            self.send_packet(
1369                Message::EnrResponse(EnrResponse {
1370                    request_hash,
1371                    enr: self.local_eip_868_enr.clone(),
1372                }),
1373                remote_addr,
1374            );
1375        }
1376    }
1377
1378    /// Handler for incoming `Neighbours` messages; they are only processed if they are responses
1379    /// to previously sent `FindNode` requests.
1380    fn on_neighbours(&mut self, msg: Neighbours, remote_addr: SocketAddr, node_id: PeerId) {
1381        if self.is_expired(msg.expire) {
1382            // response is expired
1383            return
1384        }
1385        // check if this request was expected
1386        let ctx = match self.pending_find_nodes.entry(node_id) {
1387            Entry::Occupied(mut entry) => {
1388                {
1389                    let request = entry.get_mut();
1390                    // Mark the request as answered
1391                    request.answered = true;
1392                    let total = request.response_count + msg.nodes.len();
1393
1394                    // A Neighbours response contains at most 1 bucket (16 entries) in total.
1395                    if total <= MAX_NODES_PER_BUCKET {
1396                        request.response_count = total;
1397                    } else {
1398                        trace!(target: "discv4", total, from=?remote_addr, "Received neighbours packet with more entries than allowed per bucket");
1399                        return
1400                    }
1401                };
1402
1403                if entry.get().response_count == MAX_NODES_PER_BUCKET {
1404                    // node responding with a full bucket of records
1405                    let ctx = entry.remove().lookup_context;
1406                    ctx.mark_responded(node_id);
1407                    ctx
1408                } else {
1409                    entry.get().lookup_context.clone()
1410                }
1411            }
1412            Entry::Vacant(_) => {
1413                // received neighbours response without requesting it
1414                trace!(target: "discv4", from=?remote_addr, "Received unsolicited Neighbours");
1415                return
1416            }
1417        };
1418
1419        // log the peers we discovered
1420        trace!(target: "discv4",
1421            target=format!("{:#?}", node_id),
1422            peers_count=msg.nodes.len(),
1423            peers=format!("[{:#}]", msg.nodes.iter()
1424                .map(|node_rec| node_rec.id
1425            ).format(", ")),
1426            "Received peers from Neighbours packet"
1427        );
1428
1429        // This is the recursive lookup step where we initiate new FindNode requests for new nodes
1430        // that were discovered.
1431        for node in msg.nodes.into_iter().map(NodeRecord::into_ipv4_mapped) {
1432            // prevent banned peers from being added to the context
1433            if self.config.ban_list.is_banned(&node.id, &node.address) {
1434                trace!(target: "discv4", peer_id=?node.id, ip=?node.address, "ignoring banned record");
1435                continue
1436            }
1437
1438            ctx.add_node(node);
1439        }
1440
1441        // get the next closest, not-yet-queried nodes and start over.
1442        let closest =
1443            ctx.filter_closest(ALPHA, |node| !self.pending_find_nodes.contains_key(&node.id));
1444
1445        for closest in closest {
1446            let key = kad_key(closest.id);
1447            match self.kbuckets.entry(&key) {
1448                BucketEntry::Absent(entry) => {
1449                    // The node's endpoint is not proven yet, so we need to ping it first. On
1450                    // success, we add the node to the pending_lookup table and wait until we have
1451                    // sent back a Pong before initiating a FindNode request.
1452                    // To prevent this node from being selected again in subsequent responses while
1453                    // the ping is still in flight, we always mark it as queried.
1454                    ctx.mark_queried(closest.id);
1455                    let node = NodeEntry::new(closest);
1456                    match entry.insert(
1457                        node,
1458                        NodeStatus {
1459                            direction: ConnectionDirection::Outgoing,
1460                            state: ConnectionState::Disconnected,
1461                        },
1462                    ) {
1463                        BucketInsertResult::Inserted | BucketInsertResult::Pending { .. } => {
1464                            // only ping if the node was added to the table
1465                            self.try_ping(closest, PingReason::Lookup(closest, ctx.clone()))
1466                        }
1467                        BucketInsertResult::Full => {
1468                            // new node but the node's bucket is already full
1469                            self.notify(DiscoveryUpdate::DiscoveredAtCapacity(closest))
1470                        }
1471                        _ => {}
1472                    }
1473                }
1474                BucketEntry::SelfEntry => {
1475                    // we received our own node entry
1476                }
1477                BucketEntry::Present(entry, _) => {
1478                    if entry.value().has_endpoint_proof {
1479                        if entry
1480                            .value()
1481                            .exceeds_find_node_failures(self.config.max_find_node_failures)
1482                        {
1483                            self.try_ping(closest, PingReason::Lookup(closest, ctx.clone()))
1484                        } else {
1485                            self.find_node(&closest, ctx.clone());
1486                        }
1487                    }
1488                }
1489                BucketEntry::Pending(mut entry, _) => {
1490                    if entry.value().has_endpoint_proof {
1491                        if entry
1492                            .value()
1493                            .exceeds_find_node_failures(self.config.max_find_node_failures)
1494                        {
1495                            self.try_ping(closest, PingReason::Lookup(closest, ctx.clone()))
1496                        } else {
1497                            self.find_node(&closest, ctx.clone());
1498                        }
1499                    }
1500                }
1501            }
1502        }
1503    }
1504
1505    /// Sends a Neighbours packet for `target` to the given addr
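    ///
    /// The closest nodes are split across multiple packets if necessary, with at most
    /// `SAFE_MAX_DATAGRAM_NEIGHBOUR_RECORDS` records per packet so that each datagram stays within
    /// a safe size.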
1506    fn respond_closest(&mut self, target: PeerId, to: SocketAddr) {
1507        let key = kad_key(target);
1508        let expire = self.send_neighbours_expiration();
1509
1510        // get the MAX_NODES_PER_BUCKET closest nodes to the target
1511        let closest_nodes =
1512            self.kbuckets.closest_values(&key).take(MAX_NODES_PER_BUCKET).collect::<Vec<_>>();
1513
1514        for nodes in closest_nodes.chunks(SAFE_MAX_DATAGRAM_NEIGHBOUR_RECORDS) {
1515            let nodes = nodes.iter().map(|node| node.value.record).collect::<Vec<NodeRecord>>();
1516            trace!(target: "discv4", len = nodes.len(), to=?to,"Sent neighbours packet");
1517            let msg = Message::Neighbours(Neighbours { nodes, expire });
1518            self.send_packet(msg, to);
1519        }
1520    }
1521
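    /// Evicts pending ENR, ping and lookup requests that have been outstanding longer than their
    /// configured timeouts, removes the affected nodes and delegates expired `FindNode` requests to
    /// `evict_failed_find_nodes`.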
1522    fn evict_expired_requests(&mut self, now: Instant) {
1523        self.pending_enr_requests.retain(|_node_id, enr_request| {
1524            now.duration_since(enr_request.sent_at) < self.config.enr_expiration
1525        });
1526
1527        let mut failed_pings = Vec::new();
1528        self.pending_pings.retain(|node_id, ping_request| {
1529            if now.duration_since(ping_request.sent_at) > self.config.ping_expiration {
1530                failed_pings.push(*node_id);
1531                return false
1532            }
1533            true
1534        });
1535
1536        if !failed_pings.is_empty() {
1537            // remove nodes that failed to pong
1538            trace!(target: "discv4", num=%failed_pings.len(), "evicting nodes due to failed pong");
1539            for node_id in failed_pings {
1540                self.remove_node(node_id);
1541            }
1542        }
1543
1544        let mut failed_lookups = Vec::new();
1545        self.pending_lookup.retain(|node_id, (lookup_sent_at, _)| {
1546            if now.duration_since(*lookup_sent_at) > self.config.request_timeout {
1547                failed_lookups.push(*node_id);
1548                return false
1549            }
1550            true
1551        });
1552
1553        if !failed_lookups.is_empty() {
1554            // remove nodes that failed the e2e lookup process, so we can restart it
1555            trace!(target: "discv4", num=%failed_lookups.len(), "evicting nodes due to failed lookup");
1556            for node_id in failed_lookups {
1557                self.remove_node(node_id);
1558            }
1559        }
1560
1561        self.evict_failed_find_nodes(now);
1562    }
1563
1564    /// Handles failed responses to `FindNode`
1565    fn evict_failed_find_nodes(&mut self, now: Instant) {
1566        let mut failed_find_nodes = Vec::new();
1567        self.pending_find_nodes.retain(|node_id, find_node_request| {
1568            if now.duration_since(find_node_request.sent_at) > self.config.neighbours_expiration {
1569                if !find_node_request.answered {
1570                    // the node did not respond at all within the timeout, so count this as a
1571                    // failure; a response with fewer entries than expected is not a hard error.
1572                    failed_find_nodes.push(*node_id);
1573                }
1574                return false
1575            }
1576            true
1577        });
1578
1579        if failed_find_nodes.is_empty() {
1580            return
1581        }
1582
1583        trace!(target: "discv4", num=%failed_find_nodes.len(), "processing failed find nodes");
1584
1585        for node_id in failed_find_nodes {
1586            let key = kad_key(node_id);
1587            let failures = match self.kbuckets.entry(&key) {
1588                kbucket::Entry::Present(mut entry, _) => {
1589                    entry.value_mut().inc_failed_request();
1590                    entry.value().find_node_failures
1591                }
1592                kbucket::Entry::Pending(mut entry, _) => {
1593                    entry.value().inc_failed_request();
1594                    entry.value().find_node_failures
1595                }
1596                _ => continue,
1597            };
1598
1599            // if the node repeatedly failed to respond with anything useful, remove it from the
1600            // table, but only if there are enough other nodes in the bucket (the bucket must be
1601            // at least half full)
1602            if failures > self.config.max_find_node_failures {
1603                self.soft_remove_node(node_id);
1604            }
1605        }
1606    }
1607
1608    /// Re-pings all nodes whose endpoint proofs are considered expired: [`NodeEntry::is_expired`]
1609    ///
1610    /// This sends a `Ping` to those nodes; if a node fails to respond with a `Pong` to renew the
1611    /// endpoint proof, it is removed from the table.
1612    fn re_ping_oldest(&mut self) {
1613        let mut nodes = self
1614            .kbuckets
1615            .iter_ref()
1616            .filter(|entry| entry.node.value.is_expired())
1617            .map(|n| n.node.value)
1618            .collect::<Vec<_>>();
1619        nodes.sort_by(|a, b| a.last_seen.cmp(&b.last_seen));
1620        let to_ping = nodes.into_iter().map(|n| n.record).take(MAX_NODES_PING).collect::<Vec<_>>();
1621        for node in to_ping {
1622            self.try_ping(node, PingReason::RePing)
1623        }
1624    }
1625
1626    /// Returns true if the expiration timestamp is in the past.
1627    fn is_expired(&self, expiration: u64) -> bool {
1628        self.ensure_not_expired(expiration).is_err()
1629    }
1630
1631    /// Validates that the given timestamp is not expired.
1632    ///
1633    /// Note: this accepts the timestamp as u64 because this is used by the wire protocol, but the
1634    /// UNIX timestamp (number of non-leap seconds since January 1, 1970 0:00:00 UTC) is supposed to
1635    /// be an i64.
1636    ///
1637    /// Returns an error if:
1638    ///  - invalid UNIX timestamp (larger than `i64::MAX`)
1639    ///  - timestamp is expired (lower than current local UNIX timestamp)
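    ///
    /// A minimal, hypothetical sketch of the check performed here (not the exact implementation):
    ///
    /// ```ignore
    /// fn is_valid_expiration(timestamp: u64, now: u64, enforce: bool) -> bool {
    ///     // must be a valid UNIX timestamp and, if enforcement is enabled, not lie in the past
    ///     i64::try_from(timestamp).is_ok() && (!enforce || timestamp >= now)
    /// }
    /// ```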
1640    fn ensure_not_expired(&self, timestamp: u64) -> Result<(), ()> {
1641        // ensure the timestamp is a valid UNIX timestamp
1642        let _ = i64::try_from(timestamp).map_err(drop)?;
1643
1644        let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_secs();
1645        if self.config.enforce_expiration_timestamps && timestamp < now {
1646            trace!(target: "discv4", "Expired packet");
1647            return Err(())
1648        }
1649        Ok(())
1650    }
1651
1652    /// Pops buffered ping requests and sends them.
1653    fn ping_buffered(&mut self) {
1654        while self.pending_pings.len() < MAX_NODES_PING {
1655            match self.queued_pings.pop_front() {
1656                Some((next, reason)) => self.try_ping(next, reason),
1657                None => break,
1658            }
1659        }
1660    }
1661
1662    fn ping_expiration(&self) -> u64 {
1663        (SystemTime::now().duration_since(UNIX_EPOCH).unwrap() + self.config.ping_expiration)
1664            .as_secs()
1665    }
1666
1667    fn find_node_expiration(&self) -> u64 {
1668        (SystemTime::now().duration_since(UNIX_EPOCH).unwrap() + self.config.request_timeout)
1669            .as_secs()
1670    }
1671
1672    fn enr_request_expiration(&self) -> u64 {
1673        (SystemTime::now().duration_since(UNIX_EPOCH).unwrap() + self.config.enr_expiration)
1674            .as_secs()
1675    }
1676
1677    fn send_neighbours_expiration(&self) -> u64 {
1678        (SystemTime::now().duration_since(UNIX_EPOCH).unwrap() + self.config.neighbours_expiration)
1679            .as_secs()
1680    }
1681
1682    /// Polls the socket and advances the state.
1683    ///
1684    /// To prevent traffic amplification attacks, implementations must verify that the sender of a
1685    /// query participates in the discovery protocol. The sender of a packet is considered verified
1686    /// if it has sent a valid Pong response with matching ping hash within the last 12 hours.
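    ///
    /// A hedged illustration of how the message handlers apply this rule (mirrors `on_find_node`):
    ///
    /// ```ignore
    /// if self.has_bond(node_id, remote_addr.ip()) {
    ///     // only bonded (verified) peers receive a Neighbours response
    ///     self.respond_closest(msg.id, remote_addr)
    /// }
    /// ```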
1687    pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<Discv4Event> {
1688        loop {
1689            // drain buffered events first
1690            if let Some(event) = self.queued_events.pop_front() {
1691                return Poll::Ready(event)
1692            }
1693
1694            // trigger self lookup
1695            if self.config.enable_lookup {
1696                while self.lookup_interval.poll_tick(cx).is_ready() {
1697                    let target = self.lookup_rotator.next(&self.local_node_record.id);
1698                    self.lookup_with(target, None);
1699                }
1700            }
1701
1702            // re-ping some peers
1703            while self.ping_interval.poll_tick(cx).is_ready() {
1704                self.re_ping_oldest();
1705            }
1706
1707            if let Some(Poll::Ready(Some(ip))) =
1708                self.resolve_external_ip_interval.as_mut().map(|r| r.poll_tick(cx))
1709            {
1710                self.set_external_ip_addr(ip);
1711            }
1712
1713            // drain all incoming `Discv4` commands, this channel can never close
1714            while let Poll::Ready(Some(cmd)) = self.commands_rx.poll_recv(cx) {
1715                match cmd {
1716                    Discv4Command::Add(enr) => {
1717                        self.add_node(enr);
1718                    }
1719                    Discv4Command::Lookup { node_id, tx } => {
1720                        let node_id = node_id.unwrap_or(self.local_node_record.id);
1721                        self.lookup_with(node_id, tx);
1722                    }
1723                    Discv4Command::SetLookupInterval(duration) => {
1724                        self.set_lookup_interval(duration);
1725                    }
1726                    Discv4Command::Updates(tx) => {
1727                        let rx = self.update_stream();
1728                        let _ = tx.send(rx);
1729                    }
1730                    Discv4Command::BanPeer(node_id) => self.ban_node(node_id),
1731                    Discv4Command::Remove(node_id) => {
1732                        self.remove_node(node_id);
1733                    }
1734                    Discv4Command::Ban(node_id, ip) => {
1735                        self.ban_node(node_id);
1736                        self.ban_ip(ip);
1737                    }
1738                    Discv4Command::BanIp(ip) => {
1739                        self.ban_ip(ip);
1740                    }
1741                    Discv4Command::SetEIP868RLPPair { key, rlp } => {
1742                        debug!(target: "discv4", key=%String::from_utf8_lossy(&key), "Update EIP-868 extension pair");
1743
1744                        let _ = self.local_eip_868_enr.insert_raw_rlp(key, rlp, &self.secret_key);
1745                    }
1746                    Discv4Command::SetTcpPort(port) => {
1747                        debug!(target: "discv4", %port, "Update tcp port");
1748                        self.local_node_record.tcp_port = port;
1749                        if self.local_node_record.address.is_ipv4() {
1750                            let _ = self.local_eip_868_enr.set_tcp4(port, &self.secret_key);
1751                        } else {
1752                            let _ = self.local_eip_868_enr.set_tcp6(port, &self.secret_key);
1753                        }
1754                    }
1755
1756                    Discv4Command::Terminated => {
1757                        // terminate the service
1758                        self.queued_events.push_back(Discv4Event::Terminated);
1759                    }
1760                }
1761            }
1762
1763            // restricts how many messages we process in a single poll before yielding back control
1764            let mut udp_message_budget = UDP_MESSAGE_POLL_LOOP_BUDGET;
1765
1766            // process all incoming datagrams
1767            while let Poll::Ready(Some(event)) = self.ingress.poll_recv(cx) {
1768                match event {
1769                    IngressEvent::RecvError(err) => {
1770                        debug!(target: "discv4", %err, "failed to read datagram");
1771                    }
1772                    IngressEvent::BadPacket(from, err, data) => {
1773                        trace!(target: "discv4", ?from, %err, packet=?hex::encode(&data), "bad packet");
1774                    }
1775                    IngressEvent::Packet(remote_addr, Packet { msg, node_id, hash }) => {
1776                        trace!(target: "discv4", r#type=?msg.msg_type(), from=?remote_addr,"received packet");
1777                        let event = match msg {
1778                            Message::Ping(ping) => {
1779                                self.on_ping(ping, remote_addr, node_id, hash);
1780                                Discv4Event::Ping
1781                            }
1782                            Message::Pong(pong) => {
1783                                self.on_pong(pong, remote_addr, node_id);
1784                                Discv4Event::Pong
1785                            }
1786                            Message::FindNode(msg) => {
1787                                self.on_find_node(msg, remote_addr, node_id);
1788                                Discv4Event::FindNode
1789                            }
1790                            Message::Neighbours(msg) => {
1791                                self.on_neighbours(msg, remote_addr, node_id);
1792                                Discv4Event::Neighbours
1793                            }
1794                            Message::EnrRequest(msg) => {
1795                                self.on_enr_request(msg, remote_addr, node_id, hash);
1796                                Discv4Event::EnrRequest
1797                            }
1798                            Message::EnrResponse(msg) => {
1799                                self.on_enr_response(msg, remote_addr, node_id);
1800                                Discv4Event::EnrResponse
1801                            }
1802                        };
1803
1804                        self.queued_events.push_back(event);
1805                    }
1806                }
1807
1808                udp_message_budget -= 1;
1809                if udp_message_budget < 0 {
1810                    trace!(target: "discv4", budget=UDP_MESSAGE_POLL_LOOP_BUDGET, "exhausted message poll budget");
1811                    if self.queued_events.is_empty() {
1812                        // we've exceeded the message budget and have no events to process
1813                        // this will make sure we're woken up again
1814                        cx.waker().wake_by_ref();
1815                    }
1816                    break
1817                }
1818            }
1819
1820            // try resending buffered pings
1821            self.ping_buffered();
1822
1823            // evict expired requests
1824            while self.evict_expired_requests_interval.poll_tick(cx).is_ready() {
1825                self.evict_expired_requests(Instant::now());
1826            }
1827
1828            // evict expired nodes
1829            while self.expire_interval.poll_tick(cx).is_ready() {
1830                self.received_pongs.evict_expired(Instant::now(), EXPIRE_DURATION);
1831            }
1832
1833            if self.queued_events.is_empty() {
1834                return Poll::Pending
1835            }
1836        }
1837    }
1838}
1839
1840/// The stream of events produced by the service; it only ends once the service has terminated.
1841impl Stream for Discv4Service {
1842    type Item = Discv4Event;
1843
1844    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1845        // Poll the internal poll method
1846        match ready!(self.get_mut().poll(cx)) {
1847            // if the service is terminated, return None to terminate the stream
1848            Discv4Event::Terminated => Poll::Ready(None),
1849            // For any other event, return Poll::Ready(Some(event))
1850            ev => Poll::Ready(Some(ev)),
1851        }
1852    }
1853}
1854
1855impl fmt::Debug for Discv4Service {
1856    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1857        f.debug_struct("Discv4Service")
1858            .field("local_address", &self.local_address)
1859            .field("local_peer_id", &self.local_peer_id())
1860            .field("local_node_record", &self.local_node_record)
1861            .field("queued_pings", &self.queued_pings)
1862            .field("pending_lookup", &self.pending_lookup)
1863            .field("pending_find_nodes", &self.pending_find_nodes)
1864            .field("lookup_interval", &self.lookup_interval)
1865            .finish_non_exhaustive()
1866    }
1867}
1868
1869/// The Event type the Service stream produces.
1870///
1871/// This is mainly used for testing purposes and represents messages the service processed
1872#[derive(Debug, Eq, PartialEq)]
1873pub enum Discv4Event {
1874    /// A `Ping` message was handled.
1875    Ping,
1876    /// A `Pong` message was handled.
1877    Pong,
1878    /// A `FindNode` message was handled.
1879    FindNode,
1880    /// A `Neighbours` message was handled.
1881    Neighbours,
1882    /// A `EnrRequest` message was handled.
1883    EnrRequest,
1884    /// A `EnrResponse` message was handled.
1885    EnrResponse,
1886    /// Service is being terminated
1887    Terminated,
1888}
1889
1890/// Continuously reads new messages from the channel and writes them to the socket
1891pub(crate) async fn send_loop(udp: Arc<UdpSocket>, rx: EgressReceiver) {
1892    let mut stream = ReceiverStream::new(rx);
1893    while let Some((payload, to)) = stream.next().await {
1894        match udp.send_to(&payload, to).await {
1895            Ok(size) => {
1896                trace!(target: "discv4", ?to, ?size,"sent payload");
1897            }
1898            Err(err) => {
1899                debug!(target: "discv4", ?to, %err,"Failed to send datagram.");
1900            }
1901        }
1902    }
1903}
1904
1905/// Rate limit for incoming packets: at most 60 packets per minute from an individual IP (an average of 1 packet/second)
1906const MAX_INCOMING_PACKETS_PER_MINUTE_BY_IP: usize = 60usize;
1907
1908/// Continuously awaits new incoming messages and sends them back through the channel.
1909///
1910/// The receive loop enforces primitive per-IP rate limiting to prevent message spam from
1911/// individual IPs.
1912pub(crate) async fn receive_loop(udp: Arc<UdpSocket>, tx: IngressSender, local_id: PeerId) {
1913    let send = |event: IngressEvent| async {
1914        let _ = tx.send(event).await.map_err(|err| {
1915            debug!(
1916                target: "discv4",
1917                 %err,
1918                %err,
1919                "failed to send incoming packet",
1920        });
1921    };
1922
1923    let mut cache = ReceiveCache::default();
1924
1925    // tick every half minute, decaying each counter by half the per-minute limit
1926    let tick = MAX_INCOMING_PACKETS_PER_MINUTE_BY_IP / 2;
1927    let mut interval = tokio::time::interval(Duration::from_secs(tick as u64));
1928
1929    let mut buf = [0; MAX_PACKET_SIZE];
1930    loop {
1931        let res = udp.recv_from(&mut buf).await;
1932        match res {
1933            Err(err) => {
1934                debug!(target: "discv4", %err, "Failed to read datagram.");
1935                send(IngressEvent::RecvError(err)).await;
1936            }
1937            Ok((read, remote_addr)) => {
1938                // rate limit incoming packets by IP
1939                if cache.inc_ip(remote_addr.ip()) > MAX_INCOMING_PACKETS_PER_MINUTE_BY_IP {
1940                    trace!(target: "discv4", ?remote_addr, "Too many incoming packets from IP.");
1941                    continue
1942                }
1943
1944                let packet = &buf[..read];
1945                match Message::decode(packet) {
1946                    Ok(packet) => {
1947                        if packet.node_id == local_id {
1948                            // received our own message
1949                            debug!(target: "discv4", ?remote_addr, "Received own packet.");
1950                            continue
1951                        }
1952
1953                        // skip if we've already received the same packet
1954                        if cache.contains_packet(packet.hash) {
1955                            debug!(target: "discv4", ?remote_addr, "Received duplicate packet.");
1956                            continue
1957                        }
1958
1959                        send(IngressEvent::Packet(remote_addr, packet)).await;
1960                    }
1961                    Err(err) => {
1962                        trace!(target: "discv4", %err,"Failed to decode packet");
1963                        send(IngressEvent::BadPacket(remote_addr, err, packet.to_vec())).await
1964                    }
1965                }
1966            }
1967        }
1968
1969        // reset the tracked ips if the interval has passed
1970        if poll_fn(|cx| match interval.poll_tick(cx) {
1971            Poll::Ready(_) => Poll::Ready(true),
1972            Poll::Pending => Poll::Ready(false),
1973        })
1974        .await
1975        {
1976            cache.tick_ips(tick);
1977        }
1978    }
1979}
1980
1981/// A cache for received packets and their source address.
1982///
1983/// This is used to discard duplicated packets and rate limit messages from the same source.
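///
/// A hedged sketch of how the receive loop uses the cache:
///
/// ```ignore
/// if cache.inc_ip(remote_addr.ip()) > MAX_INCOMING_PACKETS_PER_MINUTE_BY_IP {
///     // drop the datagram, this IP is sending too many packets
/// } else if cache.contains_packet(packet.hash) {
///     // drop duplicate packets
/// }
/// ```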
1984struct ReceiveCache {
1985    /// keeps track of how many messages we've received from a given IP address since the last
1986    /// tick.
1987    ///
1988    /// This is used to count the number of messages received from a given IP address within an
1989    /// interval.
1990    ip_messages: HashMap<IpAddr, usize>,
1991    /// Keeps track of unique packet hashes.
1992    unique_packets: schnellru::LruMap<B256, ()>,
1993}
1994
1995impl ReceiveCache {
1996    /// Decrements the counter for each IP address by `tick`.
1997    ///
1998    /// Entries whose counter would drop below zero are removed from the map.
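    ///
    /// A minimal sketch of the decay behaviour (hypothetical counts):
    ///
    /// ```ignore
    /// let mut cache = ReceiveCache::default();
    /// let ip: IpAddr = Ipv4Addr::LOCALHOST.into();
    /// for _ in 0..45 {
    ///     cache.inc_ip(ip);
    /// }
    /// // with tick = 30 a residual count of 15 remains; an IP with fewer than `tick` messages
    /// // would have been removed from the map instead
    /// cache.tick_ips(30);
    /// ```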
1999    fn tick_ips(&mut self, tick: usize) {
2000        self.ip_messages.retain(|_, count| {
2001            if let Some(reset) = count.checked_sub(tick) {
2002                *count = reset;
2003                true
2004            } else {
2005                false
2006            }
2007        });
2008    }
2009
2010    /// Increases the counter for the given IP address and returns the new count.
2011    fn inc_ip(&mut self, ip: IpAddr) -> usize {
2012        let ctn = self.ip_messages.entry(ip).or_default();
2013        *ctn = ctn.saturating_add(1);
2014        *ctn
2015    }
2016
2017    /// Returns true if we previously received the packet
2018    fn contains_packet(&mut self, hash: B256) -> bool {
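        // inserting the hash doubles as the membership check: the negated result of `insert`
        // reports whether the hash was already tracked in the LRU map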
2019        !self.unique_packets.insert(hash, ())
2020    }
2021}
2022
2023impl Default for ReceiveCache {
2024    fn default() -> Self {
2025        Self {
2026            ip_messages: Default::default(),
2027            unique_packets: schnellru::LruMap::new(schnellru::ByLength::new(32)),
2028        }
2029    }
2030}
2031
2032/// The commands sent from the frontend [`Discv4`] to the service [`Discv4Service`].
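///
/// A hedged sketch of how the frontend issues commands (see `test_service_commands` below):
///
/// ```ignore
/// // trigger a self lookup without waiting for the results
/// discv4.send_lookup_self();
/// // or wait for the discovered nodes
/// let _ = discv4.lookup_self().await;
/// ```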
2033enum Discv4Command {
2034    Add(NodeRecord),
2035    SetTcpPort(u16),
2036    SetEIP868RLPPair { key: Vec<u8>, rlp: Bytes },
2037    Ban(PeerId, IpAddr),
2038    BanPeer(PeerId),
2039    BanIp(IpAddr),
2040    Remove(PeerId),
2041    Lookup { node_id: Option<PeerId>, tx: Option<NodeRecordSender> },
2042    SetLookupInterval(Duration),
2043    Updates(OneshotSender<ReceiverStream<DiscoveryUpdate>>),
2044    Terminated,
2045}
2046
2047/// Event type the receive loop produces
2048#[derive(Debug)]
2049pub(crate) enum IngressEvent {
2050    /// Encountered an error when reading a datagram message.
2051    RecvError(io::Error),
2052    /// Received a bad message
2053    BadPacket(SocketAddr, DecodePacketError, Vec<u8>),
2054    /// Received a datagram from an address.
2055    Packet(SocketAddr, Packet),
2056}
2057
2058/// Tracks a sent ping
2059#[derive(Debug)]
2060struct PingRequest {
2061    /// Timestamp when the request was sent.
2062    sent_at: Instant,
2063    /// Node to which the request was sent.
2064    node: NodeRecord,
2065    /// Hash sent in the Ping request.
2066    echo_hash: B256,
2067    /// Why this ping was sent.
2068    reason: PingReason,
2069}
2070
2071/// Rotates the `PeerId` that is periodically looked up.
2072///
2073/// By selecting different targets, the lookups will be seeded with different ALPHA seed nodes.
2074#[derive(Debug)]
2075struct LookupTargetRotator {
2076    interval: usize,
2077    counter: usize,
2078}
2079
2080// === impl LookupTargetRotator ===
2081
2082impl LookupTargetRotator {
2083    /// Returns a rotator that always returns the local target.
2084    const fn local_only() -> Self {
2085        Self { interval: 1, counter: 0 }
2086    }
2087}
2088
2089impl Default for LookupTargetRotator {
2090    fn default() -> Self {
2091        Self {
2092            // every 4th lookup is our own node
2093            interval: 4,
2094            counter: 3,
2095        }
2096    }
2097}
2098
2099impl LookupTargetRotator {
2100    /// Returns the next node id to look up.
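    ///
    /// With the default rotator (every 4th lookup targets the local node), usage looks like:
    ///
    /// ```ignore
    /// let local = PeerId::random();
    /// let mut rotator = LookupTargetRotator::default();
    /// assert_eq!(rotator.next(&local), local); // the first call targets ourselves
    /// assert_ne!(rotator.next(&local), local); // the next three calls return random targets
    /// ```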
2101    fn next(&mut self, local: &PeerId) -> PeerId {
2102        self.counter += 1;
2103        self.counter %= self.interval;
2104        if self.counter == 0 {
2105            return *local
2106        }
2107        PeerId::random()
2108    }
2109}
2110
2111/// Tracks lookups across multiple `FindNode` requests.
2112///
2113/// Once all clones of this type have been dropped, the discovered nodes are sent to the
2114/// listener, if one is present.
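///
/// A hedged sketch of the lifecycle (method names as defined below, variable names illustrative):
///
/// ```ignore
/// let ctx = LookupContext::new(target_key, nearest_nodes, listener);
/// ctx.mark_queried(peer_id);   // a FindNode request was sent to this peer
/// ctx.add_node(discovered);    // candidates arriving in Neighbours responses
/// ctx.mark_responded(peer_id); // the peer answered with a full bucket
/// drop(ctx);                   // last clone dropped -> results are sent to the listener
/// ```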
2115#[derive(Clone, Debug)]
2116struct LookupContext {
2117    inner: Rc<LookupContextInner>,
2118}
2119
2120impl LookupContext {
2121    /// Create new context for a recursive lookup
2122    fn new(
2123        target: discv5::Key<NodeKey>,
2124        nearest_nodes: impl IntoIterator<Item = (Distance, NodeRecord)>,
2125        listener: Option<NodeRecordSender>,
2126    ) -> Self {
2127        let closest_nodes = nearest_nodes
2128            .into_iter()
2129            .map(|(distance, record)| {
2130                (distance, QueryNode { record, queried: false, responded: false })
2131            })
2132            .collect();
2133
2134        let inner = Rc::new(LookupContextInner {
2135            target,
2136            closest_nodes: RefCell::new(closest_nodes),
2137            listener,
2138        });
2139        Self { inner }
2140    }
2141
2142    /// Returns the target of this lookup
2143    fn target(&self) -> PeerId {
2144        self.inner.target.preimage().0
2145    }
2146
2147    fn closest(&self, num: usize) -> Vec<NodeRecord> {
2148        self.inner
2149            .closest_nodes
2150            .borrow()
2151            .iter()
2152            .filter(|(_, node)| !node.queried)
2153            .map(|(_, n)| n.record)
2154            .take(num)
2155            .collect()
2156    }
2157
2158    /// Returns the closest nodes that have not been queried yet.
2159    fn filter_closest<P>(&self, num: usize, filter: P) -> Vec<NodeRecord>
2160    where
2161        P: FnMut(&NodeRecord) -> bool,
2162    {
2163        self.inner
2164            .closest_nodes
2165            .borrow()
2166            .iter()
2167            .filter(|(_, node)| !node.queried)
2168            .map(|(_, n)| n.record)
2169            .filter(filter)
2170            .take(num)
2171            .collect()
2172    }
2173
2174    /// Inserts the node if it's missing
2175    fn add_node(&self, record: NodeRecord) {
2176        let distance = self.inner.target.distance(&kad_key(record.id));
2177        let mut closest = self.inner.closest_nodes.borrow_mut();
2178        if let btree_map::Entry::Vacant(entry) = closest.entry(distance) {
2179            entry.insert(QueryNode { record, queried: false, responded: false });
2180        }
2181    }
2182
2183    fn set_queried(&self, id: PeerId, val: bool) {
2184        if let Some((_, node)) =
2185            self.inner.closest_nodes.borrow_mut().iter_mut().find(|(_, node)| node.record.id == id)
2186        {
2187            node.queried = val;
2188        }
2189    }
2190
2191    /// Marks the node as queried
2192    fn mark_queried(&self, id: PeerId) {
2193        self.set_queried(id, true)
2194    }
2195
2196    /// Marks the node as not queried
2197    fn unmark_queried(&self, id: PeerId) {
2198        self.set_queried(id, false)
2199    }
2200
2201    /// Marks the node as responded
2202    fn mark_responded(&self, id: PeerId) {
2203        if let Some((_, node)) =
2204            self.inner.closest_nodes.borrow_mut().iter_mut().find(|(_, node)| node.record.id == id)
2205        {
2206            node.responded = true;
2207        }
2208    }
2209}
2210
2211// SAFETY: The [`Discv4Service`] is intended to be spawned as a task, which requires `Send`.
2212// The `LookupContext` is shared by all active `FindNode` requests that are part of the lookup
2213// step and can modify the context. The shared context is only ever accessed mutably when a
2214// `Neighbours` response is processed, and all clones are stored inside [`Discv4Service`]; in
2215// other words, it is guaranteed that there is only one owner ([`Discv4Service`]) of all possible
2216// [`Rc`] clones of [`LookupContext`].
2217unsafe impl Send for LookupContext {}
2218#[derive(Debug)]
2219struct LookupContextInner {
2220    /// The target to lookup.
2221    target: discv5::Key<NodeKey>,
2222    /// The closest nodes
2223    closest_nodes: RefCell<BTreeMap<Distance, QueryNode>>,
2224    /// A listener for all the nodes retrieved in this lookup
2225    ///
2226    /// This is present if the lookup was triggered manually via [Discv4] and we want to return all
2227    /// the nodes once the lookup finishes.
2228    listener: Option<NodeRecordSender>,
2229}
2230
2231impl Drop for LookupContextInner {
2232    fn drop(&mut self) {
2233        if let Some(tx) = self.listener.take() {
2234            // there's only one instance shared across `FindNode` requests; if this is dropped,
2235            // all requests have finished and we can send all results back
2236            let nodes = self
2237                .closest_nodes
2238                .take()
2239                .into_values()
2240                .filter(|node| node.responded)
2241                .map(|node| node.record)
2242                .collect();
2243            let _ = tx.send(nodes);
2244        }
2245    }
2246}
2247
2248/// Tracks the state of a recursive lookup step
2249#[derive(Debug, Clone, Copy)]
2250struct QueryNode {
2251    record: NodeRecord,
2252    queried: bool,
2253    responded: bool,
2254}
2255
2256#[derive(Debug)]
2257struct FindNodeRequest {
2258    /// Timestamp when the request was sent.
2259    sent_at: Instant,
2260    /// Number of node records received from the peer so far.
2261    response_count: usize,
2262    /// Whether the request has been answered yet.
2263    answered: bool,
2264    /// The lookup context this request is part of.
2265    lookup_context: LookupContext,
2266}
2267
2268// === impl FindNodeRequest ===
2269
2270impl FindNodeRequest {
2271    fn new(resp: LookupContext) -> Self {
2272        Self { sent_at: Instant::now(), response_count: 0, answered: false, lookup_context: resp }
2273    }
2274}
2275
2276#[derive(Debug)]
2277struct EnrRequestState {
2278    /// Timestamp when the request was sent.
2279    sent_at: Instant,
2280    /// Hash of the sent `EnrRequest` packet, echoed back in the `EnrResponse`.
2281    echo_hash: B256,
2282}
2283
2284/// Stored node info.
2285#[derive(Debug, Clone, Eq, PartialEq)]
2286struct NodeEntry {
2287    /// Node record info.
2288    record: NodeRecord,
2289    /// Timestamp of last pong.
2290    last_seen: Instant,
2291    /// Last enr seq we retrieved via an ENR request.
2292    last_enr_seq: Option<u64>,
2293    /// `ForkId` if retrieved via ENR requests.
2294    fork_id: Option<ForkId>,
2295    /// Counter for failed _consecutive_ findNode requests.
2296    find_node_failures: u8,
2297    /// Whether the endpoint of the peer is proven.
2298    has_endpoint_proof: bool,
2299}
2300
2301// === impl NodeEntry ===
2302
2303impl NodeEntry {
2304    /// Creates a new, unpopulated entry
2305    fn new(record: NodeRecord) -> Self {
2306        Self {
2307            record,
2308            last_seen: Instant::now(),
2309            last_enr_seq: None,
2310            fork_id: None,
2311            find_node_failures: 0,
2312            has_endpoint_proof: false,
2313        }
2314    }
2315
2316    #[cfg(test)]
2317    fn new_proven(record: NodeRecord) -> Self {
2318        let mut node = Self::new(record);
2319        node.has_endpoint_proof = true;
2320        node
2321    }
2322
2323    /// Marks the entry with an established proof and resets the consecutive failure counter.
2324    fn establish_proof(&mut self) {
2325        self.has_endpoint_proof = true;
2326        self.find_node_failures = 0;
2327    }
2328
2329    /// Returns true if the tracked find node failures have reached the given maximum
2330    const fn exceeds_find_node_failures(&self, max_failures: u8) -> bool {
2331        self.find_node_failures >= max_failures
2332    }
2333
2334    /// Updates the `last_seen` timestamp and sets the enr seq, returning the previous value
2335    fn update_with_enr(&mut self, last_enr_seq: Option<u64>) -> Option<u64> {
2336        self.update_now(|s| std::mem::replace(&mut s.last_enr_seq, last_enr_seq))
2337    }
2338
2339    /// Increases the failed request counter
2340    fn inc_failed_request(&mut self) {
2341        self.find_node_failures += 1;
2342    }
2343
2344    /// Updates the `last_seen` timestamp and sets the fork id, returning the previous value
2345    fn update_with_fork_id(&mut self, fork_id: Option<ForkId>) -> Option<ForkId> {
2346        self.update_now(|s| std::mem::replace(&mut s.fork_id, fork_id))
2347    }
2348
2349    /// Updates the `last_seen` timestamp and calls the closure
2350    fn update_now<F, R>(&mut self, f: F) -> R
2351    where
2352        F: FnOnce(&mut Self) -> R,
2353    {
2354        self.last_seen = Instant::now();
2355        f(self)
2356    }
2357}
2358
2359// === impl NodeEntry ===
2360
2361impl NodeEntry {
2362    /// Returns true if the node should be re-pinged.
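    ///
    /// The threshold is half of `ENDPOINT_PROOF_EXPIRATION` so that the endpoint proof can be
    /// renewed before it actually lapses.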
2363    fn is_expired(&self) -> bool {
2364        self.last_seen.elapsed() > (ENDPOINT_PROOF_EXPIRATION / 2)
2365    }
2366}
2367
2368/// Represents why a ping is issued
2369#[derive(Debug)]
2370enum PingReason {
2371    /// Initial ping to a previously unknown peer that was inserted into the table.
2372    InitialInsert,
2373    /// A ping to a peer to establish a bond (endpoint proof).
2374    EstablishBond,
2375    /// Re-ping a peer.
2376    RePing,
2377    /// Part of a lookup to ensure endpoint is proven before we can send a `FindNode` request.
2378    Lookup(NodeRecord, LookupContext),
2379}
2380
2381/// Represents state changes of nodes in the underlying node table
2382#[derive(Debug, Clone)]
2383pub enum DiscoveryUpdate {
2384    /// A new node was discovered _and_ added to the table.
2385    Added(NodeRecord),
2386    /// A new node was discovered but _not_ added to the table because it is currently full.
2387    DiscoveredAtCapacity(NodeRecord),
2388    /// Received a [`ForkId`] via EIP-868 for the given [`NodeRecord`].
2389    EnrForkId(NodeRecord, ForkId),
2390    /// Node that was removed from the table
2391    Removed(PeerId),
2392    /// A series of updates
2393    Batch(Vec<DiscoveryUpdate>),
2394}
2395
2396#[cfg(test)]
2397mod tests {
2398    use super::*;
2399    use crate::test_utils::{create_discv4, create_discv4_with_config, rng_endpoint, rng_record};
2400    use alloy_primitives::hex;
2401    use alloy_rlp::{Decodable, Encodable};
2402    use rand::{thread_rng, Rng};
2403    use reth_ethereum_forks::{EnrForkIdEntry, ForkHash};
2404    use reth_network_peers::mainnet_nodes;
2405    use std::future::poll_fn;
2406
2407    #[tokio::test]
2408    async fn test_configured_enr_forkid_entry() {
2409        let fork: ForkId = ForkId { hash: ForkHash([220, 233, 108, 45]), next: 0u64 };
2410        let mut disc_conf = Discv4Config::default();
2411        disc_conf.add_eip868_pair("eth", EnrForkIdEntry::from(fork));
2412        let (_discv4, service) = create_discv4_with_config(disc_conf).await;
2413        let eth = service.local_eip_868_enr.get_raw_rlp(b"eth").unwrap();
2414        let fork_entry_id = EnrForkIdEntry::decode(&mut &eth[..]).unwrap();
2415
2416        let raw: [u8; 8] = [0xc7, 0xc6, 0x84, 0xdc, 0xe9, 0x6c, 0x2d, 0x80];
2417        let decoded = EnrForkIdEntry::decode(&mut &raw[..]).unwrap();
2418        let expected = EnrForkIdEntry {
2419            fork_id: ForkId { hash: ForkHash([0xdc, 0xe9, 0x6c, 0x2d]), next: 0 },
2420        };
2421        assert_eq!(expected, fork_entry_id);
2422        assert_eq!(expected, decoded);
2423    }
2424
2425    #[test]
2426    fn test_enr_forkid_entry_decode() {
2427        let raw: [u8; 8] = [0xc7, 0xc6, 0x84, 0xdc, 0xe9, 0x6c, 0x2d, 0x80];
2428        let decoded = EnrForkIdEntry::decode(&mut &raw[..]).unwrap();
2429        let expected = EnrForkIdEntry {
2430            fork_id: ForkId { hash: ForkHash([0xdc, 0xe9, 0x6c, 0x2d]), next: 0 },
2431        };
2432        assert_eq!(expected, decoded);
2433    }
2434
2435    #[test]
2436    fn test_enr_forkid_entry_encode() {
2437        let original = EnrForkIdEntry {
2438            fork_id: ForkId { hash: ForkHash([0xdc, 0xe9, 0x6c, 0x2d]), next: 0 },
2439        };
2440        let expected: [u8; 8] = [0xc7, 0xc6, 0x84, 0xdc, 0xe9, 0x6c, 0x2d, 0x80];
2441        let mut encoded = Vec::with_capacity(expected.len());
2442        original.encode(&mut encoded);
2443        assert_eq!(&expected[..], encoded.as_slice());
2444    }
2445
2446    #[test]
2447    fn test_local_rotator() {
2448        let id = PeerId::random();
2449        let mut rotator = LookupTargetRotator::local_only();
2450        assert_eq!(rotator.next(&id), id);
2451        assert_eq!(rotator.next(&id), id);
2452    }
2453
2454    #[test]
2455    fn test_rotator() {
2456        let id = PeerId::random();
2457        let mut rotator = LookupTargetRotator::default();
2458        assert_eq!(rotator.next(&id), id);
2459        assert_ne!(rotator.next(&id), id);
2460        assert_ne!(rotator.next(&id), id);
2461        assert_ne!(rotator.next(&id), id);
2462        assert_eq!(rotator.next(&id), id);
2463    }
2464
2465    #[tokio::test]
2466    async fn test_pending_ping() {
2467        let (_, mut service) = create_discv4().await;
2468
2469        let local_addr = service.local_addr();
2470
2471        let mut num_inserted = 0;
2472        loop {
2473            let node = NodeRecord::new(local_addr, PeerId::random());
2474            if service.add_node(node) {
2475                num_inserted += 1;
2476                assert!(service.pending_pings.contains_key(&node.id));
2477                assert_eq!(service.pending_pings.len(), num_inserted);
2478                if num_inserted == MAX_NODES_PING {
2479                    break
2480                }
2481            }
2482        }
2483
2484        // `pending_pings` is full, insert into `queued_pings`.
2485        num_inserted = 0;
2486        for _ in 0..MAX_NODES_PING {
2487            let node = NodeRecord::new(local_addr, PeerId::random());
2488            if service.add_node(node) {
2489                num_inserted += 1;
2490                assert!(!service.pending_pings.contains_key(&node.id));
2491                assert_eq!(service.pending_pings.len(), MAX_NODES_PING);
2492                assert_eq!(service.queued_pings.len(), num_inserted);
2493            }
2494        }
2495    }
2496
2497    // Bootstraps with mainnet boot nodes
2498    #[tokio::test(flavor = "multi_thread")]
2499    #[ignore]
2500    async fn test_mainnet_lookup() {
2501        reth_tracing::init_test_tracing();
2502        let fork_id = ForkId { hash: ForkHash(hex!("743f3d89")), next: 16191202 };
2503
2504        let all_nodes = mainnet_nodes();
2505        let config = Discv4Config::builder()
2506            .add_boot_nodes(all_nodes)
2507            .lookup_interval(Duration::from_secs(1))
2508            .add_eip868_pair("eth", fork_id)
2509            .build();
2510        let (_discv4, mut service) = create_discv4_with_config(config).await;
2511
2512        let mut updates = service.update_stream();
2513
2514        let _handle = service.spawn();
2515
2516        let mut table = HashMap::new();
2517        while let Some(update) = updates.next().await {
2518            match update {
2519                DiscoveryUpdate::EnrForkId(record, fork_id) => {
2520                    println!("{record:?}, {fork_id:?}");
2521                }
2522                DiscoveryUpdate::Added(record) => {
2523                    table.insert(record.id, record);
2524                }
2525                DiscoveryUpdate::Removed(id) => {
2526                    table.remove(&id);
2527                }
2528                _ => {}
2529            }
2530            println!("total peers {}", table.len());
2531        }
2532    }
2533
2534    #[tokio::test]
2535    async fn test_mapped_ipv4() {
2536        reth_tracing::init_test_tracing();
2537        let mut rng = thread_rng();
2538        let config = Discv4Config::builder().build();
2539        let (_discv4, mut service) = create_discv4_with_config(config).await;
2540
2541        let v4: Ipv4Addr = "0.0.0.0".parse().unwrap();
2542        let v6 = v4.to_ipv6_mapped();
2543        let addr: SocketAddr = (v6, DEFAULT_DISCOVERY_PORT).into();
2544
2545        let ping = Ping {
2546            from: rng_endpoint(&mut rng),
2547            to: rng_endpoint(&mut rng),
2548            expire: service.ping_expiration(),
2549            enr_sq: Some(rng.gen()),
2550        };
2551
2552        let id = PeerId::random_with(&mut rng);
2553        service.on_ping(ping, addr, id, rng.gen());
2554
2555        let key = kad_key(id);
2556        match service.kbuckets.entry(&key) {
2557            kbucket::Entry::Present(entry, _) => {
2558                let node_addr = entry.value().record.address;
2559                assert!(node_addr.is_ipv4());
2560                assert_eq!(node_addr, IpAddr::from(v4));
2561            }
2562            _ => unreachable!(),
2563        };
2564    }
2565
2566    #[tokio::test]
2567    async fn test_respect_ping_expiration() {
2568        reth_tracing::init_test_tracing();
2569        let mut rng = thread_rng();
2570        let config = Discv4Config::builder().build();
2571        let (_discv4, mut service) = create_discv4_with_config(config).await;
2572
2573        let v4: Ipv4Addr = "0.0.0.0".parse().unwrap();
2574        let v6 = v4.to_ipv6_mapped();
2575        let addr: SocketAddr = (v6, DEFAULT_DISCOVERY_PORT).into();
2576
2577        let ping = Ping {
2578            from: rng_endpoint(&mut rng),
2579            to: rng_endpoint(&mut rng),
2580            expire: SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs() - 1,
2581            enr_sq: Some(rng.gen()),
2582        };
2583
2584        let id = PeerId::random_with(&mut rng);
2585        service.on_ping(ping, addr, id, rng.gen());
2586
2587        let key = kad_key(id);
2588        match service.kbuckets.entry(&key) {
2589            kbucket::Entry::Absent(_) => {}
2590            _ => unreachable!(),
2591        };
2592    }
2593
2594    #[tokio::test]
2595    async fn test_single_lookups() {
2596        reth_tracing::init_test_tracing();
2597
2598        let config = Discv4Config::builder().build();
2599        let (_discv4, mut service) = create_discv4_with_config(config.clone()).await;
2600
2601        let id = PeerId::random();
2602        let key = kad_key(id);
2603        let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), id);
2604
2605        let _ = service.kbuckets.insert_or_update(
2606            &key,
2607            NodeEntry::new_proven(record),
2608            NodeStatus {
2609                direction: ConnectionDirection::Incoming,
2610                state: ConnectionState::Connected,
2611            },
2612        );
2613
2614        service.lookup_self();
2615        assert_eq!(service.pending_find_nodes.len(), 1);
2616
2617        poll_fn(|cx| {
2618            let _ = service.poll(cx);
2619            assert_eq!(service.pending_find_nodes.len(), 1);
2620
2621            Poll::Ready(())
2622        })
2623        .await;
2624    }
2625
2626    #[tokio::test]
2627    async fn test_on_neighbours_recursive_lookup() {
2628        reth_tracing::init_test_tracing();
2629
2630        let config = Discv4Config::builder().build();
2631        let (_discv4, mut service) = create_discv4_with_config(config.clone()).await;
2632        let (_discv4, mut service2) = create_discv4_with_config(config).await;
2633
2634        let id = PeerId::random();
2635        let key = kad_key(id);
2636        let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), id);
2637
2638        let _ = service.kbuckets.insert_or_update(
2639            &key,
2640            NodeEntry::new_proven(record),
2641            NodeStatus {
2642                direction: ConnectionDirection::Incoming,
2643                state: ConnectionState::Connected,
2644            },
2645        );
2646        // Needed in this test to populate self.pending_find_nodes as a prerequisite for a valid
2647        // on_neighbours request
2648        service.lookup_self();
2649        assert_eq!(service.pending_find_nodes.len(), 1);
2650
2651        poll_fn(|cx| {
2652            let _ = service.poll(cx);
2653            assert_eq!(service.pending_find_nodes.len(), 1);
2654
2655            Poll::Ready(())
2656        })
2657        .await;
2658
2659        let expiry = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_secs() +
2660            10000000000000;
2661        let msg = Neighbours { nodes: vec![service2.local_node_record], expire: expiry };
2662        service.on_neighbours(msg, record.tcp_addr(), id);
2663        // wait for the processed ping
2664        let event = poll_fn(|cx| service2.poll(cx)).await;
2665        assert_eq!(event, Discv4Event::Ping);
2666        // assert that no find_node req has been added here on top of the initial one, since both
2667        // sides of the endpoint proof are not yet completed here
2668        assert_eq!(service.pending_find_nodes.len(), 1);
2669        // we now wait for PONG
2670        let event = poll_fn(|cx| service.poll(cx)).await;
2671        assert_eq!(event, Discv4Event::Pong);
2672        // Ideally we would assert against service.pending_lookup.len() here, but because
2673        // service2 sends Pong and Ping consecutively in on_ping(), the pending_lookup table gets
2674        // drained almost immediately and there is no way to capture its intermediate state here.
2676        let event = poll_fn(|cx| service.poll(cx)).await;
2677        assert_eq!(event, Discv4Event::Ping);
2678        // assert that we've added the find_node req here after both sides of the endpoint proof
2679        // are done
2680        assert_eq!(service.pending_find_nodes.len(), 2);
2681    }
2682
2683    #[tokio::test]
2684    async fn test_no_local_in_closest() {
2685        reth_tracing::init_test_tracing();
2686
2687        let config = Discv4Config::builder().build();
2688        let (_discv4, mut service) = create_discv4_with_config(config).await;
2689
2690        let target_key = kad_key(PeerId::random());
2691
2692        let id = PeerId::random();
2693        let key = kad_key(id);
2694        let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), id);
2695
2696        let _ = service.kbuckets.insert_or_update(
2697            &key,
2698            NodeEntry::new(record),
2699            NodeStatus {
2700                direction: ConnectionDirection::Incoming,
2701                state: ConnectionState::Connected,
2702            },
2703        );
2704
2705        let closest = service
2706            .kbuckets
2707            .closest_values(&target_key)
2708            .map(|n| n.value.record)
2709            .take(MAX_NODES_PER_BUCKET)
2710            .collect::<Vec<_>>();
2711
2712        assert_eq!(closest.len(), 1);
2713        assert!(!closest.iter().any(|r| r.id == *service.local_peer_id()));
2714    }
2715
    #[tokio::test]
    async fn test_random_lookup() {
        reth_tracing::init_test_tracing();

        let config = Discv4Config::builder().build();
        let (_discv4, mut service) = create_discv4_with_config(config).await;

        let target = PeerId::random();

        let id = PeerId::random();
        let key = kad_key(id);
        let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), id);

        let _ = service.kbuckets.insert_or_update(
            &key,
            NodeEntry::new_proven(record),
            NodeStatus {
                direction: ConnectionDirection::Incoming,
                state: ConnectionState::Connected,
            },
        );

        service.lookup(target);
        assert_eq!(service.pending_find_nodes.len(), 1);

        let ctx = service.pending_find_nodes.values().next().unwrap().lookup_context.clone();

        assert_eq!(ctx.target(), target);
        assert_eq!(ctx.inner.closest_nodes.borrow().len(), 1);

        // re-adding a record the context already tracks must not grow the closest set
        ctx.add_node(record);
        assert_eq!(ctx.inner.closest_nodes.borrow().len(), 1);
    }

    #[tokio::test]
    async fn test_reping_on_find_node_failures() {
        reth_tracing::init_test_tracing();

        let config = Discv4Config::builder().build();
        let (_discv4, mut service) = create_discv4_with_config(config).await;

        let target = PeerId::random();

        let id = PeerId::random();
        let key = kad_key(id);
        let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), id);

        let mut entry = NodeEntry::new_proven(record);
        entry.find_node_failures = u8::MAX;
        let _ = service.kbuckets.insert_or_update(
            &key,
            entry,
            NodeStatus {
                direction: ConnectionDirection::Incoming,
                state: ConnectionState::Connected,
            },
        );

        // the only known node has exhausted its find_node budget, so the lookup re-pings it
        // instead of sending a FindNode
        service.lookup(target);
        assert_eq!(service.pending_find_nodes.len(), 0);
        assert_eq!(service.pending_pings.len(), 1);

        service.update_on_pong(record, None);

        service
            .on_entry(record.id, |entry| {
                // reset on pong
                assert_eq!(entry.find_node_failures, 0);
                assert!(entry.has_endpoint_proof);
            })
            .unwrap();
    }

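    // Illustration of the bookkeeping asserted in `test_reping_on_find_node_failures`, not code the
    // service itself uses: an entry only remains a useful FindNode target while its endpoint proof
    // holds and its failure counter stays below the allowed maximum. The threshold constant below
    // is a made-up stand-in for whatever limit the service is actually configured with.
    #[allow(dead_code)]
    fn sketch_is_usable_for_find_node(entry: &NodeEntry) -> bool {
        // hypothetical threshold, for illustration only
        const ASSUMED_MAX_FIND_NODE_FAILURES: u8 = 5;
        entry.has_endpoint_proof && entry.find_node_failures < ASSUMED_MAX_FIND_NODE_FAILURES
    }
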
    #[tokio::test]
    async fn test_service_commands() {
        reth_tracing::init_test_tracing();

        let config = Discv4Config::builder().build();
        let (discv4, mut service) = create_discv4_with_config(config).await;

        service.lookup_self();

        let _handle = service.spawn();
        discv4.send_lookup_self();
        let _ = discv4.lookup_self().await;
    }

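    // Usage sketch for the handle/service split exercised in `test_service_commands`: once the
    // service is spawned onto the runtime it is driven exclusively through the `Discv4` frontend.
    // Illustrative only; it mirrors the calls made in the test above.
    #[allow(dead_code)]
    async fn sketch_spawn_and_lookup(discv4: Discv4, service: Discv4Service) {
        // hand the event loop to tokio
        let _handle = service.spawn();
        // fire-and-forget lookup of our own node id
        discv4.send_lookup_self();
        // awaitable variant; the discovered records are ignored here
        let _ = discv4.lookup_self().await;
    }
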
    #[tokio::test]
    async fn test_requests_timeout() {
        reth_tracing::init_test_tracing();
        let fork_id = ForkId { hash: ForkHash(hex!("743f3d89")), next: 16191202 };

        let config = Discv4Config::builder()
            .request_timeout(Duration::from_millis(200))
            .ping_expiration(Duration::from_millis(200))
            .lookup_neighbours_expiration(Duration::from_millis(200))
            .add_eip868_pair("eth", fork_id)
            .build();
        let (_discv4, mut service) = create_discv4_with_config(config).await;

        let id = PeerId::random();
        let key = kad_key(id);
        let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), id);

        let _ = service.kbuckets.insert_or_update(
            &key,
            NodeEntry::new_proven(record),
            NodeStatus {
                direction: ConnectionDirection::Incoming,
                state: ConnectionState::Connected,
            },
        );

        service.lookup_self();
        assert_eq!(service.pending_find_nodes.len(), 1);

        let ctx = service.pending_find_nodes.values().next().unwrap().lookup_context.clone();

        // manually register a pending lookup and an in-flight ping so both can be observed
        // timing out
        service.pending_lookup.insert(record.id, (Instant::now(), ctx));

        assert_eq!(service.pending_lookup.len(), 1);

        let ping = Ping {
            from: service.local_node_record.into(),
            to: record.into(),
            expire: service.ping_expiration(),
            enr_sq: service.enr_seq(),
        };
        let echo_hash = service.send_packet(Message::Ping(ping), record.udp_addr());
        let ping_request = PingRequest {
            sent_at: Instant::now(),
            node: record,
            echo_hash,
            reason: PingReason::InitialInsert,
        };
        service.pending_pings.insert(record.id, ping_request);

        assert_eq!(service.pending_pings.len(), 1);

        // let all configured timeouts (200ms each) elapse
        tokio::time::sleep(Duration::from_secs(1)).await;

        // a single poll is enough to evict all timed-out requests
        poll_fn(|cx| {
            let _ = service.poll(cx);

            assert_eq!(service.pending_find_nodes.len(), 0);
            assert_eq!(service.pending_lookup.len(), 0);
            assert_eq!(service.pending_pings.len(), 0);

            Poll::Ready(())
        })
        .await;
    }

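    // Assertion sketch for the timeout behaviour checked in `test_requests_timeout`: once the
    // configured timeouts have elapsed and the service has been polled, every pending-request
    // table should be empty. Pulled out as a helper purely for illustration.
    #[allow(dead_code)]
    fn sketch_assert_no_pending_requests(service: &Discv4Service) {
        assert!(service.pending_find_nodes.is_empty());
        assert!(service.pending_lookup.is_empty());
        assert!(service.pending_pings.is_empty());
    }
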
    // sends a PING packet with wrong 'to' field and expects a PONG response.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_check_wrong_to() {
        reth_tracing::init_test_tracing();

        let config = Discv4Config::builder().external_ip_resolver(None).build();
        let (_discv4, mut service_1) = create_discv4_with_config(config.clone()).await;
        let (_discv4, mut service_2) = create_discv4_with_config(config).await;

        // ping node 2 with wrong to field
        let mut ping = Ping {
            from: service_1.local_node_record.into(),
            to: service_2.local_node_record.into(),
            expire: service_1.ping_expiration(),
            enr_sq: service_1.enr_seq(),
        };
        ping.to.address = "192.0.2.0".parse().unwrap();

        let echo_hash = service_1.send_packet(Message::Ping(ping), service_2.local_addr());
        let ping_request = PingRequest {
            sent_at: Instant::now(),
            node: service_2.local_node_record,
            echo_hash,
            reason: PingReason::InitialInsert,
        };
        service_1.pending_pings.insert(*service_2.local_peer_id(), ping_request);

        // wait for the processed ping
        let event = poll_fn(|cx| service_2.poll(cx)).await;
        assert_eq!(event, Discv4Event::Ping);

        // we now wait for PONG
        let event = poll_fn(|cx| service_1.poll(cx)).await;
        assert_eq!(event, Discv4Event::Pong);
        // followed by a ping
        let event = poll_fn(|cx| service_1.poll(cx)).await;
        assert_eq!(event, Discv4Event::Ping);
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_check_ping_pong() {
        reth_tracing::init_test_tracing();

        let config = Discv4Config::builder().external_ip_resolver(None).build();
        let (_discv4, mut service_1) = create_discv4_with_config(config.clone()).await;
        let (_discv4, mut service_2) = create_discv4_with_config(config).await;

        // send ping from 1 -> 2
        service_1.add_node(service_2.local_node_record);

        // wait for the processed ping
        let event = poll_fn(|cx| service_2.poll(cx)).await;
        assert_eq!(event, Discv4Event::Ping);

        // node is now in the table but not connected yet
        let key1 = kad_key(*service_1.local_peer_id());
        match service_2.kbuckets.entry(&key1) {
            kbucket::Entry::Present(_entry, status) => {
                assert!(!status.is_connected());
            }
            _ => unreachable!(),
        }

        // we now wait for PONG
        let event = poll_fn(|cx| service_1.poll(cx)).await;
        assert_eq!(event, Discv4Event::Pong);

        // endpoint is proven
        let key2 = kad_key(*service_2.local_peer_id());
        match service_1.kbuckets.entry(&key2) {
            kbucket::Entry::Present(_entry, status) => {
                assert!(status.is_connected());
            }
            _ => unreachable!(),
        }

        // we now wait for the PING initiated by 2
        let event = poll_fn(|cx| service_1.poll(cx)).await;
        assert_eq!(event, Discv4Event::Ping);

        // we now wait for the PONG; since ENR support is advertised in the ping, one or two
        // EnrRequest events may be processed first
        let event = poll_fn(|cx| service_2.poll(cx)).await;

        match event {
            Discv4Event::EnrRequest => {
                // since we support enr in the ping it may also request the enr
                let event = poll_fn(|cx| service_2.poll(cx)).await;
                match event {
                    Discv4Event::EnrRequest => {
                        let event = poll_fn(|cx| service_2.poll(cx)).await;
                        assert_eq!(event, Discv4Event::Pong);
                    }
                    Discv4Event::Pong => {}
                    _ => {
                        unreachable!()
                    }
                }
            }
            Discv4Event::Pong => {}
            ev => unreachable!("{ev:?}"),
        }

        // endpoint is proven
        match service_2.kbuckets.entry(&key1) {
            kbucket::Entry::Present(_entry, status) => {
                assert!(status.is_connected());
            }
            ev => unreachable!("{ev:?}"),
        }
    }

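    // Helper sketch for the interleaving handled in `test_check_ping_pong`: poll a service until it
    // yields the expected event, skipping any `Discv4Event::EnrRequest` produced along the way,
    // since ENR exchange may be interleaved with the ping/pong handshake. Illustrative only and
    // bounded so a regression cannot turn it into an endless loop.
    #[allow(dead_code)]
    async fn sketch_poll_until(service: &mut Discv4Service, expected: Discv4Event) {
        for _ in 0..8 {
            let event = poll_fn(|cx| service.poll(cx)).await;
            if event == expected {
                return;
            }
            assert_eq!(event, Discv4Event::EnrRequest, "unexpected event");
        }
        panic!("expected event was never produced");
    }
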
    #[test]
    fn test_insert() {
        let local_node_record = rng_record(&mut rand::thread_rng());
        let mut kbuckets: KBucketsTable<NodeKey, NodeEntry> = KBucketsTable::new(
            NodeKey::from(&local_node_record).into(),
            Duration::from_secs(60),
            MAX_NODES_PER_BUCKET,
            None,
            None,
        );

        let new_record = rng_record(&mut rand::thread_rng());
        let key = kad_key(new_record.id);
        match kbuckets.entry(&key) {
            kbucket::Entry::Absent(entry) => {
                let node = NodeEntry::new(new_record);
                let _ = entry.insert(
                    node,
                    NodeStatus {
                        direction: ConnectionDirection::Outgoing,
                        state: ConnectionState::Disconnected,
                    },
                );
            }
            _ => {
                unreachable!()
            }
        };
        match kbuckets.entry(&key) {
            kbucket::Entry::Present(_, _) => {}
            _ => {
                unreachable!()
            }
        }
    }

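    // Construction sketch mirroring `test_insert`: a standalone `KBucketsTable` keyed by the local
    // node, built with the same arguments as above (60s pending timeout, `MAX_NODES_PER_BUCKET`,
    // no filters). Extracted purely to illustrate the constructor parameters; the service builds
    // and owns its own table.
    #[allow(dead_code)]
    fn sketch_empty_table(local: &NodeRecord) -> KBucketsTable<NodeKey, NodeEntry> {
        KBucketsTable::new(
            NodeKey::from(local).into(),
            Duration::from_secs(60),
            MAX_NODES_PER_BUCKET,
            None,
            None,
        )
    }
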
    #[tokio::test]
    async fn test_bootnode_not_in_update_stream() {
        reth_tracing::init_test_tracing();
        let (_, service_1) = create_discv4().await;
        let peerid_1 = *service_1.local_peer_id();

        let config = Discv4Config::builder().add_boot_node(service_1.local_node_record).build();
        service_1.spawn();

        let (_, mut service_2) = create_discv4_with_config(config).await;

        let mut updates = service_2.update_stream();

        service_2.spawn();

        // Poll the update stream for up to one second
        let mut bootnode_appeared = false;
        let timeout = tokio::time::sleep(Duration::from_secs(1));
        tokio::pin!(timeout);

        loop {
            tokio::select! {
                Some(update) = updates.next() => {
                    if let DiscoveryUpdate::Added(record) = update {
                        if record.id == peerid_1 {
                            bootnode_appeared = true;
                            break;
                        }
                    }
                }
                _ = &mut timeout => break,
            }
        }

        // Assert the bootnode was not emitted on the update stream
        assert!(!bootnode_appeared, "bootnode should not appear in the update stream");
    }
}