reth_discv4/
lib.rs

1//! Discovery v4 implementation: <https://github.com/ethereum/devp2p/blob/master/discv4.md>
2//!
3//! Discv4 employs a kademlia-like routing table to store and manage discovered peers and topics.
4//! The protocol allows for external IP discovery in NAT environments through regular PING/PONG's
5//! with discovered nodes. Nodes return the external IP address that they have received and a simple
6//! majority is chosen as our external IP address. If an external IP address is updated, this is
7//! produced as an event to notify the swarm (if one is used for this behaviour).
8//!
9//! This implementation consists of a [`Discv4`] and [`Discv4Service`] pair. The service manages the
10//! state and drives the UDP socket. The (optional) [`Discv4`] serves as the frontend to interact
11//! with the service via a channel. Whenever the underlying table changes service produces a
12//! [`DiscoveryUpdate`] that listeners will receive.
13//!
14//! ## Feature Flags
15//!
16//! - `serde` (default): Enable serde support
17//! - `test-utils`: Export utilities for testing
18
19#![doc(
20    html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
21    html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
22    issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/"
23)]
24#![cfg_attr(not(test), warn(unused_crate_dependencies))]
25#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
26
27use crate::{
28    error::{DecodePacketError, Discv4Error},
29    proto::{FindNode, Message, Neighbours, Packet, Ping, Pong},
30};
31use alloy_primitives::{bytes::Bytes, hex, B256};
32use discv5::{
33    kbucket,
34    kbucket::{
35        BucketInsertResult, Distance, Entry as BucketEntry, InsertResult, KBucketsTable,
36        NodeStatus, MAX_NODES_PER_BUCKET,
37    },
38    ConnectionDirection, ConnectionState,
39};
40use enr::Enr;
41use itertools::Itertools;
42use parking_lot::Mutex;
43use proto::{EnrRequest, EnrResponse};
44use reth_ethereum_forks::ForkId;
45use reth_network_peers::{pk2id, PeerId};
46use secp256k1::SecretKey;
47use std::{
48    cell::RefCell,
49    collections::{btree_map, hash_map::Entry, BTreeMap, HashMap, VecDeque},
50    fmt,
51    future::poll_fn,
52    io,
53    net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4},
54    pin::Pin,
55    rc::Rc,
56    sync::Arc,
57    task::{ready, Context, Poll},
58    time::{Duration, Instant, SystemTime, UNIX_EPOCH},
59};
60use tokio::{
61    net::UdpSocket,
62    sync::{mpsc, mpsc::error::TrySendError, oneshot, oneshot::Sender as OneshotSender},
63    task::{JoinHandle, JoinSet},
64    time::Interval,
65};
66use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt};
67use tracing::{debug, trace};
68
69pub mod error;
70pub mod proto;
71
72mod config;
73pub use config::{Discv4Config, Discv4ConfigBuilder};
74
75mod node;
76use node::{kad_key, NodeKey};
77
78mod table;
79
80// reexport NodeRecord primitive
81pub use reth_network_peers::NodeRecord;
82
83#[cfg(any(test, feature = "test-utils"))]
84pub mod test_utils;
85
86use crate::table::PongTable;
87use reth_net_nat::ResolveNatInterval;
88/// reexport to get public ip.
89pub use reth_net_nat::{external_ip, NatResolver};
90
91/// The default address for discv4 via UDP
92///
93/// Note: the default TCP address is the same.
94pub const DEFAULT_DISCOVERY_ADDR: IpAddr = IpAddr::V4(Ipv4Addr::UNSPECIFIED);
95
96/// The default port for discv4 via UDP
97///
98/// Note: the default TCP port is the same.
99pub const DEFAULT_DISCOVERY_PORT: u16 = 30303;
100
101/// The default address for discv4 via UDP: "0.0.0.0:30303"
102///
103/// Note: The default TCP address is the same.
104pub const DEFAULT_DISCOVERY_ADDRESS: SocketAddr =
105    SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, DEFAULT_DISCOVERY_PORT));
106
107/// The maximum size of any packet is 1280 bytes.
108const MAX_PACKET_SIZE: usize = 1280;
109
110/// Length of the UDP datagram packet-header: Hash(32b) + Signature(65b) + Packet Type(1b)
111const MIN_PACKET_SIZE: usize = 32 + 65 + 1;
112
113/// Concurrency factor for `FindNode` requests to pick `ALPHA` closest nodes, <https://github.com/ethereum/devp2p/blob/master/discv4.md#recursive-lookup>
114const ALPHA: usize = 3;
115
116/// Maximum number of nodes to ping at concurrently.
117///
118/// This corresponds to 2 full `Neighbours` responses with 16 _new_ nodes. This will apply some
119/// backpressure in recursive lookups.
120const MAX_NODES_PING: usize = 2 * MAX_NODES_PER_BUCKET;
121
122/// Maximum number of pings to keep queued.
123///
124/// If we are currently sending too many pings, any new pings will be queued. To prevent unbounded
125/// growth of the queue, the queue has a maximum capacity, after which any additional pings will be
126/// discarded.
127///
128/// This corresponds to 2 full `Neighbours` responses with 16 new nodes.
129const MAX_QUEUED_PINGS: usize = 2 * MAX_NODES_PER_BUCKET;
130
131/// The size of the datagram is limited [`MAX_PACKET_SIZE`], 16 nodes, as the discv4 specifies don't
132/// fit in one datagram. The safe number of nodes that always fit in a datagram is 12, with worst
133/// case all of them being IPv6 nodes. This is calculated by `(MAX_PACKET_SIZE - (header + expire +
134/// rlp overhead) / size(rlp(Node_IPv6))`
135/// Even in the best case where all nodes are IPv4, only 14 nodes fit into one packet.
136const SAFE_MAX_DATAGRAM_NEIGHBOUR_RECORDS: usize = (MAX_PACKET_SIZE - 109) / 91;
137
138/// The timeout used to identify expired nodes, 24h
139///
140/// Mirrors geth's `bondExpiration` of 24h
141const ENDPOINT_PROOF_EXPIRATION: Duration = Duration::from_secs(24 * 60 * 60);
142
143/// Duration used to expire nodes from the routing table 1hr
144const EXPIRE_DURATION: Duration = Duration::from_secs(60 * 60);
145
146// Restricts how many udp messages can be processed in a single [Discv4Service::poll] call.
147//
148// This will act as a manual yield point when draining the socket messages where the most CPU
149// expensive part is handling outgoing messages: encoding and hashing the packet
150const UDP_MESSAGE_POLL_LOOP_BUDGET: i32 = 4;
151
152type EgressSender = mpsc::Sender<(Bytes, SocketAddr)>;
153type EgressReceiver = mpsc::Receiver<(Bytes, SocketAddr)>;
154
155pub(crate) type IngressSender = mpsc::Sender<IngressEvent>;
156pub(crate) type IngressReceiver = mpsc::Receiver<IngressEvent>;
157
158type NodeRecordSender = OneshotSender<Vec<NodeRecord>>;
159
160/// The Discv4 frontend.
161///
162/// This is a cloneable type that communicates with the [`Discv4Service`] by sending commands over a
163/// shared channel.
164///
165/// See also [`Discv4::spawn`]
166#[derive(Debug, Clone)]
167pub struct Discv4 {
168    /// The address of the udp socket
169    local_addr: SocketAddr,
170    /// channel to send commands over to the service
171    to_service: mpsc::UnboundedSender<Discv4Command>,
172    /// Tracks the local node record.
173    ///
174    /// This includes the currently tracked external IP address of the node.
175    node_record: Arc<Mutex<NodeRecord>>,
176}
177
178impl Discv4 {
179    /// Same as [`Self::bind`] but also spawns the service onto a new task.
180    ///
181    /// See also: [`Discv4Service::spawn()`]
182    pub async fn spawn(
183        local_address: SocketAddr,
184        local_enr: NodeRecord,
185        secret_key: SecretKey,
186        config: Discv4Config,
187    ) -> io::Result<Self> {
188        let (discv4, service) = Self::bind(local_address, local_enr, secret_key, config).await?;
189
190        service.spawn();
191
192        Ok(discv4)
193    }
194
195    /// Returns a new instance with the given channel directly
196    ///
197    /// NOTE: this is only intended for test setups.
198    #[cfg(feature = "test-utils")]
199    pub fn noop() -> Self {
200        let (to_service, _rx) = mpsc::unbounded_channel();
201        let local_addr =
202            (IpAddr::from(std::net::Ipv4Addr::UNSPECIFIED), DEFAULT_DISCOVERY_PORT).into();
203        Self {
204            local_addr,
205            to_service,
206            node_record: Arc::new(Mutex::new(NodeRecord::new(
207                "127.0.0.1:3030".parse().unwrap(),
208                PeerId::random(),
209            ))),
210        }
211    }
212
213    /// Binds a new `UdpSocket` and creates the service
214    ///
215    /// ```
216    /// # use std::io;
217    /// use reth_discv4::{Discv4, Discv4Config};
218    /// use reth_network_peers::{pk2id, NodeRecord, PeerId};
219    /// use secp256k1::SECP256K1;
220    /// use std::{net::SocketAddr, str::FromStr};
221    /// # async fn t() -> io::Result<()> {
222    /// // generate a (random) keypair
223    /// let (secret_key, pk) = SECP256K1.generate_keypair(&mut rand_08::thread_rng());
224    /// let id = pk2id(&pk);
225    ///
226    /// let socket = SocketAddr::from_str("0.0.0.0:0").unwrap();
227    /// let local_enr =
228    ///     NodeRecord { address: socket.ip(), tcp_port: socket.port(), udp_port: socket.port(), id };
229    /// let config = Discv4Config::default();
230    ///
231    /// let (discv4, mut service) = Discv4::bind(socket, local_enr, secret_key, config).await.unwrap();
232    ///
233    /// // get an update strea
234    /// let updates = service.update_stream();
235    ///
236    /// let _handle = service.spawn();
237    ///
238    /// // lookup the local node in the DHT
239    /// let _discovered = discv4.lookup_self().await.unwrap();
240    ///
241    /// # Ok(())
242    /// # }
243    /// ```
244    pub async fn bind(
245        local_address: SocketAddr,
246        mut local_node_record: NodeRecord,
247        secret_key: SecretKey,
248        config: Discv4Config,
249    ) -> io::Result<(Self, Discv4Service)> {
250        let socket = UdpSocket::bind(local_address).await?;
251        let local_addr = socket.local_addr()?;
252        local_node_record.udp_port = local_addr.port();
253        trace!(target: "discv4", ?local_addr,"opened UDP socket");
254
255        let service = Discv4Service::new(socket, local_addr, local_node_record, secret_key, config);
256        let discv4 = service.handle();
257        Ok((discv4, service))
258    }
259
260    /// Returns the address of the UDP socket.
261    pub const fn local_addr(&self) -> SocketAddr {
262        self.local_addr
263    }
264
265    /// Returns the [`NodeRecord`] of the local node.
266    ///
267    /// This includes the currently tracked external IP address of the node.
268    pub fn node_record(&self) -> NodeRecord {
269        *self.node_record.lock()
270    }
271
272    /// Returns the currently tracked external IP of the node.
273    pub fn external_ip(&self) -> IpAddr {
274        self.node_record.lock().address
275    }
276
277    /// Sets the [Interval] used for periodically looking up targets over the network
278    pub fn set_lookup_interval(&self, duration: Duration) {
279        self.send_to_service(Discv4Command::SetLookupInterval(duration))
280    }
281
282    /// Starts a `FindNode` recursive lookup that locates the closest nodes to the given node id. See also: <https://github.com/ethereum/devp2p/blob/master/discv4.md#recursive-lookup>
283    ///
284    /// The lookup initiator starts by picking α closest nodes to the target it knows of. The
285    /// initiator then sends concurrent `FindNode` packets to those nodes. α is a system-wide
286    /// concurrency parameter, such as 3. In the recursive step, the initiator resends `FindNode` to
287    /// nodes it has learned about from previous queries. Of the k nodes the initiator has heard of
288    /// closest to the target, it picks α that it has not yet queried and resends `FindNode` to
289    /// them. Nodes that fail to respond quickly are removed from consideration until and unless
290    /// they do respond.
291    //
292    // If a round of FindNode queries fails to return a node any closer than the closest already
293    // seen, the initiator resends the find node to all of the k closest nodes it has not already
294    // queried. The lookup terminates when the initiator has queried and gotten responses from the k
295    // closest nodes it has seen.
296    pub async fn lookup_self(&self) -> Result<Vec<NodeRecord>, Discv4Error> {
297        self.lookup_node(None).await
298    }
299
300    /// Looks up the given node id.
301    ///
302    /// Returning the closest nodes to the given node id.
303    pub async fn lookup(&self, node_id: PeerId) -> Result<Vec<NodeRecord>, Discv4Error> {
304        self.lookup_node(Some(node_id)).await
305    }
306
307    /// Performs a random lookup for node records.
308    pub async fn lookup_random(&self) -> Result<Vec<NodeRecord>, Discv4Error> {
309        let target = PeerId::random();
310        self.lookup_node(Some(target)).await
311    }
312
313    /// Sends a message to the service to lookup the closest nodes
314    pub fn send_lookup(&self, node_id: PeerId) {
315        let cmd = Discv4Command::Lookup { node_id: Some(node_id), tx: None };
316        self.send_to_service(cmd);
317    }
318
319    async fn lookup_node(&self, node_id: Option<PeerId>) -> Result<Vec<NodeRecord>, Discv4Error> {
320        let (tx, rx) = oneshot::channel();
321        let cmd = Discv4Command::Lookup { node_id, tx: Some(tx) };
322        self.to_service.send(cmd)?;
323        Ok(rx.await?)
324    }
325
326    /// Triggers a new self lookup without expecting a response
327    pub fn send_lookup_self(&self) {
328        let cmd = Discv4Command::Lookup { node_id: None, tx: None };
329        self.send_to_service(cmd);
330    }
331
332    /// Removes the peer from the table, if it exists.
333    pub fn remove_peer(&self, node_id: PeerId) {
334        let cmd = Discv4Command::Remove(node_id);
335        self.send_to_service(cmd);
336    }
337
338    /// Adds the node to the table, if it is not already present.
339    pub fn add_node(&self, node_record: NodeRecord) {
340        let cmd = Discv4Command::Add(node_record);
341        self.send_to_service(cmd);
342    }
343
344    /// Adds the peer and id to the ban list.
345    ///
346    /// This will prevent any future inclusion in the table
347    pub fn ban(&self, node_id: PeerId, ip: IpAddr) {
348        let cmd = Discv4Command::Ban(node_id, ip);
349        self.send_to_service(cmd);
350    }
351
352    /// Adds the ip to the ban list.
353    ///
354    /// This will prevent any future inclusion in the table
355    pub fn ban_ip(&self, ip: IpAddr) {
356        let cmd = Discv4Command::BanIp(ip);
357        self.send_to_service(cmd);
358    }
359
360    /// Adds the peer to the ban list.
361    ///
362    /// This will prevent any future inclusion in the table
363    pub fn ban_node(&self, node_id: PeerId) {
364        let cmd = Discv4Command::BanPeer(node_id);
365        self.send_to_service(cmd);
366    }
367
368    /// Sets the tcp port
369    ///
370    /// This will update our [`NodeRecord`]'s tcp port.
371    pub fn set_tcp_port(&self, port: u16) {
372        let cmd = Discv4Command::SetTcpPort(port);
373        self.send_to_service(cmd);
374    }
375
376    /// Sets the pair in the EIP-868 [`Enr`] of the node.
377    ///
378    /// If the key already exists, this will update it.
379    ///
380    /// CAUTION: The value **must** be rlp encoded
381    pub fn set_eip868_rlp_pair(&self, key: Vec<u8>, rlp: Bytes) {
382        let cmd = Discv4Command::SetEIP868RLPPair { key, rlp };
383        self.send_to_service(cmd);
384    }
385
386    /// Sets the pair in the EIP-868 [`Enr`] of the node.
387    ///
388    /// If the key already exists, this will update it.
389    pub fn set_eip868_rlp(&self, key: Vec<u8>, value: impl alloy_rlp::Encodable) {
390        self.set_eip868_rlp_pair(key, Bytes::from(alloy_rlp::encode(&value)))
391    }
392
393    #[inline]
394    fn send_to_service(&self, cmd: Discv4Command) {
395        let _ = self.to_service.send(cmd).map_err(|err| {
396            debug!(
397                target: "discv4",
398                %err,
399                "channel capacity reached, dropping command",
400            )
401        });
402    }
403
404    /// Returns the receiver half of new listener channel that streams [`DiscoveryUpdate`]s.
405    pub async fn update_stream(&self) -> Result<ReceiverStream<DiscoveryUpdate>, Discv4Error> {
406        let (tx, rx) = oneshot::channel();
407        let cmd = Discv4Command::Updates(tx);
408        self.to_service.send(cmd)?;
409        Ok(rx.await?)
410    }
411
412    /// Terminates the spawned [`Discv4Service`].
413    pub fn terminate(&self) {
414        self.send_to_service(Discv4Command::Terminated);
415    }
416}
417
418/// Manages discv4 peer discovery over UDP.
419///
420/// This is a [Stream] to handles incoming and outgoing discv4 messages and emits updates via:
421/// [`Discv4Service::update_stream`].
422///
423/// This type maintains the discv Kademlia routing table and is responsible for performing lookups.
424///
425/// ## Lookups
426///
427/// See also [Recursive Lookups](https://github.com/ethereum/devp2p/blob/master/discv4.md#recursive-lookup).
428/// Lookups are either triggered periodically or performaned on demand: [`Discv4::lookup`]
429/// Newly discovered nodes are emitted as [`DiscoveryUpdate::Added`] event to all subscribers:
430/// [`Discv4Service::update_stream`].
431#[must_use = "Stream does nothing unless polled"]
432pub struct Discv4Service {
433    /// Local address of the UDP socket.
434    local_address: SocketAddr,
435    /// The local ENR for EIP-868 <https://eips.ethereum.org/EIPS/eip-868>
436    local_eip_868_enr: Enr<SecretKey>,
437    /// Local ENR of the server.
438    local_node_record: NodeRecord,
439    /// Keeps track of the node record of the local node.
440    shared_node_record: Arc<Mutex<NodeRecord>>,
441    /// The secret key used to sign payloads
442    secret_key: SecretKey,
443    /// The UDP socket for sending and receiving messages.
444    _socket: Arc<UdpSocket>,
445    /// The spawned UDP tasks.
446    ///
447    /// Note: If dropped, the spawned send+receive tasks are aborted.
448    _tasks: JoinSet<()>,
449    /// The routing table.
450    kbuckets: KBucketsTable<NodeKey, NodeEntry>,
451    /// Receiver for incoming messages
452    ///
453    /// Receives incoming messages from the UDP task.
454    ingress: IngressReceiver,
455    /// Sender for sending outgoing messages
456    ///
457    /// Sends outgoind messages to the UDP task.
458    egress: EgressSender,
459    /// Buffered pending pings to apply backpressure.
460    ///
461    /// Lookups behave like bursts of requests: Endpoint proof followed by `FindNode` request. [Recursive lookups](https://github.com/ethereum/devp2p/blob/master/discv4.md#recursive-lookup) can trigger multiple followup Pings+FindNode requests.
462    /// A cap on concurrent `Ping` prevents escalation where: A large number of new nodes
463    /// discovered via `FindNode` in a recursive lookup triggers a large number of `Ping`s, and
464    /// followup `FindNode` requests.... Buffering them effectively prevents high `Ping` peaks.
465    queued_pings: VecDeque<(NodeRecord, PingReason)>,
466    /// Currently active pings to specific nodes.
467    pending_pings: HashMap<PeerId, PingRequest>,
468    /// Currently active endpoint proof verification lookups to specific nodes.
469    ///
470    /// Entries here means we've proven the peer's endpoint but haven't completed our end of the
471    /// endpoint proof
472    pending_lookup: HashMap<PeerId, (Instant, LookupContext)>,
473    /// Currently active `FindNode` requests
474    pending_find_nodes: HashMap<PeerId, FindNodeRequest>,
475    /// Currently active ENR requests
476    pending_enr_requests: HashMap<PeerId, EnrRequestState>,
477    /// Copy of he sender half of the commands channel for [Discv4]
478    to_service: mpsc::UnboundedSender<Discv4Command>,
479    /// Receiver half of the commands channel for [Discv4]
480    commands_rx: mpsc::UnboundedReceiver<Discv4Command>,
481    /// All subscribers for table updates
482    update_listeners: Vec<mpsc::Sender<DiscoveryUpdate>>,
483    /// The interval when to trigger random lookups
484    lookup_interval: Interval,
485    /// Used to rotate targets to lookup
486    lookup_rotator: LookupTargetRotator,
487    /// Interval when to recheck active requests
488    evict_expired_requests_interval: Interval,
489    /// Interval when to resend pings.
490    ping_interval: Interval,
491    /// The interval at which to attempt resolving external IP again.
492    resolve_external_ip_interval: Option<ResolveNatInterval>,
493    /// How this services is configured
494    config: Discv4Config,
495    /// Buffered events populated during poll.
496    queued_events: VecDeque<Discv4Event>,
497    /// Keeps track of nodes from which we have received a `Pong` message.
498    received_pongs: PongTable,
499    /// Interval used to expire additionally tracked nodes
500    expire_interval: Interval,
501}
502
503impl Discv4Service {
504    /// Create a new instance for a bound [`UdpSocket`].
505    pub(crate) fn new(
506        socket: UdpSocket,
507        local_address: SocketAddr,
508        local_node_record: NodeRecord,
509        secret_key: SecretKey,
510        config: Discv4Config,
511    ) -> Self {
512        let socket = Arc::new(socket);
513        let (ingress_tx, ingress_rx) = mpsc::channel(config.udp_ingress_message_buffer);
514        let (egress_tx, egress_rx) = mpsc::channel(config.udp_egress_message_buffer);
515        let mut tasks = JoinSet::<()>::new();
516
517        let udp = Arc::clone(&socket);
518        tasks.spawn(receive_loop(udp, ingress_tx, local_node_record.id));
519
520        let udp = Arc::clone(&socket);
521        tasks.spawn(send_loop(udp, egress_rx));
522
523        let kbuckets = KBucketsTable::new(
524            NodeKey::from(&local_node_record).into(),
525            Duration::from_secs(60),
526            MAX_NODES_PER_BUCKET,
527            None,
528            None,
529        );
530
531        let self_lookup_interval = tokio::time::interval(config.lookup_interval);
532
533        // Wait `ping_interval` and then start pinging every `ping_interval` because we want to wait
534        // for
535        let ping_interval = tokio::time::interval_at(
536            tokio::time::Instant::now() + config.ping_interval,
537            config.ping_interval,
538        );
539
540        let evict_expired_requests_interval = tokio::time::interval_at(
541            tokio::time::Instant::now() + config.request_timeout,
542            config.request_timeout,
543        );
544
545        let lookup_rotator = if config.enable_dht_random_walk {
546            LookupTargetRotator::default()
547        } else {
548            LookupTargetRotator::local_only()
549        };
550
551        // for EIP-868 construct an ENR
552        let local_eip_868_enr = {
553            let mut builder = Enr::builder();
554            builder.ip(local_node_record.address);
555            if local_node_record.address.is_ipv4() {
556                builder.udp4(local_node_record.udp_port);
557                builder.tcp4(local_node_record.tcp_port);
558            } else {
559                builder.udp6(local_node_record.udp_port);
560                builder.tcp6(local_node_record.tcp_port);
561            }
562
563            for (key, val) in &config.additional_eip868_rlp_pairs {
564                builder.add_value_rlp(key, val.clone());
565            }
566            builder.build(&secret_key).expect("v4 is set")
567        };
568
569        let (to_service, commands_rx) = mpsc::unbounded_channel();
570
571        let shared_node_record = Arc::new(Mutex::new(local_node_record));
572
573        Self {
574            local_address,
575            local_eip_868_enr,
576            local_node_record,
577            shared_node_record,
578            _socket: socket,
579            kbuckets,
580            secret_key,
581            _tasks: tasks,
582            ingress: ingress_rx,
583            egress: egress_tx,
584            queued_pings: VecDeque::with_capacity(MAX_QUEUED_PINGS),
585            pending_pings: Default::default(),
586            pending_lookup: Default::default(),
587            pending_find_nodes: Default::default(),
588            pending_enr_requests: Default::default(),
589            commands_rx,
590            to_service,
591            update_listeners: Vec::with_capacity(1),
592            lookup_interval: self_lookup_interval,
593            ping_interval,
594            evict_expired_requests_interval,
595            lookup_rotator,
596            resolve_external_ip_interval: config.resolve_external_ip_interval(),
597            config,
598            queued_events: Default::default(),
599            received_pongs: Default::default(),
600            expire_interval: tokio::time::interval(EXPIRE_DURATION),
601        }
602    }
603
604    /// Returns the frontend handle that can communicate with the service via commands.
605    pub fn handle(&self) -> Discv4 {
606        Discv4 {
607            local_addr: self.local_address,
608            to_service: self.to_service.clone(),
609            node_record: self.shared_node_record.clone(),
610        }
611    }
612
613    /// Returns the current enr sequence of the local record.
614    fn enr_seq(&self) -> Option<u64> {
615        self.config.enable_eip868.then(|| self.local_eip_868_enr.seq())
616    }
617
618    /// Sets the [Interval] used for periodically looking up targets over the network
619    pub fn set_lookup_interval(&mut self, duration: Duration) {
620        self.lookup_interval = tokio::time::interval(duration);
621    }
622
623    /// Sets the given ip address as the node's external IP in the node record announced in
624    /// discovery
625    pub fn set_external_ip_addr(&mut self, external_ip: IpAddr) {
626        if self.local_node_record.address != external_ip {
627            debug!(target: "discv4", ?external_ip, "Updating external ip");
628            self.local_node_record.address = external_ip;
629            let _ = self.local_eip_868_enr.set_ip(external_ip, &self.secret_key);
630            let mut lock = self.shared_node_record.lock();
631            *lock = self.local_node_record;
632            debug!(target: "discv4", enr=?self.local_eip_868_enr, "Updated local ENR");
633        }
634    }
635
636    /// Returns the [`PeerId`] that identifies this node
637    pub const fn local_peer_id(&self) -> &PeerId {
638        &self.local_node_record.id
639    }
640
641    /// Returns the address of the UDP socket
642    pub const fn local_addr(&self) -> SocketAddr {
643        self.local_address
644    }
645
646    /// Returns the ENR of this service.
647    ///
648    /// Note: this will include the external address if resolved.
649    pub const fn local_enr(&self) -> NodeRecord {
650        self.local_node_record
651    }
652
653    /// Returns mutable reference to ENR for testing.
654    #[cfg(test)]
655    pub const fn local_enr_mut(&mut self) -> &mut NodeRecord {
656        &mut self.local_node_record
657    }
658
659    /// Returns true if the given `PeerId` is currently in the bucket
660    pub fn contains_node(&self, id: PeerId) -> bool {
661        let key = kad_key(id);
662        self.kbuckets.get_index(&key).is_some()
663    }
664
665    /// Bootstraps the local node to join the DHT.
666    ///
667    /// Bootstrapping is a multi-step operation that starts with a lookup of the local node's
668    /// own ID in the DHT. This introduces the local node to the other nodes
669    /// in the DHT and populates its routing table with the closest proven neighbours.
670    ///
671    /// This is similar to adding all bootnodes via [`Self::add_node`], but does not fire a
672    /// [`DiscoveryUpdate::Added`] event for the given bootnodes. So boot nodes don't appear in the
673    /// update stream, which is usually desirable, since bootnodes should not be connected to.
674    ///
675    /// If adding the configured bootnodes should result in a [`DiscoveryUpdate::Added`], see
676    /// [`Self::add_all_nodes`].
677    ///
678    /// **Note:** This is a noop if there are no bootnodes.
679    pub fn bootstrap(&mut self) {
680        for record in self.config.bootstrap_nodes.clone() {
681            debug!(target: "discv4", ?record, "pinging boot node");
682            let key = kad_key(record.id);
683            let entry = NodeEntry::new(record);
684
685            // insert the boot node in the table
686            match self.kbuckets.insert_or_update(
687                &key,
688                entry,
689                NodeStatus {
690                    state: ConnectionState::Disconnected,
691                    direction: ConnectionDirection::Outgoing,
692                },
693            ) {
694                InsertResult::Failed(_) => {}
695                _ => {
696                    self.try_ping(record, PingReason::InitialInsert);
697                }
698            }
699        }
700    }
701
702    /// Spawns this services onto a new task
703    ///
704    /// Note: requires a running tokio runtime
705    pub fn spawn(mut self) -> JoinHandle<()> {
706        tokio::task::spawn(async move {
707            self.bootstrap();
708
709            while let Some(event) = self.next().await {
710                trace!(target: "discv4", ?event, "processed");
711            }
712            trace!(target: "discv4", "service terminated");
713        })
714    }
715
716    /// Creates a new bounded channel for [`DiscoveryUpdate`]s.
717    pub fn update_stream(&mut self) -> ReceiverStream<DiscoveryUpdate> {
718        let (tx, rx) = mpsc::channel(512);
719        self.update_listeners.push(tx);
720        ReceiverStream::new(rx)
721    }
722
723    /// Looks up the local node in the DHT.
724    pub fn lookup_self(&mut self) {
725        self.lookup(self.local_node_record.id)
726    }
727
728    /// Looks up the given node in the DHT
729    ///
730    /// A `FindNode` packet requests information about nodes close to target. The target is a
731    /// 64-byte secp256k1 public key. When `FindNode` is received, the recipient should reply
732    /// with Neighbors packets containing the closest 16 nodes to target found in its local
733    /// table.
734    //
735    // To guard against traffic amplification attacks, Neighbors replies should only be sent if the
736    // sender of FindNode has been verified by the endpoint proof procedure.
737    pub fn lookup(&mut self, target: PeerId) {
738        self.lookup_with(target, None)
739    }
740
741    /// Starts the recursive lookup process for the given target, <https://github.com/ethereum/devp2p/blob/master/discv4.md#recursive-lookup>.
742    ///
743    /// At first the `ALPHA` (==3, defined concurrency factor) nodes that are closest to the target
744    /// in the underlying DHT are selected to seed the lookup via `FindNode` requests. In the
745    /// recursive step, the initiator resends `FindNode` to nodes it has learned about from previous
746    /// queries.
747    ///
748    /// This takes an optional Sender through which all successfully discovered nodes are sent once
749    /// the request has finished.
750    fn lookup_with(&mut self, target: PeerId, tx: Option<NodeRecordSender>) {
751        trace!(target: "discv4", ?target, "Starting lookup");
752        let target_key = kad_key(target);
753
754        // Start a lookup context with the 16 (MAX_NODES_PER_BUCKET) closest nodes to which we have
755        // a valid endpoint proof
756        let ctx = LookupContext::new(
757            target_key.clone(),
758            self.kbuckets
759                .closest_values(&target_key)
760                .filter(|node| {
761                    node.value.has_endpoint_proof &&
762                        !self.pending_find_nodes.contains_key(&node.key.preimage().0)
763                })
764                .take(MAX_NODES_PER_BUCKET)
765                .map(|n| (target_key.distance(&n.key), n.value.record)),
766            tx,
767        );
768
769        // From those 16, pick the 3 closest to start the concurrent lookup.
770        let closest = ctx.closest(ALPHA);
771
772        if closest.is_empty() && self.pending_find_nodes.is_empty() {
773            // no closest nodes, and no lookup in progress: table is empty.
774            // This could happen if all records were deleted from the table due to missed pongs
775            // (e.g. connectivity problems over a long period of time, or issues during initial
776            // bootstrapping) so we attempt to bootstrap again
777            self.bootstrap();
778            return
779        }
780
781        trace!(target: "discv4", ?target, num = closest.len(), "Start lookup closest nodes");
782
783        for node in closest {
784            // here we still want to check against previous request failures and if necessary
785            // re-establish a new endpoint proof because it can be the case that the other node lost
786            // our entry and no longer has an endpoint proof on their end
787            self.find_node_checked(&node, ctx.clone());
788        }
789    }
790
791    /// Sends a new `FindNode` packet to the node with `target` as the lookup target.
792    ///
793    /// CAUTION: This expects there's a valid Endpoint proof to the given `node`.
794    fn find_node(&mut self, node: &NodeRecord, ctx: LookupContext) {
795        trace!(target: "discv4", ?node, lookup=?ctx.target(), "Sending FindNode");
796        ctx.mark_queried(node.id);
797        let id = ctx.target();
798        let msg = Message::FindNode(FindNode { id, expire: self.find_node_expiration() });
799        self.send_packet(msg, node.udp_addr());
800        self.pending_find_nodes.insert(node.id, FindNodeRequest::new(ctx));
801    }
802
803    /// Sends a new `FindNode` packet to the node with `target` as the lookup target but checks
804    /// whether we should send a new ping first to renew the endpoint proof by checking the
805    /// previously failed findNode requests. It could be that the node is no longer reachable or
806    /// lost our entry.
807    fn find_node_checked(&mut self, node: &NodeRecord, ctx: LookupContext) {
808        let max_failures = self.config.max_find_node_failures;
809        let needs_ping = self
810            .on_entry(node.id, |entry| entry.exceeds_find_node_failures(max_failures))
811            .unwrap_or(true);
812        if needs_ping {
813            self.try_ping(*node, PingReason::Lookup(*node, ctx))
814        } else {
815            self.find_node(node, ctx)
816        }
817    }
818
819    /// Notifies all listeners.
820    ///
821    /// Removes all listeners that are closed.
822    fn notify(&mut self, update: DiscoveryUpdate) {
823        self.update_listeners.retain_mut(|listener| match listener.try_send(update.clone()) {
824            Ok(()) => true,
825            Err(err) => match err {
826                TrySendError::Full(_) => true,
827                TrySendError::Closed(_) => false,
828            },
829        });
830    }
831
832    /// Adds the ip to the ban list indefinitely
833    pub fn ban_ip(&mut self, ip: IpAddr) {
834        self.config.ban_list.ban_ip(ip);
835    }
836
837    /// Adds the peer to the ban list indefinitely.
838    pub fn ban_node(&mut self, node_id: PeerId) {
839        self.remove_node(node_id);
840        self.config.ban_list.ban_peer(node_id);
841    }
842
843    /// Adds the ip to the ban list until the given timestamp.
844    pub fn ban_ip_until(&mut self, ip: IpAddr, until: Instant) {
845        self.config.ban_list.ban_ip_until(ip, until);
846    }
847
848    /// Adds the peer to the ban list and bans it until the given timestamp
849    pub fn ban_node_until(&mut self, node_id: PeerId, until: Instant) {
850        self.remove_node(node_id);
851        self.config.ban_list.ban_peer_until(node_id, until);
852    }
853
854    /// Removes a `node_id` from the routing table.
855    ///
856    /// This allows applications, for whatever reason, to remove nodes from the local routing
857    /// table. Returns `true` if the node was in the table and `false` otherwise.
858    pub fn remove_node(&mut self, node_id: PeerId) -> bool {
859        let key = kad_key(node_id);
860        self.remove_key(node_id, key)
861    }
862
863    /// Removes a `node_id` from the routing table but only if there are enough other nodes in the
864    /// bucket (bucket must be at least half full)
865    ///
866    /// Returns `true` if the node was removed
867    pub fn soft_remove_node(&mut self, node_id: PeerId) -> bool {
868        let key = kad_key(node_id);
869        let Some(bucket) = self.kbuckets.get_bucket(&key) else { return false };
870        if bucket.num_entries() < MAX_NODES_PER_BUCKET / 2 {
871            // skip half empty bucket
872            return false
873        }
874        self.remove_key(node_id, key)
875    }
876
877    fn remove_key(&mut self, node_id: PeerId, key: discv5::Key<NodeKey>) -> bool {
878        let removed = self.kbuckets.remove(&key);
879        if removed {
880            trace!(target: "discv4", ?node_id, "removed node");
881            self.notify(DiscoveryUpdate::Removed(node_id));
882        }
883        removed
884    }
885
886    /// Gets the number of entries that are considered connected.
887    pub fn num_connected(&self) -> usize {
888        self.kbuckets.buckets_iter().fold(0, |count, bucket| count + bucket.num_connected())
889    }
890
891    /// Check if the peer has an active bond.
892    fn has_bond(&self, remote_id: PeerId, remote_ip: IpAddr) -> bool {
893        if let Some(timestamp) = self.received_pongs.last_pong(remote_id, remote_ip) {
894            if timestamp.elapsed() < self.config.bond_expiration {
895                return true
896            }
897        }
898        false
899    }
900
901    /// Applies a closure on the pending or present [`NodeEntry`].
902    fn on_entry<F, R>(&mut self, peer_id: PeerId, f: F) -> Option<R>
903    where
904        F: FnOnce(&NodeEntry) -> R,
905    {
906        let key = kad_key(peer_id);
907        match self.kbuckets.entry(&key) {
908            BucketEntry::Present(entry, _) => Some(f(entry.value())),
909            BucketEntry::Pending(mut entry, _) => Some(f(entry.value())),
910            _ => None,
911        }
912    }
913
914    /// Update the entry on RE-ping.
915    ///
916    /// Invoked when we received the Pong to our [`PingReason::RePing`] ping.
917    ///
918    /// On re-ping we check for a changed `enr_seq` if eip868 is enabled and when it changed we sent
919    /// a followup request to retrieve the updated ENR
920    fn update_on_reping(&mut self, record: NodeRecord, mut last_enr_seq: Option<u64>) {
921        if record.id == self.local_node_record.id {
922            return
923        }
924
925        // If EIP868 extension is disabled then we want to ignore this
926        if !self.config.enable_eip868 {
927            last_enr_seq = None;
928        }
929
930        let key = kad_key(record.id);
931        let old_enr = match self.kbuckets.entry(&key) {
932            kbucket::Entry::Present(mut entry, _) => {
933                entry.value_mut().update_with_enr(last_enr_seq)
934            }
935            kbucket::Entry::Pending(mut entry, _) => entry.value().update_with_enr(last_enr_seq),
936            _ => return,
937        };
938
939        // Check if ENR was updated
940        match (last_enr_seq, old_enr) {
941            (Some(new), Some(old)) => {
942                if new > old {
943                    self.send_enr_request(record);
944                }
945            }
946            (Some(_), None) => {
947                // got an ENR
948                self.send_enr_request(record);
949            }
950            _ => {}
951        };
952    }
953
954    /// Callback invoked when we receive a pong from the peer.
955    fn update_on_pong(&mut self, record: NodeRecord, mut last_enr_seq: Option<u64>) {
956        if record.id == *self.local_peer_id() {
957            return
958        }
959
960        // If EIP868 extension is disabled then we want to ignore this
961        if !self.config.enable_eip868 {
962            last_enr_seq = None;
963        }
964
965        // if the peer included a enr seq in the pong then we can try to request the ENR of that
966        // node
967        let has_enr_seq = last_enr_seq.is_some();
968
969        let key = kad_key(record.id);
970        match self.kbuckets.entry(&key) {
971            kbucket::Entry::Present(mut entry, old_status) => {
972                // endpoint is now proven
973                entry.value_mut().establish_proof();
974                entry.value_mut().update_with_enr(last_enr_seq);
975
976                if !old_status.is_connected() {
977                    let _ = entry.update(ConnectionState::Connected, Some(old_status.direction));
978                    trace!(target: "discv4", ?record, "added after successful endpoint proof");
979                    self.notify(DiscoveryUpdate::Added(record));
980
981                    if has_enr_seq {
982                        // request the ENR of the node
983                        self.send_enr_request(record);
984                    }
985                }
986            }
987            kbucket::Entry::Pending(mut entry, mut status) => {
988                // endpoint is now proven
989                entry.value().establish_proof();
990                entry.value().update_with_enr(last_enr_seq);
991
992                if !status.is_connected() {
993                    status.state = ConnectionState::Connected;
994                    let _ = entry.update(status);
995                    trace!(target: "discv4", ?record, "added after successful endpoint proof");
996                    self.notify(DiscoveryUpdate::Added(record));
997
998                    if has_enr_seq {
999                        // request the ENR of the node
1000                        self.send_enr_request(record);
1001                    }
1002                }
1003            }
1004            _ => {}
1005        };
1006    }
1007
1008    /// Adds all nodes
1009    ///
1010    /// See [`Self::add_node`]
1011    pub fn add_all_nodes(&mut self, records: impl IntoIterator<Item = NodeRecord>) {
1012        for record in records {
1013            self.add_node(record);
1014        }
1015    }
1016
1017    /// If the node's not in the table yet, this will add it to the table and start the endpoint
1018    /// proof by sending a ping to the node.
1019    ///
1020    /// Returns `true` if the record was added successfully, and `false` if the node is either
1021    /// already in the table or the record's bucket is full.
1022    pub fn add_node(&mut self, record: NodeRecord) -> bool {
1023        let key = kad_key(record.id);
1024        match self.kbuckets.entry(&key) {
1025            kbucket::Entry::Absent(entry) => {
1026                let node = NodeEntry::new(record);
1027                match entry.insert(
1028                    node,
1029                    NodeStatus {
1030                        direction: ConnectionDirection::Outgoing,
1031                        state: ConnectionState::Disconnected,
1032                    },
1033                ) {
1034                    BucketInsertResult::Inserted | BucketInsertResult::Pending { .. } => {
1035                        trace!(target: "discv4", ?record, "inserted new record");
1036                    }
1037                    _ => return false,
1038                }
1039            }
1040            _ => return false,
1041        }
1042
1043        // send the initial ping to the _new_ node
1044        self.try_ping(record, PingReason::InitialInsert);
1045        true
1046    }
1047
1048    /// Encodes the packet, sends it and returns the hash.
1049    pub(crate) fn send_packet(&self, msg: Message, to: SocketAddr) -> B256 {
1050        let (payload, hash) = msg.encode(&self.secret_key);
1051        trace!(target: "discv4", r#type=?msg.msg_type(), ?to, ?hash, "sending packet");
1052        let _ = self.egress.try_send((payload, to)).map_err(|err| {
1053            debug!(
1054                target: "discv4",
1055                %err,
1056                "dropped outgoing packet",
1057            );
1058        });
1059        hash
1060    }
1061
1062    /// Message handler for an incoming `Ping`
1063    fn on_ping(&mut self, ping: Ping, remote_addr: SocketAddr, remote_id: PeerId, hash: B256) {
1064        if self.is_expired(ping.expire) {
1065            // ping's expiration timestamp is in the past
1066            return
1067        }
1068
1069        // create the record
1070        let record = NodeRecord {
1071            address: remote_addr.ip(),
1072            udp_port: remote_addr.port(),
1073            tcp_port: ping.from.tcp_port,
1074            id: remote_id,
1075        }
1076        .into_ipv4_mapped();
1077
1078        let key = kad_key(record.id);
1079
1080        // See also <https://github.com/ethereum/devp2p/blob/master/discv4.md#ping-packet-0x01>:
1081        // > If no communication with the sender of this ping has occurred within the last 12h, a
1082        // > ping should be sent in addition to pong in order to receive an endpoint proof.
1083        //
1084        // Note: we only mark if the node is absent because the `last 12h` condition is handled by
1085        // the ping interval
1086        let mut is_new_insert = false;
1087        let mut needs_bond = false;
1088        let mut is_proven = false;
1089
1090        let old_enr = match self.kbuckets.entry(&key) {
1091            kbucket::Entry::Present(mut entry, _) => {
1092                if entry.value().is_expired() {
1093                    // If no communication with the sender has occurred within the last 12h, a ping
1094                    // should be sent in addition to pong in order to receive an endpoint proof.
1095                    needs_bond = true;
1096                } else {
1097                    is_proven = entry.value().has_endpoint_proof;
1098                }
1099                entry.value_mut().update_with_enr(ping.enr_sq)
1100            }
1101            kbucket::Entry::Pending(mut entry, _) => {
1102                if entry.value().is_expired() {
1103                    // If no communication with the sender has occurred within the last 12h, a ping
1104                    // should be sent in addition to pong in order to receive an endpoint proof.
1105                    needs_bond = true;
1106                } else {
1107                    is_proven = entry.value().has_endpoint_proof;
1108                }
1109                entry.value().update_with_enr(ping.enr_sq)
1110            }
1111            kbucket::Entry::Absent(entry) => {
1112                let mut node = NodeEntry::new(record);
1113                node.last_enr_seq = ping.enr_sq;
1114
1115                match entry.insert(
1116                    node,
1117                    NodeStatus {
1118                        direction: ConnectionDirection::Incoming,
1119                        // mark as disconnected until endpoint proof established on pong
1120                        state: ConnectionState::Disconnected,
1121                    },
1122                ) {
1123                    BucketInsertResult::Inserted | BucketInsertResult::Pending { .. } => {
1124                        // mark as new insert if insert was successful
1125                        is_new_insert = true;
1126                    }
1127                    BucketInsertResult::Full => {
1128                        // we received a ping but the corresponding bucket for the peer is already
1129                        // full, we can't add any additional peers to that bucket, but we still want
1130                        // to emit an event that we discovered the node
1131                        trace!(target: "discv4", ?record, "discovered new record but bucket is full");
1132                        self.notify(DiscoveryUpdate::DiscoveredAtCapacity(record));
1133                        needs_bond = true;
1134                    }
1135                    BucketInsertResult::TooManyIncoming | BucketInsertResult::NodeExists => {
1136                        needs_bond = true;
1137                        // insert unsuccessful but we still want to send the pong
1138                    }
1139                    BucketInsertResult::FailedFilter => return,
1140                }
1141
1142                None
1143            }
1144            kbucket::Entry::SelfEntry => return,
1145        };
1146
1147        // send the pong first, but the PONG and optionally PING don't need to be send in a
1148        // particular order
1149        let pong = Message::Pong(Pong {
1150            // we use the actual address of the peer
1151            to: record.into(),
1152            echo: hash,
1153            expire: ping.expire,
1154            enr_sq: self.enr_seq(),
1155        });
1156        self.send_packet(pong, remote_addr);
1157
1158        // if node was absent also send a ping to establish the endpoint proof from our end
1159        if is_new_insert {
1160            self.try_ping(record, PingReason::InitialInsert);
1161        } else if needs_bond {
1162            self.try_ping(record, PingReason::EstablishBond);
1163        } else if is_proven {
1164            // if node has been proven, this means we've received a pong and verified its endpoint
1165            // proof. We've also sent a pong above to verify our endpoint proof, so we can now
1166            // send our find_nodes request if PingReason::Lookup
1167            if let Some((_, ctx)) = self.pending_lookup.remove(&record.id) {
1168                if self.pending_find_nodes.contains_key(&record.id) {
1169                    // there's already another pending request, unmark it so the next round can
1170                    // try to send it
1171                    ctx.unmark_queried(record.id);
1172                } else {
1173                    // we just received a ping from that peer so we can send a find node request
1174                    // directly
1175                    self.find_node(&record, ctx);
1176                }
1177            }
1178        } else {
1179            // Request ENR if included in the ping
1180            match (ping.enr_sq, old_enr) {
1181                (Some(new), Some(old)) => {
1182                    if new > old {
1183                        self.send_enr_request(record);
1184                    }
1185                }
1186                (Some(_), None) => {
1187                    self.send_enr_request(record);
1188                }
1189                _ => {}
1190            };
1191        }
1192    }
1193
1194    // Guarding function for [`Self::send_ping`] that applies pre-checks
1195    fn try_ping(&mut self, node: NodeRecord, reason: PingReason) {
1196        if node.id == *self.local_peer_id() {
1197            // don't ping ourselves
1198            return
1199        }
1200
1201        if self.pending_pings.contains_key(&node.id) ||
1202            self.pending_find_nodes.contains_key(&node.id)
1203        {
1204            return
1205        }
1206
1207        if self.queued_pings.iter().any(|(n, _)| n.id == node.id) {
1208            return
1209        }
1210
1211        if self.pending_pings.len() < MAX_NODES_PING {
1212            self.send_ping(node, reason);
1213        } else if self.queued_pings.len() < MAX_QUEUED_PINGS {
1214            self.queued_pings.push_back((node, reason));
1215        }
1216    }
1217
1218    /// Sends a ping message to the node's UDP address.
1219    ///
1220    /// Returns the echo hash of the ping message.
1221    pub(crate) fn send_ping(&mut self, node: NodeRecord, reason: PingReason) -> B256 {
1222        let remote_addr = node.udp_addr();
1223        let id = node.id;
1224        let ping = Ping {
1225            from: self.local_node_record.into(),
1226            to: node.into(),
1227            expire: self.ping_expiration(),
1228            enr_sq: self.enr_seq(),
1229        };
1230        trace!(target: "discv4", ?ping, "sending ping");
1231        let echo_hash = self.send_packet(Message::Ping(ping), remote_addr);
1232
1233        self.pending_pings
1234            .insert(id, PingRequest { sent_at: Instant::now(), node, echo_hash, reason });
1235        echo_hash
1236    }
1237
1238    /// Sends an enr request message to the node's UDP address.
1239    ///
1240    /// Returns the echo hash of the ping message.
1241    pub(crate) fn send_enr_request(&mut self, node: NodeRecord) {
1242        if !self.config.enable_eip868 {
1243            return
1244        }
1245        let remote_addr = node.udp_addr();
1246        let enr_request = EnrRequest { expire: self.enr_request_expiration() };
1247
1248        trace!(target: "discv4", ?enr_request, "sending enr request");
1249        let echo_hash = self.send_packet(Message::EnrRequest(enr_request), remote_addr);
1250
1251        self.pending_enr_requests
1252            .insert(node.id, EnrRequestState { sent_at: Instant::now(), echo_hash });
1253    }
1254
1255    /// Message handler for an incoming `Pong`.
1256    fn on_pong(&mut self, pong: Pong, remote_addr: SocketAddr, remote_id: PeerId) {
1257        if self.is_expired(pong.expire) {
1258            return
1259        }
1260
1261        let PingRequest { node, reason, .. } = match self.pending_pings.entry(remote_id) {
1262            Entry::Occupied(entry) => {
1263                {
1264                    let request = entry.get();
1265                    if request.echo_hash != pong.echo {
1266                        trace!(target: "discv4", from=?remote_addr, expected=?request.echo_hash, echo_hash=?pong.echo,"Got unexpected Pong");
1267                        return
1268                    }
1269                }
1270                entry.remove()
1271            }
1272            Entry::Vacant(_) => return,
1273        };
1274
1275        // keep track of the pong
1276        self.received_pongs.on_pong(remote_id, remote_addr.ip());
1277
1278        match reason {
1279            PingReason::InitialInsert => {
1280                self.update_on_pong(node, pong.enr_sq);
1281            }
1282            PingReason::EstablishBond => {
1283                // same as `InitialInsert` which renews the bond if the peer is in the table
1284                self.update_on_pong(node, pong.enr_sq);
1285            }
1286            PingReason::RePing => {
1287                self.update_on_reping(node, pong.enr_sq);
1288            }
1289            PingReason::Lookup(node, ctx) => {
1290                self.update_on_pong(node, pong.enr_sq);
1291                // insert node and assoc. lookup_context into the pending_lookup table to complete
1292                // our side of the endpoint proof verification.
1293                // Start the lookup timer here - and evict accordingly. Note that this is a separate
1294                // timer than the ping_request timer.
1295                self.pending_lookup.insert(node.id, (Instant::now(), ctx));
1296            }
1297        }
1298    }
1299
1300    /// Handler for an incoming `FindNode` message
1301    fn on_find_node(&mut self, msg: FindNode, remote_addr: SocketAddr, node_id: PeerId) {
1302        if self.is_expired(msg.expire) {
1303            // expiration timestamp is in the past
1304            return
1305        }
1306        if node_id == *self.local_peer_id() {
1307            // ignore find node requests to ourselves
1308            return
1309        }
1310
1311        if self.has_bond(node_id, remote_addr.ip()) {
1312            self.respond_closest(msg.id, remote_addr)
1313        }
1314    }
1315
1316    /// Handler for incoming `EnrResponse` message
1317    fn on_enr_response(&mut self, msg: EnrResponse, remote_addr: SocketAddr, id: PeerId) {
1318        trace!(target: "discv4", ?remote_addr, ?msg, "received ENR response");
1319        if let Some(resp) = self.pending_enr_requests.remove(&id) {
1320            // ensure the ENR's public key matches the expected node id
1321            let enr_id = pk2id(&msg.enr.public_key());
1322            if id != enr_id {
1323                return
1324            }
1325
1326            if resp.echo_hash == msg.request_hash {
1327                let key = kad_key(id);
1328                let fork_id = msg.eth_fork_id();
1329                let (record, old_fork_id) = match self.kbuckets.entry(&key) {
1330                    kbucket::Entry::Present(mut entry, _) => {
1331                        let id = entry.value_mut().update_with_fork_id(fork_id);
1332                        (entry.value().record, id)
1333                    }
1334                    kbucket::Entry::Pending(mut entry, _) => {
1335                        let id = entry.value().update_with_fork_id(fork_id);
1336                        (entry.value().record, id)
1337                    }
1338                    _ => return,
1339                };
1340                match (fork_id, old_fork_id) {
1341                    (Some(new), Some(old)) => {
1342                        if new != old {
1343                            self.notify(DiscoveryUpdate::EnrForkId(record, new))
1344                        }
1345                    }
1346                    (Some(new), None) => self.notify(DiscoveryUpdate::EnrForkId(record, new)),
1347                    _ => {}
1348                }
1349            }
1350        }
1351    }
1352
1353    /// Handler for incoming `EnrRequest` message
1354    fn on_enr_request(
1355        &self,
1356        msg: EnrRequest,
1357        remote_addr: SocketAddr,
1358        id: PeerId,
1359        request_hash: B256,
1360    ) {
1361        if !self.config.enable_eip868 || self.is_expired(msg.expire) {
1362            return
1363        }
1364
1365        if self.has_bond(id, remote_addr.ip()) {
1366            self.send_packet(
1367                Message::EnrResponse(EnrResponse {
1368                    request_hash,
1369                    enr: self.local_eip_868_enr.clone(),
1370                }),
1371                remote_addr,
1372            );
1373        }
1374    }
1375
1376    /// Handler for incoming `Neighbours` messages that are handled if they're responses to
1377    /// `FindNode` requests.
1378    fn on_neighbours(&mut self, msg: Neighbours, remote_addr: SocketAddr, node_id: PeerId) {
1379        if self.is_expired(msg.expire) {
1380            // response is expired
1381            return
1382        }
1383        // check if this request was expected
1384        let ctx = match self.pending_find_nodes.entry(node_id) {
1385            Entry::Occupied(mut entry) => {
1386                {
1387                    let request = entry.get_mut();
1388                    // Mark the request as answered
1389                    request.answered = true;
1390                    let total = request.response_count + msg.nodes.len();
1391
1392                    // Neighbours response is exactly 1 bucket (16 entries).
1393                    if total <= MAX_NODES_PER_BUCKET {
1394                        request.response_count = total;
1395                    } else {
1396                        trace!(target: "discv4", total, from=?remote_addr, "Received neighbors packet entries exceeds max nodes per bucket");
1397                        return
1398                    }
1399                };
1400
1401                if entry.get().response_count == MAX_NODES_PER_BUCKET {
1402                    // node responding with a full bucket of records
1403                    let ctx = entry.remove().lookup_context;
1404                    ctx.mark_responded(node_id);
1405                    ctx
1406                } else {
1407                    entry.get().lookup_context.clone()
1408                }
1409            }
1410            Entry::Vacant(_) => {
1411                // received neighbours response without requesting it
1412                trace!(target: "discv4", from=?remote_addr, "Received unsolicited Neighbours");
1413                return
1414            }
1415        };
1416
1417        // log the peers we discovered
1418        trace!(target: "discv4",
1419            target=format!("{:#?}", node_id),
1420            peers_count=msg.nodes.len(),
1421            peers=format!("[{:#}]", msg.nodes.iter()
1422                .map(|node_rec| node_rec.id
1423            ).format(", ")),
1424            "Received peers from Neighbours packet"
1425        );
1426
1427        // This is the recursive lookup step where we initiate new FindNode requests for new nodes
1428        // that were discovered.
1429        for node in msg.nodes.into_iter().map(NodeRecord::into_ipv4_mapped) {
1430            // prevent banned peers from being added to the context
1431            if self.config.ban_list.is_banned(&node.id, &node.address) {
1432                trace!(target: "discv4", peer_id=?node.id, ip=?node.address, "ignoring banned record");
1433                continue
1434            }
1435
1436            ctx.add_node(node);
1437        }
1438
1439        // get the next closest nodes, not yet queried nodes and start over.
1440        let closest =
1441            ctx.filter_closest(ALPHA, |node| !self.pending_find_nodes.contains_key(&node.id));
1442
1443        for closest in closest {
1444            let key = kad_key(closest.id);
1445            match self.kbuckets.entry(&key) {
1446                BucketEntry::Absent(entry) => {
1447                    // the node's endpoint is not proven yet, so we need to ping it first, on
1448                    // success, we will add the node to the pending_lookup table, and wait to send
1449                    // back a Pong before initiating a FindNode request.
1450                    // In order to prevent that this node is selected again on subsequent responses,
1451                    // while the ping is still active, we always mark it as queried.
1452                    ctx.mark_queried(closest.id);
1453                    let node = NodeEntry::new(closest);
1454                    match entry.insert(
1455                        node,
1456                        NodeStatus {
1457                            direction: ConnectionDirection::Outgoing,
1458                            state: ConnectionState::Disconnected,
1459                        },
1460                    ) {
1461                        BucketInsertResult::Inserted | BucketInsertResult::Pending { .. } => {
1462                            // only ping if the node was added to the table
1463                            self.try_ping(closest, PingReason::Lookup(closest, ctx.clone()))
1464                        }
1465                        BucketInsertResult::Full => {
1466                            // new node but the node's bucket is already full
1467                            self.notify(DiscoveryUpdate::DiscoveredAtCapacity(closest))
1468                        }
1469                        _ => {}
1470                    }
1471                }
1472                BucketEntry::SelfEntry => {
1473                    // we received our own node entry
1474                }
1475                BucketEntry::Present(entry, _) => {
1476                    if entry.value().has_endpoint_proof {
1477                        if entry
1478                            .value()
1479                            .exceeds_find_node_failures(self.config.max_find_node_failures)
1480                        {
1481                            self.try_ping(closest, PingReason::Lookup(closest, ctx.clone()))
1482                        } else {
1483                            self.find_node(&closest, ctx.clone());
1484                        }
1485                    }
1486                }
1487                BucketEntry::Pending(mut entry, _) => {
1488                    if entry.value().has_endpoint_proof {
1489                        if entry
1490                            .value()
1491                            .exceeds_find_node_failures(self.config.max_find_node_failures)
1492                        {
1493                            self.try_ping(closest, PingReason::Lookup(closest, ctx.clone()))
1494                        } else {
1495                            self.find_node(&closest, ctx.clone());
1496                        }
1497                    }
1498                }
1499            }
1500        }
1501    }
1502
1503    /// Sends a Neighbours packet for `target` to the given addr
1504    fn respond_closest(&mut self, target: PeerId, to: SocketAddr) {
1505        let key = kad_key(target);
1506        let expire = self.send_neighbours_expiration();
1507
1508        // get the MAX_NODES_PER_BUCKET closest nodes to the target
1509        let closest_nodes =
1510            self.kbuckets.closest_values(&key).take(MAX_NODES_PER_BUCKET).collect::<Vec<_>>();
1511
1512        for nodes in closest_nodes.chunks(SAFE_MAX_DATAGRAM_NEIGHBOUR_RECORDS) {
1513            let nodes = nodes.iter().map(|node| node.value.record).collect::<Vec<NodeRecord>>();
1514            trace!(target: "discv4", len = nodes.len(), to=?to,"Sent neighbours packet");
1515            let msg = Message::Neighbours(Neighbours { nodes, expire });
1516            self.send_packet(msg, to);
1517        }
1518    }
1519
1520    fn evict_expired_requests(&mut self, now: Instant) {
1521        self.pending_enr_requests.retain(|_node_id, enr_request| {
1522            now.duration_since(enr_request.sent_at) < self.config.enr_expiration
1523        });
1524
1525        let mut failed_pings = Vec::new();
1526        self.pending_pings.retain(|node_id, ping_request| {
1527            if now.duration_since(ping_request.sent_at) > self.config.ping_expiration {
1528                failed_pings.push(*node_id);
1529                return false
1530            }
1531            true
1532        });
1533
1534        if !failed_pings.is_empty() {
1535            // remove nodes that failed to pong
1536            trace!(target: "discv4", num=%failed_pings.len(), "evicting nodes due to failed pong");
1537            for node_id in failed_pings {
1538                self.remove_node(node_id);
1539            }
1540        }
1541
1542        let mut failed_lookups = Vec::new();
1543        self.pending_lookup.retain(|node_id, (lookup_sent_at, _)| {
1544            if now.duration_since(*lookup_sent_at) > self.config.request_timeout {
1545                failed_lookups.push(*node_id);
1546                return false
1547            }
1548            true
1549        });
1550
1551        if !failed_lookups.is_empty() {
1552            // remove nodes that failed the e2e lookup process, so we can restart it
1553            trace!(target: "discv4", num=%failed_lookups.len(), "evicting nodes due to failed lookup");
1554            for node_id in failed_lookups {
1555                self.remove_node(node_id);
1556            }
1557        }
1558
1559        self.evict_failed_find_nodes(now);
1560    }
1561
1562    /// Handles failed responses to `FindNode`
1563    fn evict_failed_find_nodes(&mut self, now: Instant) {
1564        let mut failed_find_nodes = Vec::new();
1565        self.pending_find_nodes.retain(|node_id, find_node_request| {
1566            if now.duration_since(find_node_request.sent_at) > self.config.neighbours_expiration {
1567                if !find_node_request.answered {
1568                    // node actually responded but with fewer entries than expected, but we don't
1569                    // treat this as an hard error since it responded.
1570                    failed_find_nodes.push(*node_id);
1571                }
1572                return false
1573            }
1574            true
1575        });
1576
1577        if failed_find_nodes.is_empty() {
1578            return
1579        }
1580
1581        trace!(target: "discv4", num=%failed_find_nodes.len(), "processing failed find nodes");
1582
1583        for node_id in failed_find_nodes {
1584            let key = kad_key(node_id);
1585            let failures = match self.kbuckets.entry(&key) {
1586                kbucket::Entry::Present(mut entry, _) => {
1587                    entry.value_mut().inc_failed_request();
1588                    entry.value().find_node_failures
1589                }
1590                kbucket::Entry::Pending(mut entry, _) => {
1591                    entry.value().inc_failed_request();
1592                    entry.value().find_node_failures
1593                }
1594                _ => continue,
1595            };
1596
1597            // if the node failed to respond anything useful multiple times, remove the node from
1598            // the table, but only if there are enough other nodes in the bucket (bucket must be at
1599            // least half full)
1600            if failures > self.config.max_find_node_failures {
1601                self.soft_remove_node(node_id);
1602            }
1603        }
1604    }
1605
1606    /// Re-pings all nodes which endpoint proofs are considered expired: [`NodeEntry::is_expired`]
1607    ///
1608    /// This will send a `Ping` to the nodes, if a node fails to respond with a `Pong` to renew the
1609    /// endpoint proof it will be removed from the table.
1610    fn re_ping_oldest(&mut self) {
1611        let mut nodes = self
1612            .kbuckets
1613            .iter_ref()
1614            .filter(|entry| entry.node.value.is_expired())
1615            .map(|n| n.node.value)
1616            .collect::<Vec<_>>();
1617        nodes.sort_by(|a, b| a.last_seen.cmp(&b.last_seen));
1618        let to_ping = nodes.into_iter().map(|n| n.record).take(MAX_NODES_PING).collect::<Vec<_>>();
1619        for node in to_ping {
1620            self.try_ping(node, PingReason::RePing)
1621        }
1622    }
1623
1624    /// Returns true if the expiration timestamp is in the past.
1625    fn is_expired(&self, expiration: u64) -> bool {
1626        self.ensure_not_expired(expiration).is_err()
1627    }
1628
1629    /// Validate that given timestamp is not expired.
1630    ///
1631    /// Note: this accepts the timestamp as u64 because this is used by the wire protocol, but the
1632    /// UNIX timestamp (number of non-leap seconds since January 1, 1970 0:00:00 UTC) is supposed to
1633    /// be an i64.
1634    ///
1635    /// Returns an error if:
1636    ///  - invalid UNIX timestamp (larger than `i64::MAX`)
1637    ///  - timestamp is expired (lower than current local UNIX timestamp)
1638    fn ensure_not_expired(&self, timestamp: u64) -> Result<(), ()> {
1639        // ensure the timestamp is a valid UNIX timestamp
1640        let _ = i64::try_from(timestamp).map_err(drop)?;
1641
1642        let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_secs();
1643        if self.config.enforce_expiration_timestamps && timestamp < now {
1644            trace!(target: "discv4", "Expired packet");
1645            return Err(())
1646        }
1647        Ok(())
1648    }
1649
1650    /// Pops buffered ping requests and sends them.
1651    fn ping_buffered(&mut self) {
1652        while self.pending_pings.len() < MAX_NODES_PING {
1653            match self.queued_pings.pop_front() {
1654                Some((next, reason)) => self.try_ping(next, reason),
1655                None => break,
1656            }
1657        }
1658    }
1659
1660    fn ping_expiration(&self) -> u64 {
1661        (SystemTime::now().duration_since(UNIX_EPOCH).unwrap() + self.config.ping_expiration)
1662            .as_secs()
1663    }
1664
1665    fn find_node_expiration(&self) -> u64 {
1666        (SystemTime::now().duration_since(UNIX_EPOCH).unwrap() + self.config.request_timeout)
1667            .as_secs()
1668    }
1669
1670    fn enr_request_expiration(&self) -> u64 {
1671        (SystemTime::now().duration_since(UNIX_EPOCH).unwrap() + self.config.enr_expiration)
1672            .as_secs()
1673    }
1674
1675    fn send_neighbours_expiration(&self) -> u64 {
1676        (SystemTime::now().duration_since(UNIX_EPOCH).unwrap() + self.config.neighbours_expiration)
1677            .as_secs()
1678    }
1679
1680    /// Polls the socket and advances the state.
1681    ///
1682    /// To prevent traffic amplification attacks, implementations must verify that the sender of a
1683    /// query participates in the discovery protocol. The sender of a packet is considered verified
1684    /// if it has sent a valid Pong response with matching ping hash within the last 12 hours.
1685    pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<Discv4Event> {
1686        loop {
1687            // drain buffered events first
1688            if let Some(event) = self.queued_events.pop_front() {
1689                return Poll::Ready(event)
1690            }
1691
1692            // trigger self lookup
1693            if self.config.enable_lookup {
1694                while self.lookup_interval.poll_tick(cx).is_ready() {
1695                    let target = self.lookup_rotator.next(&self.local_node_record.id);
1696                    self.lookup_with(target, None);
1697                }
1698            }
1699
1700            // re-ping some peers
1701            while self.ping_interval.poll_tick(cx).is_ready() {
1702                self.re_ping_oldest();
1703            }
1704
1705            if let Some(Poll::Ready(Some(ip))) =
1706                self.resolve_external_ip_interval.as_mut().map(|r| r.poll_tick(cx))
1707            {
1708                self.set_external_ip_addr(ip);
1709            }
1710
1711            // drain all incoming `Discv4` commands, this channel can never close
1712            while let Poll::Ready(Some(cmd)) = self.commands_rx.poll_recv(cx) {
1713                match cmd {
1714                    Discv4Command::Add(enr) => {
1715                        self.add_node(enr);
1716                    }
1717                    Discv4Command::Lookup { node_id, tx } => {
1718                        let node_id = node_id.unwrap_or(self.local_node_record.id);
1719                        self.lookup_with(node_id, tx);
1720                    }
1721                    Discv4Command::SetLookupInterval(duration) => {
1722                        self.set_lookup_interval(duration);
1723                    }
1724                    Discv4Command::Updates(tx) => {
1725                        let rx = self.update_stream();
1726                        let _ = tx.send(rx);
1727                    }
1728                    Discv4Command::BanPeer(node_id) => self.ban_node(node_id),
1729                    Discv4Command::Remove(node_id) => {
1730                        self.remove_node(node_id);
1731                    }
1732                    Discv4Command::Ban(node_id, ip) => {
1733                        self.ban_node(node_id);
1734                        self.ban_ip(ip);
1735                    }
1736                    Discv4Command::BanIp(ip) => {
1737                        self.ban_ip(ip);
1738                    }
1739                    Discv4Command::SetEIP868RLPPair { key, rlp } => {
1740                        debug!(target: "discv4", key=%String::from_utf8_lossy(&key), "Update EIP-868 extension pair");
1741
1742                        let _ = self.local_eip_868_enr.insert_raw_rlp(key, rlp, &self.secret_key);
1743                    }
1744                    Discv4Command::SetTcpPort(port) => {
1745                        debug!(target: "discv4", %port, "Update tcp port");
1746                        self.local_node_record.tcp_port = port;
1747                        if self.local_node_record.address.is_ipv4() {
1748                            let _ = self.local_eip_868_enr.set_tcp4(port, &self.secret_key);
1749                        } else {
1750                            let _ = self.local_eip_868_enr.set_tcp6(port, &self.secret_key);
1751                        }
1752                    }
1753
1754                    Discv4Command::Terminated => {
1755                        // terminate the service
1756                        self.queued_events.push_back(Discv4Event::Terminated);
1757                    }
1758                }
1759            }
1760
1761            // restricts how many messages we process in a single poll before yielding back control
1762            let mut udp_message_budget = UDP_MESSAGE_POLL_LOOP_BUDGET;
1763
1764            // process all incoming datagrams
1765            while let Poll::Ready(Some(event)) = self.ingress.poll_recv(cx) {
1766                match event {
1767                    IngressEvent::RecvError(err) => {
1768                        debug!(target: "discv4", %err, "failed to read datagram");
1769                    }
1770                    IngressEvent::BadPacket(from, err, data) => {
1771                        trace!(target: "discv4", ?from, %err, packet=?hex::encode(&data), "bad packet");
1772                    }
1773                    IngressEvent::Packet(remote_addr, Packet { msg, node_id, hash }) => {
1774                        trace!(target: "discv4", r#type=?msg.msg_type(), from=?remote_addr,"received packet");
1775                        let event = match msg {
1776                            Message::Ping(ping) => {
1777                                self.on_ping(ping, remote_addr, node_id, hash);
1778                                Discv4Event::Ping
1779                            }
1780                            Message::Pong(pong) => {
1781                                self.on_pong(pong, remote_addr, node_id);
1782                                Discv4Event::Pong
1783                            }
1784                            Message::FindNode(msg) => {
1785                                self.on_find_node(msg, remote_addr, node_id);
1786                                Discv4Event::FindNode
1787                            }
1788                            Message::Neighbours(msg) => {
1789                                self.on_neighbours(msg, remote_addr, node_id);
1790                                Discv4Event::Neighbours
1791                            }
1792                            Message::EnrRequest(msg) => {
1793                                self.on_enr_request(msg, remote_addr, node_id, hash);
1794                                Discv4Event::EnrRequest
1795                            }
1796                            Message::EnrResponse(msg) => {
1797                                self.on_enr_response(msg, remote_addr, node_id);
1798                                Discv4Event::EnrResponse
1799                            }
1800                        };
1801
1802                        self.queued_events.push_back(event);
1803                    }
1804                }
1805
1806                udp_message_budget -= 1;
1807                if udp_message_budget < 0 {
1808                    trace!(target: "discv4", budget=UDP_MESSAGE_POLL_LOOP_BUDGET, "exhausted message poll budget");
1809                    if self.queued_events.is_empty() {
1810                        // we've exceeded the message budget and have no events to process
1811                        // this will make sure we're woken up again
1812                        cx.waker().wake_by_ref();
1813                    }
1814                    break
1815                }
1816            }
1817
1818            // try resending buffered pings
1819            self.ping_buffered();
1820
1821            // evict expired requests
1822            while self.evict_expired_requests_interval.poll_tick(cx).is_ready() {
1823                self.evict_expired_requests(Instant::now());
1824            }
1825
1826            // evict expired nodes
1827            while self.expire_interval.poll_tick(cx).is_ready() {
1828                self.received_pongs.evict_expired(Instant::now(), EXPIRE_DURATION);
1829            }
1830
1831            if self.queued_events.is_empty() {
1832                return Poll::Pending
1833            }
1834        }
1835    }
1836}
1837
1838/// Endless future impl
1839impl Stream for Discv4Service {
1840    type Item = Discv4Event;
1841
1842    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1843        // Poll the internal poll method
1844        match ready!(self.get_mut().poll(cx)) {
1845            // if the service is terminated, return None to terminate the stream
1846            Discv4Event::Terminated => Poll::Ready(None),
1847            // For any other event, return Poll::Ready(Some(event))
1848            ev => Poll::Ready(Some(ev)),
1849        }
1850    }
1851}
1852
1853impl fmt::Debug for Discv4Service {
1854    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1855        f.debug_struct("Discv4Service")
1856            .field("local_address", &self.local_address)
1857            .field("local_peer_id", &self.local_peer_id())
1858            .field("local_node_record", &self.local_node_record)
1859            .field("queued_pings", &self.queued_pings)
1860            .field("pending_lookup", &self.pending_lookup)
1861            .field("pending_find_nodes", &self.pending_find_nodes)
1862            .field("lookup_interval", &self.lookup_interval)
1863            .finish_non_exhaustive()
1864    }
1865}
1866
1867/// The Event type the Service stream produces.
1868///
1869/// This is mainly used for testing purposes and represents messages the service processed
1870#[derive(Debug, Eq, PartialEq)]
1871pub enum Discv4Event {
1872    /// A `Ping` message was handled.
1873    Ping,
1874    /// A `Pong` message was handled.
1875    Pong,
1876    /// A `FindNode` message was handled.
1877    FindNode,
1878    /// A `Neighbours` message was handled.
1879    Neighbours,
1880    /// A `EnrRequest` message was handled.
1881    EnrRequest,
1882    /// A `EnrResponse` message was handled.
1883    EnrResponse,
1884    /// Service is being terminated
1885    Terminated,
1886}
1887
1888/// Continuously reads new messages from the channel and writes them to the socket
1889pub(crate) async fn send_loop(udp: Arc<UdpSocket>, rx: EgressReceiver) {
1890    let mut stream = ReceiverStream::new(rx);
1891    while let Some((payload, to)) = stream.next().await {
1892        match udp.send_to(&payload, to).await {
1893            Ok(size) => {
1894                trace!(target: "discv4", ?to, ?size,"sent payload");
1895            }
1896            Err(err) => {
1897                debug!(target: "discv4", ?to, %err,"Failed to send datagram.");
1898            }
1899        }
1900    }
1901}
1902
1903/// Rate limits the number of incoming packets from individual IPs to 1 packet/second
1904const MAX_INCOMING_PACKETS_PER_MINUTE_BY_IP: usize = 60usize;
1905
1906/// Continuously awaits new incoming messages and sends them back through the channel.
1907///
1908/// The receive loop enforce primitive rate limiting for ips to prevent message spams from
1909/// individual IPs
1910pub(crate) async fn receive_loop(udp: Arc<UdpSocket>, tx: IngressSender, local_id: PeerId) {
1911    let send = |event: IngressEvent| async {
1912        let _ = tx.send(event).await.map_err(|err| {
1913            debug!(
1914                target: "discv4",
1915                 %err,
1916                "failed send incoming packet",
1917            )
1918        });
1919    };
1920
1921    let mut cache = ReceiveCache::default();
1922
1923    // tick at half the rate of the limit
1924    let tick = MAX_INCOMING_PACKETS_PER_MINUTE_BY_IP / 2;
1925    let mut interval = tokio::time::interval(Duration::from_secs(tick as u64));
1926
1927    let mut buf = [0; MAX_PACKET_SIZE];
1928    loop {
1929        let res = udp.recv_from(&mut buf).await;
1930        match res {
1931            Err(err) => {
1932                debug!(target: "discv4", %err, "Failed to read datagram.");
1933                send(IngressEvent::RecvError(err)).await;
1934            }
1935            Ok((read, remote_addr)) => {
1936                // rate limit incoming packets by IP
1937                if cache.inc_ip(remote_addr.ip()) > MAX_INCOMING_PACKETS_PER_MINUTE_BY_IP {
1938                    trace!(target: "discv4", ?remote_addr, "Too many incoming packets from IP.");
1939                    continue
1940                }
1941
1942                let packet = &buf[..read];
1943                match Message::decode(packet) {
1944                    Ok(packet) => {
1945                        if packet.node_id == local_id {
1946                            // received our own message
1947                            debug!(target: "discv4", ?remote_addr, "Received own packet.");
1948                            continue
1949                        }
1950
1951                        // skip if we've already received the same packet
1952                        if cache.contains_packet(packet.hash) {
1953                            debug!(target: "discv4", ?remote_addr, "Received duplicate packet.");
1954                            continue
1955                        }
1956
1957                        send(IngressEvent::Packet(remote_addr, packet)).await;
1958                    }
1959                    Err(err) => {
1960                        trace!(target: "discv4", %err,"Failed to decode packet");
1961                        send(IngressEvent::BadPacket(remote_addr, err, packet.to_vec())).await
1962                    }
1963                }
1964            }
1965        }
1966
1967        // reset the tracked ips if the interval has passed
1968        if poll_fn(|cx| match interval.poll_tick(cx) {
1969            Poll::Ready(_) => Poll::Ready(true),
1970            Poll::Pending => Poll::Ready(false),
1971        })
1972        .await
1973        {
1974            cache.tick_ips(tick);
1975        }
1976    }
1977}
1978
1979/// A cache for received packets and their source address.
1980///
1981/// This is used to discard duplicated packets and rate limit messages from the same source.
1982struct ReceiveCache {
1983    /// keeps track of how many messages we've received from a given IP address since the last
1984    /// tick.
1985    ///
1986    /// This is used to count the number of messages received from a given IP address within an
1987    /// interval.
1988    ip_messages: HashMap<IpAddr, usize>,
1989    // keeps track of unique packet hashes
1990    unique_packets: schnellru::LruMap<B256, ()>,
1991}
1992
1993impl ReceiveCache {
1994    /// Updates the counter for each IP address and removes IPs that have exceeded the limit.
1995    ///
1996    /// This will decrement the counter for each IP address and remove IPs that have reached 0.
1997    fn tick_ips(&mut self, tick: usize) {
1998        self.ip_messages.retain(|_, count| {
1999            if let Some(reset) = count.checked_sub(tick) {
2000                *count = reset;
2001                true
2002            } else {
2003                false
2004            }
2005        });
2006    }
2007
2008    /// Increases the counter for the given IP address and returns the new count.
2009    fn inc_ip(&mut self, ip: IpAddr) -> usize {
2010        let ctn = self.ip_messages.entry(ip).or_default();
2011        *ctn = ctn.saturating_add(1);
2012        *ctn
2013    }
2014
2015    /// Returns true if we previously received the packet
2016    fn contains_packet(&mut self, hash: B256) -> bool {
2017        !self.unique_packets.insert(hash, ())
2018    }
2019}
2020
2021impl Default for ReceiveCache {
2022    fn default() -> Self {
2023        Self {
2024            ip_messages: Default::default(),
2025            unique_packets: schnellru::LruMap::new(schnellru::ByLength::new(32)),
2026        }
2027    }
2028}
2029
2030/// The commands sent from the frontend [Discv4] to the service [`Discv4Service`].
2031enum Discv4Command {
2032    Add(NodeRecord),
2033    SetTcpPort(u16),
2034    SetEIP868RLPPair { key: Vec<u8>, rlp: Bytes },
2035    Ban(PeerId, IpAddr),
2036    BanPeer(PeerId),
2037    BanIp(IpAddr),
2038    Remove(PeerId),
2039    Lookup { node_id: Option<PeerId>, tx: Option<NodeRecordSender> },
2040    SetLookupInterval(Duration),
2041    Updates(OneshotSender<ReceiverStream<DiscoveryUpdate>>),
2042    Terminated,
2043}
2044
2045/// Event type receiver produces
2046#[derive(Debug)]
2047pub(crate) enum IngressEvent {
2048    /// Encountered an error when reading a datagram message.
2049    RecvError(io::Error),
2050    /// Received a bad message
2051    BadPacket(SocketAddr, DecodePacketError, Vec<u8>),
2052    /// Received a datagram from an address.
2053    Packet(SocketAddr, Packet),
2054}
2055
2056/// Tracks a sent ping
2057#[derive(Debug)]
2058struct PingRequest {
2059    // Timestamp when the request was sent.
2060    sent_at: Instant,
2061    // Node to which the request was sent.
2062    node: NodeRecord,
2063    // Hash sent in the Ping request
2064    echo_hash: B256,
2065    /// Why this ping was sent.
2066    reason: PingReason,
2067}
2068
2069/// Rotates the `PeerId` that is periodically looked up.
2070///
2071/// By selecting different targets, the lookups will be seeded with different ALPHA seed nodes.
2072#[derive(Debug)]
2073struct LookupTargetRotator {
2074    interval: usize,
2075    counter: usize,
2076}
2077
2078// === impl LookupTargetRotator ===
2079
2080impl LookupTargetRotator {
2081    /// Returns a rotator that always returns the local target.
2082    const fn local_only() -> Self {
2083        Self { interval: 1, counter: 0 }
2084    }
2085}
2086
2087impl Default for LookupTargetRotator {
2088    fn default() -> Self {
2089        Self {
2090            // every 4th lookup is our own node
2091            interval: 4,
2092            counter: 3,
2093        }
2094    }
2095}
2096
2097impl LookupTargetRotator {
2098    /// this will return the next node id to lookup
2099    fn next(&mut self, local: &PeerId) -> PeerId {
2100        self.counter += 1;
2101        self.counter %= self.interval;
2102        if self.counter == 0 {
2103            return *local
2104        }
2105        PeerId::random()
2106    }
2107}
2108
2109/// Tracks lookups across multiple `FindNode` requests.
2110///
2111/// If this type is dropped by all Clones, it will send all the discovered nodes to the listener, if
2112/// one is present.
2113#[derive(Clone, Debug)]
2114struct LookupContext {
2115    inner: Rc<LookupContextInner>,
2116}
2117
2118impl LookupContext {
2119    /// Create new context for a recursive lookup
2120    fn new(
2121        target: discv5::Key<NodeKey>,
2122        nearest_nodes: impl IntoIterator<Item = (Distance, NodeRecord)>,
2123        listener: Option<NodeRecordSender>,
2124    ) -> Self {
2125        let closest_nodes = nearest_nodes
2126            .into_iter()
2127            .map(|(distance, record)| {
2128                (distance, QueryNode { record, queried: false, responded: false })
2129            })
2130            .collect();
2131
2132        let inner = Rc::new(LookupContextInner {
2133            target,
2134            closest_nodes: RefCell::new(closest_nodes),
2135            listener,
2136        });
2137        Self { inner }
2138    }
2139
2140    /// Returns the target of this lookup
2141    fn target(&self) -> PeerId {
2142        self.inner.target.preimage().0
2143    }
2144
2145    fn closest(&self, num: usize) -> Vec<NodeRecord> {
2146        self.inner
2147            .closest_nodes
2148            .borrow()
2149            .iter()
2150            .filter(|(_, node)| !node.queried)
2151            .map(|(_, n)| n.record)
2152            .take(num)
2153            .collect()
2154    }
2155
2156    /// Returns the closest nodes that have not been queried yet.
2157    fn filter_closest<P>(&self, num: usize, filter: P) -> Vec<NodeRecord>
2158    where
2159        P: FnMut(&NodeRecord) -> bool,
2160    {
2161        self.inner
2162            .closest_nodes
2163            .borrow()
2164            .iter()
2165            .filter(|(_, node)| !node.queried)
2166            .map(|(_, n)| n.record)
2167            .filter(filter)
2168            .take(num)
2169            .collect()
2170    }
2171
2172    /// Inserts the node if it's missing
2173    fn add_node(&self, record: NodeRecord) {
2174        let distance = self.inner.target.distance(&kad_key(record.id));
2175        let mut closest = self.inner.closest_nodes.borrow_mut();
2176        if let btree_map::Entry::Vacant(entry) = closest.entry(distance) {
2177            entry.insert(QueryNode { record, queried: false, responded: false });
2178        }
2179    }
2180
2181    fn set_queried(&self, id: PeerId, val: bool) {
2182        if let Some((_, node)) =
2183            self.inner.closest_nodes.borrow_mut().iter_mut().find(|(_, node)| node.record.id == id)
2184        {
2185            node.queried = val;
2186        }
2187    }
2188
2189    /// Marks the node as queried
2190    fn mark_queried(&self, id: PeerId) {
2191        self.set_queried(id, true)
2192    }
2193
2194    /// Marks the node as not queried
2195    fn unmark_queried(&self, id: PeerId) {
2196        self.set_queried(id, false)
2197    }
2198
2199    /// Marks the node as responded
2200    fn mark_responded(&self, id: PeerId) {
2201        if let Some((_, node)) =
2202            self.inner.closest_nodes.borrow_mut().iter_mut().find(|(_, node)| node.record.id == id)
2203        {
2204            node.responded = true;
2205        }
2206    }
2207}
2208
2209// SAFETY: The [`Discv4Service`] is intended to be spawned as task which requires `Send`.
2210// The `LookupContext` is shared by all active `FindNode` requests that are part of the lookup step.
2211// Which can modify the context. The shared context is only ever accessed mutably when a `Neighbour`
2212// response is processed and all Clones are stored inside [`Discv4Service`], in other words it is
2213// guaranteed that there's only 1 owner ([`Discv4Service`]) of all possible [`Rc`] clones of
2214// [`LookupContext`].
2215unsafe impl Send for LookupContext {}
2216#[derive(Debug)]
2217struct LookupContextInner {
2218    /// The target to lookup.
2219    target: discv5::Key<NodeKey>,
2220    /// The closest nodes
2221    closest_nodes: RefCell<BTreeMap<Distance, QueryNode>>,
2222    /// A listener for all the nodes retrieved in this lookup
2223    ///
2224    /// This is present if the lookup was triggered manually via [Discv4] and we want to return all
2225    /// the nodes once the lookup finishes.
2226    listener: Option<NodeRecordSender>,
2227}
2228
2229impl Drop for LookupContextInner {
2230    fn drop(&mut self) {
2231        if let Some(tx) = self.listener.take() {
2232            // there's only 1 instance shared across `FindNode` requests, if this is dropped then
2233            // all requests finished, and we can send all results back
2234            let nodes = self
2235                .closest_nodes
2236                .take()
2237                .into_values()
2238                .filter(|node| node.responded)
2239                .map(|node| node.record)
2240                .collect();
2241            let _ = tx.send(nodes);
2242        }
2243    }
2244}
2245
2246/// Tracks the state of a recursive lookup step
2247#[derive(Debug, Clone, Copy)]
2248struct QueryNode {
2249    record: NodeRecord,
2250    queried: bool,
2251    responded: bool,
2252}
2253
2254#[derive(Debug)]
2255struct FindNodeRequest {
2256    // Timestamp when the request was sent.
2257    sent_at: Instant,
2258    // Number of items sent by the node
2259    response_count: usize,
2260    // Whether the request has been answered yet.
2261    answered: bool,
2262    /// Response buffer
2263    lookup_context: LookupContext,
2264}
2265
2266// === impl FindNodeRequest ===
2267
2268impl FindNodeRequest {
2269    fn new(resp: LookupContext) -> Self {
2270        Self { sent_at: Instant::now(), response_count: 0, answered: false, lookup_context: resp }
2271    }
2272}
2273
2274#[derive(Debug)]
2275struct EnrRequestState {
2276    // Timestamp when the request was sent.
2277    sent_at: Instant,
2278    // Hash sent in the Ping request
2279    echo_hash: B256,
2280}
2281
2282/// Stored node info.
2283#[derive(Debug, Clone, Eq, PartialEq)]
2284struct NodeEntry {
2285    /// Node record info.
2286    record: NodeRecord,
2287    /// Timestamp of last pong.
2288    last_seen: Instant,
2289    /// Last enr seq we retrieved via a ENR request.
2290    last_enr_seq: Option<u64>,
2291    /// `ForkId` if retrieved via ENR requests.
2292    fork_id: Option<ForkId>,
2293    /// Counter for failed _consecutive_ findNode requests.
2294    find_node_failures: u8,
2295    /// Whether the endpoint of the peer is proven.
2296    has_endpoint_proof: bool,
2297}
2298
2299// === impl NodeEntry ===
2300
2301impl NodeEntry {
2302    /// Creates a new, unpopulated entry
2303    fn new(record: NodeRecord) -> Self {
2304        Self {
2305            record,
2306            last_seen: Instant::now(),
2307            last_enr_seq: None,
2308            fork_id: None,
2309            find_node_failures: 0,
2310            has_endpoint_proof: false,
2311        }
2312    }
2313
2314    #[cfg(test)]
2315    fn new_proven(record: NodeRecord) -> Self {
2316        let mut node = Self::new(record);
2317        node.has_endpoint_proof = true;
2318        node
2319    }
2320
2321    /// Marks the entry with an established proof and resets the consecutive failure counter.
2322    const fn establish_proof(&mut self) {
2323        self.has_endpoint_proof = true;
2324        self.find_node_failures = 0;
2325    }
2326
2327    /// Returns true if the tracked find node failures exceed the max amount
2328    const fn exceeds_find_node_failures(&self, max_failures: u8) -> bool {
2329        self.find_node_failures >= max_failures
2330    }
2331
2332    /// Updates the last timestamp and sets the enr seq
2333    fn update_with_enr(&mut self, last_enr_seq: Option<u64>) -> Option<u64> {
2334        self.update_now(|s| std::mem::replace(&mut s.last_enr_seq, last_enr_seq))
2335    }
2336
2337    /// Increases the failed request counter
2338    const fn inc_failed_request(&mut self) {
2339        self.find_node_failures += 1;
2340    }
2341
2342    /// Updates the last timestamp and sets the enr seq
2343    fn update_with_fork_id(&mut self, fork_id: Option<ForkId>) -> Option<ForkId> {
2344        self.update_now(|s| std::mem::replace(&mut s.fork_id, fork_id))
2345    }
2346
2347    /// Updates the `last_seen` timestamp and calls the closure
2348    fn update_now<F, R>(&mut self, f: F) -> R
2349    where
2350        F: FnOnce(&mut Self) -> R,
2351    {
2352        self.last_seen = Instant::now();
2353        f(self)
2354    }
2355}
2356
2357// === impl NodeEntry ===
2358
2359impl NodeEntry {
2360    /// Returns true if the node should be re-pinged.
2361    fn is_expired(&self) -> bool {
2362        self.last_seen.elapsed() > (ENDPOINT_PROOF_EXPIRATION / 2)
2363    }
2364}
2365
2366/// Represents why a ping is issued
2367#[derive(Debug)]
2368enum PingReason {
2369    /// Initial ping to a previously unknown peer that was inserted into the table.
2370    InitialInsert,
2371    /// A ping to a peer to establish a bond (endpoint proof).
2372    EstablishBond,
2373    /// Re-ping a peer.
2374    RePing,
2375    /// Part of a lookup to ensure endpoint is proven before we can send a `FindNode` request.
2376    Lookup(NodeRecord, LookupContext),
2377}
2378
2379/// Represents node related updates state changes in the underlying node table
2380#[derive(Debug, Clone)]
2381pub enum DiscoveryUpdate {
2382    /// A new node was discovered _and_ added to the table.
2383    Added(NodeRecord),
2384    /// A new node was discovered but _not_ added to the table because it is currently full.
2385    DiscoveredAtCapacity(NodeRecord),
2386    /// Received a [`ForkId`] via EIP-868 for the given [`NodeRecord`].
2387    EnrForkId(NodeRecord, ForkId),
2388    /// Node that was removed from the table
2389    Removed(PeerId),
2390    /// A series of updates
2391    Batch(Vec<DiscoveryUpdate>),
2392}
2393
2394#[cfg(test)]
2395mod tests {
2396    use super::*;
2397    use crate::test_utils::{create_discv4, create_discv4_with_config, rng_endpoint, rng_record};
2398    use alloy_primitives::hex;
2399    use alloy_rlp::{Decodable, Encodable};
2400    use rand_08::Rng;
2401    use reth_ethereum_forks::{EnrForkIdEntry, ForkHash};
2402    use reth_network_peers::mainnet_nodes;
2403    use std::future::poll_fn;
2404
2405    #[tokio::test]
2406    async fn test_configured_enr_forkid_entry() {
2407        let fork: ForkId = ForkId { hash: ForkHash([220, 233, 108, 45]), next: 0u64 };
2408        let mut disc_conf = Discv4Config::default();
2409        disc_conf.add_eip868_pair("eth", EnrForkIdEntry::from(fork));
2410        let (_discv4, service) = create_discv4_with_config(disc_conf).await;
2411        let eth = service.local_eip_868_enr.get_raw_rlp(b"eth").unwrap();
2412        let fork_entry_id = EnrForkIdEntry::decode(&mut &eth[..]).unwrap();
2413
2414        let raw: [u8; 8] = [0xc7, 0xc6, 0x84, 0xdc, 0xe9, 0x6c, 0x2d, 0x80];
2415        let decoded = EnrForkIdEntry::decode(&mut &raw[..]).unwrap();
2416        let expected = EnrForkIdEntry {
2417            fork_id: ForkId { hash: ForkHash([0xdc, 0xe9, 0x6c, 0x2d]), next: 0 },
2418        };
2419        assert_eq!(expected, fork_entry_id);
2420        assert_eq!(expected, decoded);
2421    }
2422
2423    #[test]
2424    fn test_enr_forkid_entry_decode() {
2425        let raw: [u8; 8] = [0xc7, 0xc6, 0x84, 0xdc, 0xe9, 0x6c, 0x2d, 0x80];
2426        let decoded = EnrForkIdEntry::decode(&mut &raw[..]).unwrap();
2427        let expected = EnrForkIdEntry {
2428            fork_id: ForkId { hash: ForkHash([0xdc, 0xe9, 0x6c, 0x2d]), next: 0 },
2429        };
2430        assert_eq!(expected, decoded);
2431    }
2432
2433    #[test]
2434    fn test_enr_forkid_entry_encode() {
2435        let original = EnrForkIdEntry {
2436            fork_id: ForkId { hash: ForkHash([0xdc, 0xe9, 0x6c, 0x2d]), next: 0 },
2437        };
2438        let expected: [u8; 8] = [0xc7, 0xc6, 0x84, 0xdc, 0xe9, 0x6c, 0x2d, 0x80];
2439        let mut encoded = Vec::with_capacity(expected.len());
2440        original.encode(&mut encoded);
2441        assert_eq!(&expected[..], encoded.as_slice());
2442    }
2443
2444    #[test]
2445    fn test_local_rotator() {
2446        let id = PeerId::random();
2447        let mut rotator = LookupTargetRotator::local_only();
2448        assert_eq!(rotator.next(&id), id);
2449        assert_eq!(rotator.next(&id), id);
2450    }
2451
2452    #[test]
2453    fn test_rotator() {
2454        let id = PeerId::random();
2455        let mut rotator = LookupTargetRotator::default();
2456        assert_eq!(rotator.next(&id), id);
2457        assert_ne!(rotator.next(&id), id);
2458        assert_ne!(rotator.next(&id), id);
2459        assert_ne!(rotator.next(&id), id);
2460        assert_eq!(rotator.next(&id), id);
2461    }
2462
2463    #[tokio::test]
2464    async fn test_pending_ping() {
2465        let (_, mut service) = create_discv4().await;
2466
2467        let local_addr = service.local_addr();
2468
2469        let mut num_inserted = 0;
2470        loop {
2471            let node = NodeRecord::new(local_addr, PeerId::random());
2472            if service.add_node(node) {
2473                num_inserted += 1;
2474                assert!(service.pending_pings.contains_key(&node.id));
2475                assert_eq!(service.pending_pings.len(), num_inserted);
2476                if num_inserted == MAX_NODES_PING {
2477                    break
2478                }
2479            }
2480        }
2481
2482        // `pending_pings` is full, insert into `queued_pings`.
2483        num_inserted = 0;
2484        for _ in 0..MAX_NODES_PING {
2485            let node = NodeRecord::new(local_addr, PeerId::random());
2486            if service.add_node(node) {
2487                num_inserted += 1;
2488                assert!(!service.pending_pings.contains_key(&node.id));
2489                assert_eq!(service.pending_pings.len(), MAX_NODES_PING);
2490                assert_eq!(service.queued_pings.len(), num_inserted);
2491            }
2492        }
2493    }
2494
2495    // Bootstraps with mainnet boot nodes
2496    #[tokio::test(flavor = "multi_thread")]
2497    #[ignore]
2498    async fn test_mainnet_lookup() {
2499        reth_tracing::init_test_tracing();
2500        let fork_id = ForkId { hash: ForkHash(hex!("743f3d89")), next: 16191202 };
2501
2502        let all_nodes = mainnet_nodes();
2503        let config = Discv4Config::builder()
2504            .add_boot_nodes(all_nodes)
2505            .lookup_interval(Duration::from_secs(1))
2506            .add_eip868_pair("eth", fork_id)
2507            .build();
2508        let (_discv4, mut service) = create_discv4_with_config(config).await;
2509
2510        let mut updates = service.update_stream();
2511
2512        let _handle = service.spawn();
2513
2514        let mut table = HashMap::new();
2515        while let Some(update) = updates.next().await {
2516            match update {
2517                DiscoveryUpdate::EnrForkId(record, fork_id) => {
2518                    println!("{record:?}, {fork_id:?}");
2519                }
2520                DiscoveryUpdate::Added(record) => {
2521                    table.insert(record.id, record);
2522                }
2523                DiscoveryUpdate::Removed(id) => {
2524                    table.remove(&id);
2525                }
2526                _ => {}
2527            }
2528            println!("total peers {}", table.len());
2529        }
2530    }
2531
2532    #[tokio::test]
2533    async fn test_mapped_ipv4() {
2534        reth_tracing::init_test_tracing();
2535        let mut rng = rand_08::thread_rng();
2536        let config = Discv4Config::builder().build();
2537        let (_discv4, mut service) = create_discv4_with_config(config).await;
2538
2539        let v4: Ipv4Addr = "0.0.0.0".parse().unwrap();
2540        let v6 = v4.to_ipv6_mapped();
2541        let addr: SocketAddr = (v6, DEFAULT_DISCOVERY_PORT).into();
2542
2543        let ping = Ping {
2544            from: rng_endpoint(&mut rng),
2545            to: rng_endpoint(&mut rng),
2546            expire: service.ping_expiration(),
2547            enr_sq: Some(rng.r#gen()),
2548        };
2549
2550        let id = PeerId::random();
2551        service.on_ping(ping, addr, id, B256::random());
2552
2553        let key = kad_key(id);
2554        match service.kbuckets.entry(&key) {
2555            kbucket::Entry::Present(entry, _) => {
2556                let node_addr = entry.value().record.address;
2557                assert!(node_addr.is_ipv4());
2558                assert_eq!(node_addr, IpAddr::from(v4));
2559            }
2560            _ => unreachable!(),
2561        };
2562    }
2563
2564    #[tokio::test]
2565    async fn test_respect_ping_expiration() {
2566        reth_tracing::init_test_tracing();
2567        let mut rng = rand_08::thread_rng();
2568        let config = Discv4Config::builder().build();
2569        let (_discv4, mut service) = create_discv4_with_config(config).await;
2570
2571        let v4: Ipv4Addr = "0.0.0.0".parse().unwrap();
2572        let v6 = v4.to_ipv6_mapped();
2573        let addr: SocketAddr = (v6, DEFAULT_DISCOVERY_PORT).into();
2574
2575        let ping = Ping {
2576            from: rng_endpoint(&mut rng),
2577            to: rng_endpoint(&mut rng),
2578            expire: SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs() - 1,
2579            enr_sq: Some(rng.r#gen()),
2580        };
2581
2582        let id = PeerId::random();
2583        service.on_ping(ping, addr, id, B256::random());
2584
2585        let key = kad_key(id);
2586        match service.kbuckets.entry(&key) {
2587            kbucket::Entry::Absent(_) => {}
2588            _ => unreachable!(),
2589        };
2590    }
2591
2592    #[tokio::test]
2593    async fn test_single_lookups() {
2594        reth_tracing::init_test_tracing();
2595
2596        let config = Discv4Config::builder().build();
2597        let (_discv4, mut service) = create_discv4_with_config(config.clone()).await;
2598
2599        let id = PeerId::random();
2600        let key = kad_key(id);
2601        let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), id);
2602
2603        let _ = service.kbuckets.insert_or_update(
2604            &key,
2605            NodeEntry::new_proven(record),
2606            NodeStatus {
2607                direction: ConnectionDirection::Incoming,
2608                state: ConnectionState::Connected,
2609            },
2610        );
2611
2612        service.lookup_self();
2613        assert_eq!(service.pending_find_nodes.len(), 1);
2614
2615        poll_fn(|cx| {
2616            let _ = service.poll(cx);
2617            assert_eq!(service.pending_find_nodes.len(), 1);
2618
2619            Poll::Ready(())
2620        })
2621        .await;
2622    }
2623
2624    #[tokio::test]
2625    async fn test_on_neighbours_recursive_lookup() {
2626        reth_tracing::init_test_tracing();
2627
2628        let config = Discv4Config::builder().build();
2629        let (_discv4, mut service) = create_discv4_with_config(config.clone()).await;
2630        let (_discv4, mut service2) = create_discv4_with_config(config).await;
2631
2632        let id = PeerId::random();
2633        let key = kad_key(id);
2634        let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), id);
2635
2636        let _ = service.kbuckets.insert_or_update(
2637            &key,
2638            NodeEntry::new_proven(record),
2639            NodeStatus {
2640                direction: ConnectionDirection::Incoming,
2641                state: ConnectionState::Connected,
2642            },
2643        );
2644        // Needed in this test to populate self.pending_find_nodes for as a prereq to a valid
2645        // on_neighbours request
2646        service.lookup_self();
2647        assert_eq!(service.pending_find_nodes.len(), 1);
2648
2649        poll_fn(|cx| {
2650            let _ = service.poll(cx);
2651            assert_eq!(service.pending_find_nodes.len(), 1);
2652
2653            Poll::Ready(())
2654        })
2655        .await;
2656
2657        let expiry = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_secs() +
2658            10000000000000;
2659        let msg = Neighbours { nodes: vec![service2.local_node_record], expire: expiry };
2660        service.on_neighbours(msg, record.tcp_addr(), id);
2661        // wait for the processed ping
2662        let event = poll_fn(|cx| service2.poll(cx)).await;
2663        assert_eq!(event, Discv4Event::Ping);
2664        // assert that no find_node req has been added here on top of the initial one, since both
2665        // sides of the endpoint proof is not completed here
2666        assert_eq!(service.pending_find_nodes.len(), 1);
2667        // we now wait for PONG
2668        let event = poll_fn(|cx| service.poll(cx)).await;
2669        assert_eq!(event, Discv4Event::Pong);
2670        // Ideally we want to assert against service.pending_lookup.len() here - but because the
2671        // service2 sends Pong and Ping consecutivley on_ping(), the pending_lookup table gets
2672        // drained almost immediately - and no way to grab the handle to its intermediary state here
2673        // :(
2674        let event = poll_fn(|cx| service.poll(cx)).await;
2675        assert_eq!(event, Discv4Event::Ping);
2676        // assert that we've added the find_node req here after both sides of the endpoint proof is
2677        // done
2678        assert_eq!(service.pending_find_nodes.len(), 2);
2679    }
2680
2681    #[tokio::test]
2682    async fn test_no_local_in_closest() {
2683        reth_tracing::init_test_tracing();
2684
2685        let config = Discv4Config::builder().build();
2686        let (_discv4, mut service) = create_discv4_with_config(config).await;
2687
2688        let target_key = kad_key(PeerId::random());
2689
2690        let id = PeerId::random();
2691        let key = kad_key(id);
2692        let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), id);
2693
2694        let _ = service.kbuckets.insert_or_update(
2695            &key,
2696            NodeEntry::new(record),
2697            NodeStatus {
2698                direction: ConnectionDirection::Incoming,
2699                state: ConnectionState::Connected,
2700            },
2701        );
2702
2703        let closest = service
2704            .kbuckets
2705            .closest_values(&target_key)
2706            .map(|n| n.value.record)
2707            .take(MAX_NODES_PER_BUCKET)
2708            .collect::<Vec<_>>();
2709
2710        assert_eq!(closest.len(), 1);
2711        assert!(!closest.iter().any(|r| r.id == *service.local_peer_id()));
2712    }
2713
2714    #[tokio::test]
2715    async fn test_random_lookup() {
2716        reth_tracing::init_test_tracing();
2717
2718        let config = Discv4Config::builder().build();
2719        let (_discv4, mut service) = create_discv4_with_config(config).await;
2720
2721        let target = PeerId::random();
2722
2723        let id = PeerId::random();
2724        let key = kad_key(id);
2725        let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), id);
2726
2727        let _ = service.kbuckets.insert_or_update(
2728            &key,
2729            NodeEntry::new_proven(record),
2730            NodeStatus {
2731                direction: ConnectionDirection::Incoming,
2732                state: ConnectionState::Connected,
2733            },
2734        );
2735
2736        service.lookup(target);
2737        assert_eq!(service.pending_find_nodes.len(), 1);
2738
2739        let ctx = service.pending_find_nodes.values().next().unwrap().lookup_context.clone();
2740
2741        assert_eq!(ctx.target(), target);
2742        assert_eq!(ctx.inner.closest_nodes.borrow().len(), 1);
2743
2744        ctx.add_node(record);
2745        assert_eq!(ctx.inner.closest_nodes.borrow().len(), 1);
2746    }
2747
2748    #[tokio::test]
2749    async fn test_reping_on_find_node_failures() {
2750        reth_tracing::init_test_tracing();
2751
2752        let config = Discv4Config::builder().build();
2753        let (_discv4, mut service) = create_discv4_with_config(config).await;
2754
2755        let target = PeerId::random();
2756
2757        let id = PeerId::random();
2758        let key = kad_key(id);
2759        let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), id);
2760
2761        let mut entry = NodeEntry::new_proven(record);
2762        entry.find_node_failures = u8::MAX;
2763        let _ = service.kbuckets.insert_or_update(
2764            &key,
2765            entry,
2766            NodeStatus {
2767                direction: ConnectionDirection::Incoming,
2768                state: ConnectionState::Connected,
2769            },
2770        );
2771
2772        service.lookup(target);
2773        assert_eq!(service.pending_find_nodes.len(), 0);
2774        assert_eq!(service.pending_pings.len(), 1);
2775
2776        service.update_on_pong(record, None);
2777
2778        service
2779            .on_entry(record.id, |entry| {
2780                // reset on pong
2781                assert_eq!(entry.find_node_failures, 0);
2782                assert!(entry.has_endpoint_proof);
2783            })
2784            .unwrap();
2785    }
2786
2787    #[tokio::test]
2788    async fn test_service_commands() {
2789        reth_tracing::init_test_tracing();
2790
2791        let config = Discv4Config::builder().build();
2792        let (discv4, mut service) = create_discv4_with_config(config).await;
2793
2794        service.lookup_self();
2795
2796        let _handle = service.spawn();
2797        discv4.send_lookup_self();
2798        let _ = discv4.lookup_self().await;
2799    }
2800
2801    #[tokio::test]
2802    async fn test_requests_timeout() {
2803        reth_tracing::init_test_tracing();
2804        let fork_id = ForkId { hash: ForkHash(hex!("743f3d89")), next: 16191202 };
2805
2806        let config = Discv4Config::builder()
2807            .request_timeout(Duration::from_millis(200))
2808            .ping_expiration(Duration::from_millis(200))
2809            .lookup_neighbours_expiration(Duration::from_millis(200))
2810            .add_eip868_pair("eth", fork_id)
2811            .build();
2812        let (_disv4, mut service) = create_discv4_with_config(config).await;
2813
2814        let id = PeerId::random();
2815        let key = kad_key(id);
2816        let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), id);
2817
2818        let _ = service.kbuckets.insert_or_update(
2819            &key,
2820            NodeEntry::new_proven(record),
2821            NodeStatus {
2822                direction: ConnectionDirection::Incoming,
2823                state: ConnectionState::Connected,
2824            },
2825        );
2826
2827        service.lookup_self();
2828        assert_eq!(service.pending_find_nodes.len(), 1);
2829
2830        let ctx = service.pending_find_nodes.values().next().unwrap().lookup_context.clone();
2831
2832        service.pending_lookup.insert(record.id, (Instant::now(), ctx));
2833
2834        assert_eq!(service.pending_lookup.len(), 1);
2835
2836        let ping = Ping {
2837            from: service.local_node_record.into(),
2838            to: record.into(),
2839            expire: service.ping_expiration(),
2840            enr_sq: service.enr_seq(),
2841        };
2842        let echo_hash = service.send_packet(Message::Ping(ping), record.udp_addr());
2843        let ping_request = PingRequest {
2844            sent_at: Instant::now(),
2845            node: record,
2846            echo_hash,
2847            reason: PingReason::InitialInsert,
2848        };
2849        service.pending_pings.insert(record.id, ping_request);
2850
2851        assert_eq!(service.pending_pings.len(), 1);
2852
2853        tokio::time::sleep(Duration::from_secs(1)).await;
2854
2855        poll_fn(|cx| {
2856            let _ = service.poll(cx);
2857
2858            assert_eq!(service.pending_find_nodes.len(), 0);
2859            assert_eq!(service.pending_lookup.len(), 0);
2860            assert_eq!(service.pending_pings.len(), 0);
2861
2862            Poll::Ready(())
2863        })
2864        .await;
2865    }
2866
2867    // sends a PING packet with wrong 'to' field and expects a PONG response.
2868    #[tokio::test(flavor = "multi_thread")]
2869    async fn test_check_wrong_to() {
2870        reth_tracing::init_test_tracing();
2871
2872        let config = Discv4Config::builder().external_ip_resolver(None).build();
2873        let (_discv4, mut service_1) = create_discv4_with_config(config.clone()).await;
2874        let (_discv4, mut service_2) = create_discv4_with_config(config).await;
2875
2876        // ping node 2 with wrong to field
2877        let mut ping = Ping {
2878            from: service_1.local_node_record.into(),
2879            to: service_2.local_node_record.into(),
2880            expire: service_1.ping_expiration(),
2881            enr_sq: service_1.enr_seq(),
2882        };
2883        ping.to.address = "192.0.2.0".parse().unwrap();
2884
2885        let echo_hash = service_1.send_packet(Message::Ping(ping), service_2.local_addr());
2886        let ping_request = PingRequest {
2887            sent_at: Instant::now(),
2888            node: service_2.local_node_record,
2889            echo_hash,
2890            reason: PingReason::InitialInsert,
2891        };
2892        service_1.pending_pings.insert(*service_2.local_peer_id(), ping_request);
2893
2894        // wait for the processed ping
2895        let event = poll_fn(|cx| service_2.poll(cx)).await;
2896        assert_eq!(event, Discv4Event::Ping);
2897
2898        // we now wait for PONG
2899        let event = poll_fn(|cx| service_1.poll(cx)).await;
2900        assert_eq!(event, Discv4Event::Pong);
2901        // followed by a ping
2902        let event = poll_fn(|cx| service_1.poll(cx)).await;
2903        assert_eq!(event, Discv4Event::Ping);
2904    }
2905
2906    #[tokio::test(flavor = "multi_thread")]
2907    async fn test_check_ping_pong() {
2908        reth_tracing::init_test_tracing();
2909
2910        let config = Discv4Config::builder().external_ip_resolver(None).build();
2911        let (_discv4, mut service_1) = create_discv4_with_config(config.clone()).await;
2912        let (_discv4, mut service_2) = create_discv4_with_config(config).await;
2913
2914        // send ping from 1 -> 2
2915        service_1.add_node(service_2.local_node_record);
2916
2917        // wait for the processed ping
2918        let event = poll_fn(|cx| service_2.poll(cx)).await;
2919        assert_eq!(event, Discv4Event::Ping);
2920
2921        // node is now in the table but not connected yet
2922        let key1 = kad_key(*service_1.local_peer_id());
2923        match service_2.kbuckets.entry(&key1) {
2924            kbucket::Entry::Present(_entry, status) => {
2925                assert!(!status.is_connected());
2926            }
2927            _ => unreachable!(),
2928        }
2929
2930        // we now wait for PONG
2931        let event = poll_fn(|cx| service_1.poll(cx)).await;
2932        assert_eq!(event, Discv4Event::Pong);
2933
2934        // endpoint is proven
2935        let key2 = kad_key(*service_2.local_peer_id());
2936        match service_1.kbuckets.entry(&key2) {
2937            kbucket::Entry::Present(_entry, status) => {
2938                assert!(status.is_connected());
2939            }
2940            _ => unreachable!(),
2941        }
2942
2943        // we now wait for the PING initiated by 2
2944        let event = poll_fn(|cx| service_1.poll(cx)).await;
2945        assert_eq!(event, Discv4Event::Ping);
2946
2947        // we now wait for PONG
2948        let event = poll_fn(|cx| service_2.poll(cx)).await;
2949
2950        match event {
2951            Discv4Event::EnrRequest => {
2952                // since we support enr in the ping it may also request the enr
2953                let event = poll_fn(|cx| service_2.poll(cx)).await;
2954                match event {
2955                    Discv4Event::EnrRequest => {
2956                        let event = poll_fn(|cx| service_2.poll(cx)).await;
2957                        assert_eq!(event, Discv4Event::Pong);
2958                    }
2959                    Discv4Event::Pong => {}
2960                    _ => {
2961                        unreachable!()
2962                    }
2963                }
2964            }
2965            Discv4Event::Pong => {}
2966            ev => unreachable!("{ev:?}"),
2967        }
2968
2969        // endpoint is proven
2970        match service_2.kbuckets.entry(&key1) {
2971            kbucket::Entry::Present(_entry, status) => {
2972                assert!(status.is_connected());
2973            }
2974            ev => unreachable!("{ev:?}"),
2975        }
2976    }
2977
2978    #[test]
2979    fn test_insert() {
2980        let local_node_record = rng_record(&mut rand_08::thread_rng());
2981        let mut kbuckets: KBucketsTable<NodeKey, NodeEntry> = KBucketsTable::new(
2982            NodeKey::from(&local_node_record).into(),
2983            Duration::from_secs(60),
2984            MAX_NODES_PER_BUCKET,
2985            None,
2986            None,
2987        );
2988
2989        let new_record = rng_record(&mut rand_08::thread_rng());
2990        let key = kad_key(new_record.id);
2991        match kbuckets.entry(&key) {
2992            kbucket::Entry::Absent(entry) => {
2993                let node = NodeEntry::new(new_record);
2994                let _ = entry.insert(
2995                    node,
2996                    NodeStatus {
2997                        direction: ConnectionDirection::Outgoing,
2998                        state: ConnectionState::Disconnected,
2999                    },
3000                );
3001            }
3002            _ => {
3003                unreachable!()
3004            }
3005        };
3006        match kbuckets.entry(&key) {
3007            kbucket::Entry::Present(_, _) => {}
3008            _ => {
3009                unreachable!()
3010            }
3011        }
3012    }
3013
3014    #[tokio::test]
3015    async fn test_bootnode_not_in_update_stream() {
3016        reth_tracing::init_test_tracing();
3017        let (_, service_1) = create_discv4().await;
3018        let peerid_1 = *service_1.local_peer_id();
3019
3020        let config = Discv4Config::builder().add_boot_node(service_1.local_node_record).build();
3021        service_1.spawn();
3022
3023        let (_, mut service_2) = create_discv4_with_config(config).await;
3024
3025        let mut updates = service_2.update_stream();
3026
3027        service_2.spawn();
3028
3029        // Poll for events for a reasonable time
3030        let mut bootnode_appeared = false;
3031        let timeout = tokio::time::sleep(Duration::from_secs(1));
3032        tokio::pin!(timeout);
3033
3034        loop {
3035            tokio::select! {
3036                Some(update) = updates.next() => {
3037                    if let DiscoveryUpdate::Added(record) = update {
3038                        if record.id == peerid_1 {
3039                            bootnode_appeared = true;
3040                            break;
3041                        }
3042                    }
3043                }
3044                _ = &mut timeout => break,
3045            }
3046        }
3047
3048        // Assert bootnode did not appear in update stream
3049        assert!(bootnode_appeared, "Bootnode should appear in update stream");
3050    }
3051}