1#![doc(
20 html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
21 html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
22 issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/"
23)]
24#![cfg_attr(not(test), warn(unused_crate_dependencies))]
25#![cfg_attr(docsrs, feature(doc_cfg))]
26
27use crate::{
28 error::{DecodePacketError, Discv4Error},
29 proto::{FindNode, Message, Neighbours, Packet, Ping, Pong},
30};
31use alloy_primitives::{bytes::Bytes, hex, B256};
32use discv5::{
33 kbucket,
34 kbucket::{
35 BucketInsertResult, Distance, Entry as BucketEntry, InsertResult, KBucketsTable,
36 NodeStatus, MAX_NODES_PER_BUCKET,
37 },
38 ConnectionDirection, ConnectionState,
39};
40use enr::Enr;
41use itertools::Itertools;
42use parking_lot::Mutex;
43use proto::{EnrRequest, EnrResponse};
44use reth_ethereum_forks::ForkId;
45use reth_network_peers::{pk2id, PeerId};
46use secp256k1::SecretKey;
47use std::{
48 cell::RefCell,
49 collections::{btree_map, hash_map::Entry, BTreeMap, HashMap, VecDeque},
50 fmt,
51 future::poll_fn,
52 io,
53 net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4},
54 pin::Pin,
55 rc::Rc,
56 sync::Arc,
57 task::{ready, Context, Poll},
58 time::{Duration, Instant, SystemTime, UNIX_EPOCH},
59};
60use tokio::{
61 net::UdpSocket,
62 sync::{mpsc, mpsc::error::TrySendError, oneshot, oneshot::Sender as OneshotSender},
63 task::{JoinHandle, JoinSet},
64 time::Interval,
65};
66use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt};
67use tracing::{debug, trace};
68
69pub mod error;
70pub mod proto;
71
72mod config;
73pub use config::{Discv4Config, Discv4ConfigBuilder};
74
75mod node;
76use node::{kad_key, NodeKey};
77
78mod table;
79
80pub use reth_network_peers::NodeRecord;
82
83#[cfg(any(test, feature = "test-utils"))]
84pub mod test_utils;
85
86use crate::table::PongTable;
87use reth_net_nat::ResolveNatInterval;
88pub use reth_net_nat::{external_ip, NatResolver};
90
91pub const DEFAULT_DISCOVERY_ADDR: IpAddr = IpAddr::V4(Ipv4Addr::UNSPECIFIED);
95
96pub const DEFAULT_DISCOVERY_PORT: u16 = 30303;
100
101pub const DEFAULT_DISCOVERY_ADDRESS: SocketAddr =
105 SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, DEFAULT_DISCOVERY_PORT));
106
107const MAX_PACKET_SIZE: usize = 1280;
109
110const MIN_PACKET_SIZE: usize = 32 + 65 + 1;
112
113const ALPHA: usize = 3;
115
116const MAX_NODES_PING: usize = 2 * MAX_NODES_PER_BUCKET;
121
122const MAX_QUEUED_PINGS: usize = 2 * MAX_NODES_PER_BUCKET;
130
131const SAFE_MAX_DATAGRAM_NEIGHBOUR_RECORDS: usize = (MAX_PACKET_SIZE - 109) / 91;
137
138const ENDPOINT_PROOF_EXPIRATION: Duration = Duration::from_secs(24 * 60 * 60);
142
143const EXPIRE_DURATION: Duration = Duration::from_secs(60 * 60);
145
146const UDP_MESSAGE_POLL_LOOP_BUDGET: i32 = 4;
151
152type EgressSender = mpsc::Sender<(Bytes, SocketAddr)>;
153type EgressReceiver = mpsc::Receiver<(Bytes, SocketAddr)>;
154
155pub(crate) type IngressSender = mpsc::Sender<IngressEvent>;
156pub(crate) type IngressReceiver = mpsc::Receiver<IngressEvent>;
157
158type NodeRecordSender = OneshotSender<Vec<NodeRecord>>;
159
160#[derive(Debug, Clone)]
167pub struct Discv4 {
168 local_addr: SocketAddr,
170 to_service: mpsc::UnboundedSender<Discv4Command>,
172 node_record: Arc<Mutex<NodeRecord>>,
176}
177
178impl Discv4 {
179 pub async fn spawn(
183 local_address: SocketAddr,
184 local_enr: NodeRecord,
185 secret_key: SecretKey,
186 config: Discv4Config,
187 ) -> io::Result<Self> {
188 let (discv4, service) = Self::bind(local_address, local_enr, secret_key, config).await?;
189
190 service.spawn();
191
192 Ok(discv4)
193 }
194
195 #[cfg(feature = "test-utils")]
199 pub fn noop() -> Self {
200 let (to_service, _rx) = mpsc::unbounded_channel();
201 let local_addr =
202 (IpAddr::from(std::net::Ipv4Addr::UNSPECIFIED), DEFAULT_DISCOVERY_PORT).into();
203 Self {
204 local_addr,
205 to_service,
206 node_record: Arc::new(Mutex::new(NodeRecord::new(
207 "127.0.0.1:3030".parse().unwrap(),
208 PeerId::random(),
209 ))),
210 }
211 }
212
213 pub async fn bind(
245 local_address: SocketAddr,
246 mut local_node_record: NodeRecord,
247 secret_key: SecretKey,
248 config: Discv4Config,
249 ) -> io::Result<(Self, Discv4Service)> {
250 let socket = UdpSocket::bind(local_address).await?;
251 let local_addr = socket.local_addr()?;
252 local_node_record.udp_port = local_addr.port();
253 trace!(target: "discv4", ?local_addr,"opened UDP socket");
254
255 let mut service =
256 Discv4Service::new(socket, local_addr, local_node_record, secret_key, config);
257
258 service.resolve_external_ip();
260
261 let discv4 = service.handle();
262 Ok((discv4, service))
263 }
264
265 pub const fn local_addr(&self) -> SocketAddr {
267 self.local_addr
268 }
269
270 pub fn node_record(&self) -> NodeRecord {
274 *self.node_record.lock()
275 }
276
277 pub fn external_ip(&self) -> IpAddr {
279 self.node_record.lock().address
280 }
281
282 pub fn set_lookup_interval(&self, duration: Duration) {
284 self.send_to_service(Discv4Command::SetLookupInterval(duration))
285 }
286
287 pub async fn lookup_self(&self) -> Result<Vec<NodeRecord>, Discv4Error> {
302 self.lookup_node(None).await
303 }
304
305 pub async fn lookup(&self, node_id: PeerId) -> Result<Vec<NodeRecord>, Discv4Error> {
309 self.lookup_node(Some(node_id)).await
310 }
311
312 pub async fn lookup_random(&self) -> Result<Vec<NodeRecord>, Discv4Error> {
314 let target = PeerId::random();
315 self.lookup_node(Some(target)).await
316 }
317
318 pub fn send_lookup(&self, node_id: PeerId) {
320 let cmd = Discv4Command::Lookup { node_id: Some(node_id), tx: None };
321 self.send_to_service(cmd);
322 }
323
324 async fn lookup_node(&self, node_id: Option<PeerId>) -> Result<Vec<NodeRecord>, Discv4Error> {
325 let (tx, rx) = oneshot::channel();
326 let cmd = Discv4Command::Lookup { node_id, tx: Some(tx) };
327 self.to_service.send(cmd)?;
328 Ok(rx.await?)
329 }
330
331 pub fn send_lookup_self(&self) {
333 let cmd = Discv4Command::Lookup { node_id: None, tx: None };
334 self.send_to_service(cmd);
335 }
336
337 pub fn remove_peer(&self, node_id: PeerId) {
339 let cmd = Discv4Command::Remove(node_id);
340 self.send_to_service(cmd);
341 }
342
343 pub fn add_node(&self, node_record: NodeRecord) {
345 let cmd = Discv4Command::Add(node_record);
346 self.send_to_service(cmd);
347 }
348
349 pub fn ban(&self, node_id: PeerId, ip: IpAddr) {
353 let cmd = Discv4Command::Ban(node_id, ip);
354 self.send_to_service(cmd);
355 }
356
357 pub fn ban_ip(&self, ip: IpAddr) {
361 let cmd = Discv4Command::BanIp(ip);
362 self.send_to_service(cmd);
363 }
364
365 pub fn ban_node(&self, node_id: PeerId) {
369 let cmd = Discv4Command::BanPeer(node_id);
370 self.send_to_service(cmd);
371 }
372
373 pub fn set_tcp_port(&self, port: u16) {
377 let cmd = Discv4Command::SetTcpPort(port);
378 self.send_to_service(cmd);
379 }
380
381 pub fn set_eip868_rlp_pair(&self, key: Vec<u8>, rlp: Bytes) {
387 let cmd = Discv4Command::SetEIP868RLPPair { key, rlp };
388 self.send_to_service(cmd);
389 }
390
391 pub fn set_eip868_rlp(&self, key: Vec<u8>, value: impl alloy_rlp::Encodable) {
395 self.set_eip868_rlp_pair(key, Bytes::from(alloy_rlp::encode(&value)))
396 }
397
398 #[inline]
399 fn send_to_service(&self, cmd: Discv4Command) {
400 let _ = self.to_service.send(cmd).map_err(|err| {
401 debug!(
402 target: "discv4",
403 %err,
404 "channel capacity reached, dropping command",
405 )
406 });
407 }
408
409 pub async fn update_stream(&self) -> Result<ReceiverStream<DiscoveryUpdate>, Discv4Error> {
411 let (tx, rx) = oneshot::channel();
412 let cmd = Discv4Command::Updates(tx);
413 self.to_service.send(cmd)?;
414 Ok(rx.await?)
415 }
416
417 pub fn terminate(&self) {
419 self.send_to_service(Discv4Command::Terminated);
420 }
421}
422
423#[must_use = "Stream does nothing unless polled"]
437pub struct Discv4Service {
438 local_address: SocketAddr,
440 local_eip_868_enr: Enr<SecretKey>,
442 local_node_record: NodeRecord,
444 shared_node_record: Arc<Mutex<NodeRecord>>,
446 secret_key: SecretKey,
448 _socket: Arc<UdpSocket>,
450 _tasks: JoinSet<()>,
454 kbuckets: KBucketsTable<NodeKey, NodeEntry>,
456 ingress: IngressReceiver,
460 egress: EgressSender,
464 queued_pings: VecDeque<(NodeRecord, PingReason)>,
471 pending_pings: HashMap<PeerId, PingRequest>,
473 pending_lookup: HashMap<PeerId, (Instant, LookupContext)>,
478 pending_find_nodes: HashMap<PeerId, FindNodeRequest>,
480 pending_enr_requests: HashMap<PeerId, EnrRequestState>,
482 to_service: mpsc::UnboundedSender<Discv4Command>,
484 commands_rx: mpsc::UnboundedReceiver<Discv4Command>,
486 update_listeners: Vec<mpsc::Sender<DiscoveryUpdate>>,
488 lookup_interval: Interval,
490 lookup_rotator: LookupTargetRotator,
492 evict_expired_requests_interval: Interval,
494 ping_interval: Interval,
496 resolve_external_ip_interval: Option<ResolveNatInterval>,
498 config: Discv4Config,
500 queued_events: VecDeque<Discv4Event>,
502 received_pongs: PongTable,
504 expire_interval: Interval,
506}
507
508impl Discv4Service {
509 pub(crate) fn new(
511 socket: UdpSocket,
512 local_address: SocketAddr,
513 local_node_record: NodeRecord,
514 secret_key: SecretKey,
515 config: Discv4Config,
516 ) -> Self {
517 let socket = Arc::new(socket);
518 let (ingress_tx, ingress_rx) = mpsc::channel(config.udp_ingress_message_buffer);
519 let (egress_tx, egress_rx) = mpsc::channel(config.udp_egress_message_buffer);
520 let mut tasks = JoinSet::<()>::new();
521
522 let udp = Arc::clone(&socket);
523 tasks.spawn(receive_loop(udp, ingress_tx, local_node_record.id));
524
525 let udp = Arc::clone(&socket);
526 tasks.spawn(send_loop(udp, egress_rx));
527
528 let kbuckets = KBucketsTable::new(
529 NodeKey::from(&local_node_record).into(),
530 Duration::from_secs(60),
531 MAX_NODES_PER_BUCKET,
532 None,
533 None,
534 );
535
536 let self_lookup_interval = tokio::time::interval(config.lookup_interval);
537
538 let ping_interval = tokio::time::interval_at(
541 tokio::time::Instant::now() + config.ping_interval,
542 config.ping_interval,
543 );
544
545 let evict_expired_requests_interval = tokio::time::interval_at(
546 tokio::time::Instant::now() + config.request_timeout,
547 config.request_timeout,
548 );
549
550 let lookup_rotator = if config.enable_dht_random_walk {
551 LookupTargetRotator::default()
552 } else {
553 LookupTargetRotator::local_only()
554 };
555
556 let local_eip_868_enr = {
558 let mut builder = Enr::builder();
559 builder.ip(local_node_record.address);
560 if local_node_record.address.is_ipv4() {
561 builder.udp4(local_node_record.udp_port);
562 builder.tcp4(local_node_record.tcp_port);
563 } else {
564 builder.udp6(local_node_record.udp_port);
565 builder.tcp6(local_node_record.tcp_port);
566 }
567
568 for (key, val) in &config.additional_eip868_rlp_pairs {
569 builder.add_value_rlp(key, val.clone());
570 }
571 builder.build(&secret_key).expect("v4 is set")
572 };
573
574 let (to_service, commands_rx) = mpsc::unbounded_channel();
575
576 let shared_node_record = Arc::new(Mutex::new(local_node_record));
577
578 Self {
579 local_address,
580 local_eip_868_enr,
581 local_node_record,
582 shared_node_record,
583 _socket: socket,
584 kbuckets,
585 secret_key,
586 _tasks: tasks,
587 ingress: ingress_rx,
588 egress: egress_tx,
589 queued_pings: VecDeque::with_capacity(MAX_QUEUED_PINGS),
590 pending_pings: Default::default(),
591 pending_lookup: Default::default(),
592 pending_find_nodes: Default::default(),
593 pending_enr_requests: Default::default(),
594 commands_rx,
595 to_service,
596 update_listeners: Vec::with_capacity(1),
597 lookup_interval: self_lookup_interval,
598 ping_interval,
599 evict_expired_requests_interval,
600 lookup_rotator,
601 resolve_external_ip_interval: config.resolve_external_ip_interval(),
602 config,
603 queued_events: Default::default(),
604 received_pongs: Default::default(),
605 expire_interval: tokio::time::interval(EXPIRE_DURATION),
606 }
607 }
608
609 pub fn handle(&self) -> Discv4 {
611 Discv4 {
612 local_addr: self.local_address,
613 to_service: self.to_service.clone(),
614 node_record: self.shared_node_record.clone(),
615 }
616 }
617
618 fn enr_seq(&self) -> Option<u64> {
620 self.config.enable_eip868.then(|| self.local_eip_868_enr.seq())
621 }
622
623 pub fn set_lookup_interval(&mut self, duration: Duration) {
625 self.lookup_interval = tokio::time::interval(duration);
626 }
627
628 fn resolve_external_ip(&mut self) {
632 if let Some(r) = &self.resolve_external_ip_interval &&
633 let Some(external_ip) =
634 r.resolver().clone().as_external_ip(self.local_node_record.udp_port)
635 {
636 self.set_external_ip_addr(external_ip);
637 }
638 }
639
640 pub fn set_external_ip_addr(&mut self, external_ip: IpAddr) {
643 if self.local_node_record.address != external_ip {
644 debug!(target: "discv4", ?external_ip, "Updating external ip");
645 self.local_node_record.address = external_ip;
646 let _ = self.local_eip_868_enr.set_ip(external_ip, &self.secret_key);
647 let mut lock = self.shared_node_record.lock();
648 *lock = self.local_node_record;
649 debug!(target: "discv4", enr=?self.local_eip_868_enr, "Updated local ENR");
650 }
651 }
652
653 pub const fn local_peer_id(&self) -> &PeerId {
655 &self.local_node_record.id
656 }
657
658 pub const fn local_addr(&self) -> SocketAddr {
660 self.local_address
661 }
662
663 pub const fn local_enr(&self) -> NodeRecord {
667 self.local_node_record
668 }
669
670 #[cfg(test)]
672 pub const fn local_enr_mut(&mut self) -> &mut NodeRecord {
673 &mut self.local_node_record
674 }
675
676 pub fn contains_node(&self, id: PeerId) -> bool {
678 let key = kad_key(id);
679 self.kbuckets.get_index(&key).is_some()
680 }
681
682 pub fn bootstrap(&mut self) {
697 for record in self.config.bootstrap_nodes.clone() {
698 debug!(target: "discv4", ?record, "pinging boot node");
699 let key = kad_key(record.id);
700 let entry = NodeEntry::new(record);
701
702 match self.kbuckets.insert_or_update(
704 &key,
705 entry,
706 NodeStatus {
707 state: ConnectionState::Disconnected,
708 direction: ConnectionDirection::Outgoing,
709 },
710 ) {
711 InsertResult::Failed(_) => {}
712 _ => {
713 self.try_ping(record, PingReason::InitialInsert);
714 }
715 }
716 }
717 }
718
719 pub fn spawn(mut self) -> JoinHandle<()> {
723 tokio::task::spawn(async move {
724 self.bootstrap();
725
726 while let Some(event) = self.next().await {
727 trace!(target: "discv4", ?event, "processed");
728 }
729 trace!(target: "discv4", "service terminated");
730 })
731 }
732
733 pub fn update_stream(&mut self) -> ReceiverStream<DiscoveryUpdate> {
735 let (tx, rx) = mpsc::channel(512);
736 self.update_listeners.push(tx);
737 ReceiverStream::new(rx)
738 }
739
740 pub fn lookup_self(&mut self) {
742 self.lookup(self.local_node_record.id)
743 }
744
745 pub fn lookup(&mut self, target: PeerId) {
755 self.lookup_with(target, None)
756 }
757
758 fn lookup_with(&mut self, target: PeerId, tx: Option<NodeRecordSender>) {
768 trace!(target: "discv4", ?target, "Starting lookup");
769 let target_key = kad_key(target);
770
771 let ctx = LookupContext::new(
774 target_key.clone(),
775 self.kbuckets
776 .closest_values(&target_key)
777 .filter(|node| {
778 node.value.has_endpoint_proof &&
779 !self.pending_find_nodes.contains_key(&node.key.preimage().0)
780 })
781 .take(MAX_NODES_PER_BUCKET)
782 .map(|n| (target_key.distance(&n.key), n.value.record)),
783 tx,
784 );
785
786 let closest = ctx.closest(ALPHA);
788
789 if closest.is_empty() && self.pending_find_nodes.is_empty() {
790 self.bootstrap();
795 return
796 }
797
798 trace!(target: "discv4", ?target, num = closest.len(), "Start lookup closest nodes");
799
800 for node in closest {
801 self.find_node_checked(&node, ctx.clone());
805 }
806 }
807
808 fn find_node(&mut self, node: &NodeRecord, ctx: LookupContext) {
812 trace!(target: "discv4", ?node, lookup=?ctx.target(), "Sending FindNode");
813 ctx.mark_queried(node.id);
814 let id = ctx.target();
815 let msg = Message::FindNode(FindNode { id, expire: self.find_node_expiration() });
816 self.send_packet(msg, node.udp_addr());
817 self.pending_find_nodes.insert(node.id, FindNodeRequest::new(ctx));
818 }
819
820 fn find_node_checked(&mut self, node: &NodeRecord, ctx: LookupContext) {
825 let max_failures = self.config.max_find_node_failures;
826 let needs_ping = self
827 .on_entry(node.id, |entry| entry.exceeds_find_node_failures(max_failures))
828 .unwrap_or(true);
829 if needs_ping {
830 self.try_ping(*node, PingReason::Lookup(*node, ctx))
831 } else {
832 self.find_node(node, ctx)
833 }
834 }
835
836 fn notify(&mut self, update: DiscoveryUpdate) {
840 self.update_listeners.retain_mut(|listener| match listener.try_send(update.clone()) {
841 Ok(()) => true,
842 Err(err) => match err {
843 TrySendError::Full(_) => true,
844 TrySendError::Closed(_) => false,
845 },
846 });
847 }
848
849 pub fn ban_ip(&mut self, ip: IpAddr) {
851 self.config.ban_list.ban_ip(ip);
852 }
853
854 pub fn ban_node(&mut self, node_id: PeerId) {
856 self.remove_node(node_id);
857 self.config.ban_list.ban_peer(node_id);
858 }
859
860 pub fn ban_ip_until(&mut self, ip: IpAddr, until: Instant) {
862 self.config.ban_list.ban_ip_until(ip, until);
863 }
864
865 pub fn ban_node_until(&mut self, node_id: PeerId, until: Instant) {
867 self.remove_node(node_id);
868 self.config.ban_list.ban_peer_until(node_id, until);
869 }
870
871 pub fn remove_node(&mut self, node_id: PeerId) -> bool {
876 let key = kad_key(node_id);
877 self.remove_key(node_id, key)
878 }
879
880 pub fn soft_remove_node(&mut self, node_id: PeerId) -> bool {
885 let key = kad_key(node_id);
886 let Some(bucket) = self.kbuckets.get_bucket(&key) else { return false };
887 if bucket.num_entries() < MAX_NODES_PER_BUCKET / 2 {
888 return false
890 }
891 self.remove_key(node_id, key)
892 }
893
894 fn remove_key(&mut self, node_id: PeerId, key: discv5::Key<NodeKey>) -> bool {
895 let removed = self.kbuckets.remove(&key);
896 if removed {
897 trace!(target: "discv4", ?node_id, "removed node");
898 self.notify(DiscoveryUpdate::Removed(node_id));
899 }
900 removed
901 }
902
903 pub fn num_connected(&self) -> usize {
905 self.kbuckets.buckets_iter().fold(0, |count, bucket| count + bucket.num_connected())
906 }
907
908 fn has_bond(&self, remote_id: PeerId, remote_ip: IpAddr) -> bool {
910 if let Some(timestamp) = self.received_pongs.last_pong(remote_id, remote_ip) &&
911 timestamp.elapsed() < self.config.bond_expiration
912 {
913 return true
914 }
915 false
916 }
917
918 fn on_entry<F, R>(&mut self, peer_id: PeerId, f: F) -> Option<R>
920 where
921 F: FnOnce(&NodeEntry) -> R,
922 {
923 let key = kad_key(peer_id);
924 match self.kbuckets.entry(&key) {
925 BucketEntry::Present(entry, _) => Some(f(entry.value())),
926 BucketEntry::Pending(mut entry, _) => Some(f(entry.value())),
927 _ => None,
928 }
929 }
930
931 fn update_on_reping(&mut self, record: NodeRecord, mut last_enr_seq: Option<u64>) {
938 if record.id == self.local_node_record.id {
939 return
940 }
941
942 if !self.config.enable_eip868 {
944 last_enr_seq = None;
945 }
946
947 let key = kad_key(record.id);
948 let old_enr = match self.kbuckets.entry(&key) {
949 kbucket::Entry::Present(mut entry, _) => {
950 entry.value_mut().update_with_enr(last_enr_seq)
951 }
952 kbucket::Entry::Pending(mut entry, _) => entry.value().update_with_enr(last_enr_seq),
953 _ => return,
954 };
955
956 match (last_enr_seq, old_enr) {
958 (Some(new), Some(old)) => {
959 if new > old {
960 self.send_enr_request(record);
961 }
962 }
963 (Some(_), None) => {
964 self.send_enr_request(record);
966 }
967 _ => {}
968 };
969 }
970
971 fn update_on_pong(&mut self, record: NodeRecord, mut last_enr_seq: Option<u64>) {
973 if record.id == *self.local_peer_id() {
974 return
975 }
976
977 if !self.config.enable_eip868 {
979 last_enr_seq = None;
980 }
981
982 let has_enr_seq = last_enr_seq.is_some();
985
986 let key = kad_key(record.id);
987 match self.kbuckets.entry(&key) {
988 kbucket::Entry::Present(mut entry, old_status) => {
989 entry.value_mut().establish_proof();
991 entry.value_mut().update_with_enr(last_enr_seq);
992
993 if !old_status.is_connected() {
994 let _ = entry.update(ConnectionState::Connected, Some(old_status.direction));
995 trace!(target: "discv4", ?record, "added after successful endpoint proof");
996 self.notify(DiscoveryUpdate::Added(record));
997
998 if has_enr_seq {
999 self.send_enr_request(record);
1001 }
1002 }
1003 }
1004 kbucket::Entry::Pending(mut entry, mut status) => {
1005 entry.value().establish_proof();
1007 entry.value().update_with_enr(last_enr_seq);
1008
1009 if !status.is_connected() {
1010 status.state = ConnectionState::Connected;
1011 let _ = entry.update(status);
1012 trace!(target: "discv4", ?record, "added after successful endpoint proof");
1013 self.notify(DiscoveryUpdate::Added(record));
1014
1015 if has_enr_seq {
1016 self.send_enr_request(record);
1018 }
1019 }
1020 }
1021 _ => {}
1022 };
1023 }
1024
1025 pub fn add_all_nodes(&mut self, records: impl IntoIterator<Item = NodeRecord>) {
1029 for record in records {
1030 self.add_node(record);
1031 }
1032 }
1033
1034 pub fn add_node(&mut self, record: NodeRecord) -> bool {
1040 let key = kad_key(record.id);
1041 match self.kbuckets.entry(&key) {
1042 kbucket::Entry::Absent(entry) => {
1043 let node = NodeEntry::new(record);
1044 match entry.insert(
1045 node,
1046 NodeStatus {
1047 direction: ConnectionDirection::Outgoing,
1048 state: ConnectionState::Disconnected,
1049 },
1050 ) {
1051 BucketInsertResult::Inserted | BucketInsertResult::Pending { .. } => {
1052 trace!(target: "discv4", ?record, "inserted new record");
1053 }
1054 _ => return false,
1055 }
1056 }
1057 _ => return false,
1058 }
1059
1060 self.try_ping(record, PingReason::InitialInsert);
1062 true
1063 }
1064
1065 pub(crate) fn send_packet(&self, msg: Message, to: SocketAddr) -> B256 {
1067 let (payload, hash) = msg.encode(&self.secret_key);
1068 trace!(target: "discv4", r#type=?msg.msg_type(), ?to, ?hash, "sending packet");
1069 let _ = self.egress.try_send((payload, to)).map_err(|err| {
1070 debug!(
1071 target: "discv4",
1072 %err,
1073 "dropped outgoing packet",
1074 );
1075 });
1076 hash
1077 }
1078
1079 fn on_ping(&mut self, ping: Ping, remote_addr: SocketAddr, remote_id: PeerId, hash: B256) {
1081 if self.is_expired(ping.expire) {
1082 return
1084 }
1085
1086 let record = NodeRecord {
1088 address: remote_addr.ip(),
1089 udp_port: remote_addr.port(),
1090 tcp_port: ping.from.tcp_port,
1091 id: remote_id,
1092 }
1093 .into_ipv4_mapped();
1094
1095 let key = kad_key(record.id);
1096
1097 let mut is_new_insert = false;
1104 let mut needs_bond = false;
1105 let mut is_proven = false;
1106
1107 let old_enr = match self.kbuckets.entry(&key) {
1108 kbucket::Entry::Present(mut entry, _) => {
1109 if entry.value().is_expired() {
1110 needs_bond = true;
1113 } else {
1114 is_proven = entry.value().has_endpoint_proof;
1115 }
1116 entry.value_mut().update_with_enr(ping.enr_sq)
1117 }
1118 kbucket::Entry::Pending(mut entry, _) => {
1119 if entry.value().is_expired() {
1120 needs_bond = true;
1123 } else {
1124 is_proven = entry.value().has_endpoint_proof;
1125 }
1126 entry.value().update_with_enr(ping.enr_sq)
1127 }
1128 kbucket::Entry::Absent(entry) => {
1129 let mut node = NodeEntry::new(record);
1130 node.last_enr_seq = ping.enr_sq;
1131
1132 match entry.insert(
1133 node,
1134 NodeStatus {
1135 direction: ConnectionDirection::Incoming,
1136 state: ConnectionState::Disconnected,
1138 },
1139 ) {
1140 BucketInsertResult::Inserted | BucketInsertResult::Pending { .. } => {
1141 is_new_insert = true;
1143 }
1144 BucketInsertResult::Full => {
1145 trace!(target: "discv4", ?record, "discovered new record but bucket is full");
1149 self.notify(DiscoveryUpdate::DiscoveredAtCapacity(record));
1150 needs_bond = true;
1151 }
1152 BucketInsertResult::TooManyIncoming | BucketInsertResult::NodeExists => {
1153 needs_bond = true;
1154 }
1156 BucketInsertResult::FailedFilter => return,
1157 }
1158
1159 None
1160 }
1161 kbucket::Entry::SelfEntry => return,
1162 };
1163
1164 let pong = Message::Pong(Pong {
1167 to: record.into(),
1169 echo: hash,
1170 expire: ping.expire,
1171 enr_sq: self.enr_seq(),
1172 });
1173 self.send_packet(pong, remote_addr);
1174
1175 if is_new_insert {
1177 self.try_ping(record, PingReason::InitialInsert);
1178 } else if needs_bond {
1179 self.try_ping(record, PingReason::EstablishBond);
1180 } else if is_proven {
1181 if let Some((_, ctx)) = self.pending_lookup.remove(&record.id) {
1185 if self.pending_find_nodes.contains_key(&record.id) {
1186 ctx.unmark_queried(record.id);
1189 } else {
1190 self.find_node(&record, ctx);
1193 }
1194 }
1195 } else {
1196 match (ping.enr_sq, old_enr) {
1198 (Some(new), Some(old)) => {
1199 if new > old {
1200 self.send_enr_request(record);
1201 }
1202 }
1203 (Some(_), None) => {
1204 self.send_enr_request(record);
1205 }
1206 _ => {}
1207 };
1208 }
1209 }
1210
1211 fn try_ping(&mut self, node: NodeRecord, reason: PingReason) {
1213 if node.id == *self.local_peer_id() {
1214 return
1216 }
1217
1218 if self.pending_pings.contains_key(&node.id) ||
1219 self.pending_find_nodes.contains_key(&node.id)
1220 {
1221 return
1222 }
1223
1224 if self.queued_pings.iter().any(|(n, _)| n.id == node.id) {
1225 return
1226 }
1227
1228 if self.pending_pings.len() < MAX_NODES_PING {
1229 self.send_ping(node, reason);
1230 } else if self.queued_pings.len() < MAX_QUEUED_PINGS {
1231 self.queued_pings.push_back((node, reason));
1232 }
1233 }
1234
1235 pub(crate) fn send_ping(&mut self, node: NodeRecord, reason: PingReason) -> B256 {
1239 let remote_addr = node.udp_addr();
1240 let id = node.id;
1241 let ping = Ping {
1242 from: self.local_node_record.into(),
1243 to: node.into(),
1244 expire: self.ping_expiration(),
1245 enr_sq: self.enr_seq(),
1246 };
1247 trace!(target: "discv4", ?ping, "sending ping");
1248 let echo_hash = self.send_packet(Message::Ping(ping), remote_addr);
1249
1250 self.pending_pings
1251 .insert(id, PingRequest { sent_at: Instant::now(), node, echo_hash, reason });
1252 echo_hash
1253 }
1254
1255 pub(crate) fn send_enr_request(&mut self, node: NodeRecord) {
1259 if !self.config.enable_eip868 {
1260 return
1261 }
1262 let remote_addr = node.udp_addr();
1263 let enr_request = EnrRequest { expire: self.enr_request_expiration() };
1264
1265 trace!(target: "discv4", ?enr_request, "sending enr request");
1266 let echo_hash = self.send_packet(Message::EnrRequest(enr_request), remote_addr);
1267
1268 self.pending_enr_requests
1269 .insert(node.id, EnrRequestState { sent_at: Instant::now(), echo_hash });
1270 }
1271
1272 fn on_pong(&mut self, pong: Pong, remote_addr: SocketAddr, remote_id: PeerId) {
1274 if self.is_expired(pong.expire) {
1275 return
1276 }
1277
1278 let PingRequest { node, reason, .. } = match self.pending_pings.entry(remote_id) {
1279 Entry::Occupied(entry) => {
1280 {
1281 let request = entry.get();
1282 if request.echo_hash != pong.echo {
1283 trace!(target: "discv4", from=?remote_addr, expected=?request.echo_hash, echo_hash=?pong.echo,"Got unexpected Pong");
1284 return
1285 }
1286 }
1287 entry.remove()
1288 }
1289 Entry::Vacant(_) => return,
1290 };
1291
1292 self.received_pongs.on_pong(remote_id, remote_addr.ip());
1294
1295 match reason {
1296 PingReason::InitialInsert => {
1297 self.update_on_pong(node, pong.enr_sq);
1298 }
1299 PingReason::EstablishBond => {
1300 self.update_on_pong(node, pong.enr_sq);
1302 }
1303 PingReason::RePing => {
1304 self.update_on_reping(node, pong.enr_sq);
1305 }
1306 PingReason::Lookup(node, ctx) => {
1307 self.update_on_pong(node, pong.enr_sq);
1308 self.pending_lookup.insert(node.id, (Instant::now(), ctx));
1313 }
1314 }
1315 }
1316
1317 fn on_find_node(&mut self, msg: FindNode, remote_addr: SocketAddr, node_id: PeerId) {
1319 if self.is_expired(msg.expire) {
1320 return
1322 }
1323 if node_id == *self.local_peer_id() {
1324 return
1326 }
1327
1328 if self.has_bond(node_id, remote_addr.ip()) {
1329 self.respond_closest(msg.id, remote_addr)
1330 }
1331 }
1332
1333 fn on_enr_response(&mut self, msg: EnrResponse, remote_addr: SocketAddr, id: PeerId) {
1335 trace!(target: "discv4", ?remote_addr, ?msg, "received ENR response");
1336 if let Some(resp) = self.pending_enr_requests.remove(&id) {
1337 let enr_id = pk2id(&msg.enr.public_key());
1339 if id != enr_id {
1340 return
1341 }
1342
1343 if resp.echo_hash == msg.request_hash {
1344 let key = kad_key(id);
1345 let fork_id = msg.eth_fork_id();
1346 let (record, old_fork_id) = match self.kbuckets.entry(&key) {
1347 kbucket::Entry::Present(mut entry, _) => {
1348 let id = entry.value_mut().update_with_fork_id(fork_id);
1349 (entry.value().record, id)
1350 }
1351 kbucket::Entry::Pending(mut entry, _) => {
1352 let id = entry.value().update_with_fork_id(fork_id);
1353 (entry.value().record, id)
1354 }
1355 _ => return,
1356 };
1357 match (fork_id, old_fork_id) {
1358 (Some(new), Some(old)) => {
1359 if new != old {
1360 self.notify(DiscoveryUpdate::EnrForkId(record, new))
1361 }
1362 }
1363 (Some(new), None) => self.notify(DiscoveryUpdate::EnrForkId(record, new)),
1364 _ => {}
1365 }
1366 }
1367 }
1368 }
1369
1370 fn on_enr_request(
1372 &self,
1373 msg: EnrRequest,
1374 remote_addr: SocketAddr,
1375 id: PeerId,
1376 request_hash: B256,
1377 ) {
1378 if !self.config.enable_eip868 || self.is_expired(msg.expire) {
1379 return
1380 }
1381
1382 if self.has_bond(id, remote_addr.ip()) {
1383 self.send_packet(
1384 Message::EnrResponse(EnrResponse {
1385 request_hash,
1386 enr: self.local_eip_868_enr.clone(),
1387 }),
1388 remote_addr,
1389 );
1390 }
1391 }
1392
1393 fn on_neighbours(&mut self, msg: Neighbours, remote_addr: SocketAddr, node_id: PeerId) {
1396 if self.is_expired(msg.expire) {
1397 return
1399 }
1400 let ctx = match self.pending_find_nodes.entry(node_id) {
1402 Entry::Occupied(mut entry) => {
1403 {
1404 let request = entry.get_mut();
1405 request.answered = true;
1407 let total = request.response_count + msg.nodes.len();
1408
1409 if total <= MAX_NODES_PER_BUCKET {
1411 request.response_count = total;
1412 } else {
1413 trace!(target: "discv4", total, from=?remote_addr, "Received neighbors packet entries exceeds max nodes per bucket");
1414 return
1415 }
1416 };
1417
1418 if entry.get().response_count == MAX_NODES_PER_BUCKET {
1419 let ctx = entry.remove().lookup_context;
1421 ctx.mark_responded(node_id);
1422 ctx
1423 } else {
1424 entry.get().lookup_context.clone()
1425 }
1426 }
1427 Entry::Vacant(_) => {
1428 trace!(target: "discv4", from=?remote_addr, "Received unsolicited Neighbours");
1430 return
1431 }
1432 };
1433
1434 trace!(target: "discv4",
1436 target=format!("{:#?}", node_id),
1437 peers_count=msg.nodes.len(),
1438 peers=format!("[{:#}]", msg.nodes.iter()
1439 .map(|node_rec| node_rec.id
1440 ).format(", ")),
1441 "Received peers from Neighbours packet"
1442 );
1443
1444 for node in msg.nodes.into_iter().map(NodeRecord::into_ipv4_mapped) {
1447 if self.config.ban_list.is_banned(&node.id, &node.address) {
1449 trace!(target: "discv4", peer_id=?node.id, ip=?node.address, "ignoring banned record");
1450 continue
1451 }
1452
1453 ctx.add_node(node);
1454 }
1455
1456 let closest =
1458 ctx.filter_closest(ALPHA, |node| !self.pending_find_nodes.contains_key(&node.id));
1459
1460 for closest in closest {
1461 let key = kad_key(closest.id);
1462 match self.kbuckets.entry(&key) {
1463 BucketEntry::Absent(entry) => {
1464 ctx.mark_queried(closest.id);
1470 let node = NodeEntry::new(closest);
1471 match entry.insert(
1472 node,
1473 NodeStatus {
1474 direction: ConnectionDirection::Outgoing,
1475 state: ConnectionState::Disconnected,
1476 },
1477 ) {
1478 BucketInsertResult::Inserted | BucketInsertResult::Pending { .. } => {
1479 self.try_ping(closest, PingReason::Lookup(closest, ctx.clone()))
1481 }
1482 BucketInsertResult::Full => {
1483 self.notify(DiscoveryUpdate::DiscoveredAtCapacity(closest))
1485 }
1486 _ => {}
1487 }
1488 }
1489 BucketEntry::SelfEntry => {
1490 }
1492 BucketEntry::Present(entry, _) => {
1493 if entry.value().has_endpoint_proof {
1494 if entry
1495 .value()
1496 .exceeds_find_node_failures(self.config.max_find_node_failures)
1497 {
1498 self.try_ping(closest, PingReason::Lookup(closest, ctx.clone()))
1499 } else {
1500 self.find_node(&closest, ctx.clone());
1501 }
1502 }
1503 }
1504 BucketEntry::Pending(mut entry, _) => {
1505 if entry.value().has_endpoint_proof {
1506 if entry
1507 .value()
1508 .exceeds_find_node_failures(self.config.max_find_node_failures)
1509 {
1510 self.try_ping(closest, PingReason::Lookup(closest, ctx.clone()))
1511 } else {
1512 self.find_node(&closest, ctx.clone());
1513 }
1514 }
1515 }
1516 }
1517 }
1518 }
1519
1520 fn respond_closest(&mut self, target: PeerId, to: SocketAddr) {
1522 let key = kad_key(target);
1523 let expire = self.send_neighbours_expiration();
1524
1525 let closest_nodes =
1527 self.kbuckets.closest_values(&key).take(MAX_NODES_PER_BUCKET).collect::<Vec<_>>();
1528
1529 for nodes in closest_nodes.chunks(SAFE_MAX_DATAGRAM_NEIGHBOUR_RECORDS) {
1530 let nodes = nodes.iter().map(|node| node.value.record).collect::<Vec<NodeRecord>>();
1531 trace!(target: "discv4", len = nodes.len(), to=?to,"Sent neighbours packet");
1532 let msg = Message::Neighbours(Neighbours { nodes, expire });
1533 self.send_packet(msg, to);
1534 }
1535 }
1536
1537 fn evict_expired_requests(&mut self, now: Instant) {
1538 self.pending_enr_requests.retain(|_node_id, enr_request| {
1539 now.duration_since(enr_request.sent_at) < self.config.enr_expiration
1540 });
1541
1542 let mut failed_pings = Vec::new();
1543 self.pending_pings.retain(|node_id, ping_request| {
1544 if now.duration_since(ping_request.sent_at) > self.config.ping_expiration {
1545 failed_pings.push(*node_id);
1546 return false
1547 }
1548 true
1549 });
1550
1551 if !failed_pings.is_empty() {
1552 trace!(target: "discv4", num=%failed_pings.len(), "evicting nodes due to failed pong");
1554 for node_id in failed_pings {
1555 self.remove_node(node_id);
1556 }
1557 }
1558
1559 let mut failed_lookups = Vec::new();
1560 self.pending_lookup.retain(|node_id, (lookup_sent_at, _)| {
1561 if now.duration_since(*lookup_sent_at) > self.config.request_timeout {
1562 failed_lookups.push(*node_id);
1563 return false
1564 }
1565 true
1566 });
1567
1568 if !failed_lookups.is_empty() {
1569 trace!(target: "discv4", num=%failed_lookups.len(), "evicting nodes due to failed lookup");
1571 for node_id in failed_lookups {
1572 self.remove_node(node_id);
1573 }
1574 }
1575
1576 self.evict_failed_find_nodes(now);
1577 }
1578
1579 fn evict_failed_find_nodes(&mut self, now: Instant) {
1581 let mut failed_find_nodes = Vec::new();
1582 self.pending_find_nodes.retain(|node_id, find_node_request| {
1583 if now.duration_since(find_node_request.sent_at) > self.config.neighbours_expiration {
1584 if !find_node_request.answered {
1585 failed_find_nodes.push(*node_id);
1588 }
1589 return false
1590 }
1591 true
1592 });
1593
1594 if failed_find_nodes.is_empty() {
1595 return
1596 }
1597
1598 trace!(target: "discv4", num=%failed_find_nodes.len(), "processing failed find nodes");
1599
1600 for node_id in failed_find_nodes {
1601 let key = kad_key(node_id);
1602 let failures = match self.kbuckets.entry(&key) {
1603 kbucket::Entry::Present(mut entry, _) => {
1604 entry.value_mut().inc_failed_request();
1605 entry.value().find_node_failures
1606 }
1607 kbucket::Entry::Pending(mut entry, _) => {
1608 entry.value().inc_failed_request();
1609 entry.value().find_node_failures
1610 }
1611 _ => continue,
1612 };
1613
1614 if failures > self.config.max_find_node_failures {
1618 self.soft_remove_node(node_id);
1619 }
1620 }
1621 }
1622
1623 fn re_ping_oldest(&mut self) {
1628 let mut nodes = self
1629 .kbuckets
1630 .iter_ref()
1631 .filter(|entry| entry.node.value.is_expired())
1632 .map(|n| n.node.value)
1633 .collect::<Vec<_>>();
1634 nodes.sort_by(|a, b| a.last_seen.cmp(&b.last_seen));
1635 let to_ping = nodes.into_iter().map(|n| n.record).take(MAX_NODES_PING).collect::<Vec<_>>();
1636 for node in to_ping {
1637 self.try_ping(node, PingReason::RePing)
1638 }
1639 }
1640
1641 fn is_expired(&self, expiration: u64) -> bool {
1643 self.ensure_not_expired(expiration).is_err()
1644 }
1645
1646 fn ensure_not_expired(&self, timestamp: u64) -> Result<(), ()> {
1656 let _ = i64::try_from(timestamp).map_err(drop)?;
1658
1659 let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_secs();
1660 if self.config.enforce_expiration_timestamps && timestamp < now {
1661 trace!(target: "discv4", "Expired packet");
1662 return Err(())
1663 }
1664 Ok(())
1665 }
1666
1667 fn ping_buffered(&mut self) {
1669 while self.pending_pings.len() < MAX_NODES_PING {
1670 match self.queued_pings.pop_front() {
1671 Some((next, reason)) => self.try_ping(next, reason),
1672 None => break,
1673 }
1674 }
1675 }
1676
1677 fn ping_expiration(&self) -> u64 {
1678 (SystemTime::now().duration_since(UNIX_EPOCH).unwrap() + self.config.ping_expiration)
1679 .as_secs()
1680 }
1681
1682 fn find_node_expiration(&self) -> u64 {
1683 (SystemTime::now().duration_since(UNIX_EPOCH).unwrap() + self.config.request_timeout)
1684 .as_secs()
1685 }
1686
1687 fn enr_request_expiration(&self) -> u64 {
1688 (SystemTime::now().duration_since(UNIX_EPOCH).unwrap() + self.config.enr_expiration)
1689 .as_secs()
1690 }
1691
1692 fn send_neighbours_expiration(&self) -> u64 {
1693 (SystemTime::now().duration_since(UNIX_EPOCH).unwrap() + self.config.neighbours_expiration)
1694 .as_secs()
1695 }
1696
1697 pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<Discv4Event> {
1703 loop {
1704 if let Some(event) = self.queued_events.pop_front() {
1706 return Poll::Ready(event)
1707 }
1708
1709 if self.config.enable_lookup {
1711 while self.lookup_interval.poll_tick(cx).is_ready() {
1712 let target = self.lookup_rotator.next(&self.local_node_record.id);
1713 self.lookup_with(target, None);
1714 }
1715 }
1716
1717 while self.ping_interval.poll_tick(cx).is_ready() {
1719 self.re_ping_oldest();
1720 }
1721
1722 if let Some(Poll::Ready(Some(ip))) =
1723 self.resolve_external_ip_interval.as_mut().map(|r| r.poll_tick(cx))
1724 {
1725 self.set_external_ip_addr(ip);
1726 }
1727
1728 while let Poll::Ready(Some(cmd)) = self.commands_rx.poll_recv(cx) {
1730 match cmd {
1731 Discv4Command::Add(enr) => {
1732 self.add_node(enr);
1733 }
1734 Discv4Command::Lookup { node_id, tx } => {
1735 let node_id = node_id.unwrap_or(self.local_node_record.id);
1736 self.lookup_with(node_id, tx);
1737 }
1738 Discv4Command::SetLookupInterval(duration) => {
1739 self.set_lookup_interval(duration);
1740 }
1741 Discv4Command::Updates(tx) => {
1742 let rx = self.update_stream();
1743 let _ = tx.send(rx);
1744 }
1745 Discv4Command::BanPeer(node_id) => self.ban_node(node_id),
1746 Discv4Command::Remove(node_id) => {
1747 self.remove_node(node_id);
1748 }
1749 Discv4Command::Ban(node_id, ip) => {
1750 self.ban_node(node_id);
1751 self.ban_ip(ip);
1752 }
1753 Discv4Command::BanIp(ip) => {
1754 self.ban_ip(ip);
1755 }
1756 Discv4Command::SetEIP868RLPPair { key, rlp } => {
1757 debug!(target: "discv4", key=%String::from_utf8_lossy(&key), "Update EIP-868 extension pair");
1758
1759 let _ = self.local_eip_868_enr.insert_raw_rlp(key, rlp, &self.secret_key);
1760 }
1761 Discv4Command::SetTcpPort(port) => {
1762 debug!(target: "discv4", %port, "Update tcp port");
1763 self.local_node_record.tcp_port = port;
1764 if self.local_node_record.address.is_ipv4() {
1765 let _ = self.local_eip_868_enr.set_tcp4(port, &self.secret_key);
1766 } else {
1767 let _ = self.local_eip_868_enr.set_tcp6(port, &self.secret_key);
1768 }
1769 }
1770
1771 Discv4Command::Terminated => {
1772 self.queued_events.push_back(Discv4Event::Terminated);
1774 }
1775 }
1776 }
1777
1778 let mut udp_message_budget = UDP_MESSAGE_POLL_LOOP_BUDGET;
1780
1781 while let Poll::Ready(Some(event)) = self.ingress.poll_recv(cx) {
1783 match event {
1784 IngressEvent::RecvError(err) => {
1785 debug!(target: "discv4", %err, "failed to read datagram");
1786 }
1787 IngressEvent::BadPacket(from, err, data) => {
1788 trace!(target: "discv4", ?from, %err, packet=?hex::encode(&data), "bad packet");
1789 }
1790 IngressEvent::Packet(remote_addr, Packet { msg, node_id, hash }) => {
1791 trace!(target: "discv4", r#type=?msg.msg_type(), from=?remote_addr,"received packet");
1792 let event = match msg {
1793 Message::Ping(ping) => {
1794 self.on_ping(ping, remote_addr, node_id, hash);
1795 Discv4Event::Ping
1796 }
1797 Message::Pong(pong) => {
1798 self.on_pong(pong, remote_addr, node_id);
1799 Discv4Event::Pong
1800 }
1801 Message::FindNode(msg) => {
1802 self.on_find_node(msg, remote_addr, node_id);
1803 Discv4Event::FindNode
1804 }
1805 Message::Neighbours(msg) => {
1806 self.on_neighbours(msg, remote_addr, node_id);
1807 Discv4Event::Neighbours
1808 }
1809 Message::EnrRequest(msg) => {
1810 self.on_enr_request(msg, remote_addr, node_id, hash);
1811 Discv4Event::EnrRequest
1812 }
1813 Message::EnrResponse(msg) => {
1814 self.on_enr_response(msg, remote_addr, node_id);
1815 Discv4Event::EnrResponse
1816 }
1817 };
1818
1819 self.queued_events.push_back(event);
1820 }
1821 }
1822
1823 udp_message_budget -= 1;
1824 if udp_message_budget < 0 {
1825 trace!(target: "discv4", budget=UDP_MESSAGE_POLL_LOOP_BUDGET, "exhausted message poll budget");
1826 if self.queued_events.is_empty() {
1827 cx.waker().wake_by_ref();
1830 }
1831 break
1832 }
1833 }
1834
1835 self.ping_buffered();
1837
1838 while self.evict_expired_requests_interval.poll_tick(cx).is_ready() {
1840 self.evict_expired_requests(Instant::now());
1841 }
1842
1843 while self.expire_interval.poll_tick(cx).is_ready() {
1845 self.received_pongs.evict_expired(Instant::now(), EXPIRE_DURATION);
1846 }
1847
1848 if self.queued_events.is_empty() {
1849 return Poll::Pending
1850 }
1851 }
1852 }
1853}
1854
1855impl Stream for Discv4Service {
1857 type Item = Discv4Event;
1858
1859 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1860 match ready!(self.get_mut().poll(cx)) {
1862 Discv4Event::Terminated => Poll::Ready(None),
1864 ev => Poll::Ready(Some(ev)),
1866 }
1867 }
1868}
1869
1870impl fmt::Debug for Discv4Service {
1871 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1872 f.debug_struct("Discv4Service")
1873 .field("local_address", &self.local_address)
1874 .field("local_peer_id", &self.local_peer_id())
1875 .field("local_node_record", &self.local_node_record)
1876 .field("queued_pings", &self.queued_pings)
1877 .field("pending_lookup", &self.pending_lookup)
1878 .field("pending_find_nodes", &self.pending_find_nodes)
1879 .field("lookup_interval", &self.lookup_interval)
1880 .finish_non_exhaustive()
1881 }
1882}
1883
1884#[derive(Debug, Eq, PartialEq)]
1888pub enum Discv4Event {
1889 Ping,
1891 Pong,
1893 FindNode,
1895 Neighbours,
1897 EnrRequest,
1899 EnrResponse,
1901 Terminated,
1903}
1904
1905pub(crate) async fn send_loop(udp: Arc<UdpSocket>, rx: EgressReceiver) {
1907 let mut stream = ReceiverStream::new(rx);
1908 while let Some((payload, to)) = stream.next().await {
1909 match udp.send_to(&payload, to).await {
1910 Ok(size) => {
1911 trace!(target: "discv4", ?to, ?size,"sent payload");
1912 }
1913 Err(err) => {
1914 debug!(target: "discv4", ?to, %err,"Failed to send datagram.");
1915 }
1916 }
1917 }
1918}
1919
1920const MAX_INCOMING_PACKETS_PER_MINUTE_BY_IP: usize = 60usize;
1922
1923pub(crate) async fn receive_loop(udp: Arc<UdpSocket>, tx: IngressSender, local_id: PeerId) {
1928 let send = |event: IngressEvent| async {
1929 let _ = tx.send(event).await.map_err(|err| {
1930 debug!(
1931 target: "discv4",
1932 %err,
1933 "failed send incoming packet",
1934 )
1935 });
1936 };
1937
1938 let mut cache = ReceiveCache::default();
1939
1940 let tick = MAX_INCOMING_PACKETS_PER_MINUTE_BY_IP / 2;
1942 let mut interval = tokio::time::interval(Duration::from_secs(tick as u64));
1943
1944 let mut buf = [0; MAX_PACKET_SIZE];
1945 loop {
1946 let res = udp.recv_from(&mut buf).await;
1947 match res {
1948 Err(err) => {
1949 debug!(target: "discv4", %err, "Failed to read datagram.");
1950 send(IngressEvent::RecvError(err)).await;
1951 }
1952 Ok((read, remote_addr)) => {
1953 if cache.inc_ip(remote_addr.ip()) > MAX_INCOMING_PACKETS_PER_MINUTE_BY_IP {
1955 trace!(target: "discv4", ?remote_addr, "Too many incoming packets from IP.");
1956 continue
1957 }
1958
1959 let packet = &buf[..read];
1960 match Message::decode(packet) {
1961 Ok(packet) => {
1962 if packet.node_id == local_id {
1963 debug!(target: "discv4", ?remote_addr, "Received own packet.");
1965 continue
1966 }
1967
1968 if cache.contains_packet(packet.hash) {
1970 debug!(target: "discv4", ?remote_addr, "Received duplicate packet.");
1971 continue
1972 }
1973
1974 send(IngressEvent::Packet(remote_addr, packet)).await;
1975 }
1976 Err(err) => {
1977 trace!(target: "discv4", %err,"Failed to decode packet");
1978 send(IngressEvent::BadPacket(remote_addr, err, packet.to_vec())).await
1979 }
1980 }
1981 }
1982 }
1983
1984 if poll_fn(|cx| match interval.poll_tick(cx) {
1986 Poll::Ready(_) => Poll::Ready(true),
1987 Poll::Pending => Poll::Ready(false),
1988 })
1989 .await
1990 {
1991 cache.tick_ips(tick);
1992 }
1993 }
1994}
1995
1996struct ReceiveCache {
2000 ip_messages: HashMap<IpAddr, usize>,
2006 unique_packets: schnellru::LruMap<B256, ()>,
2008}
2009
2010impl ReceiveCache {
2011 fn tick_ips(&mut self, tick: usize) {
2015 self.ip_messages.retain(|_, count| {
2016 if let Some(reset) = count.checked_sub(tick) {
2017 *count = reset;
2018 true
2019 } else {
2020 false
2021 }
2022 });
2023 }
2024
2025 fn inc_ip(&mut self, ip: IpAddr) -> usize {
2027 let ctn = self.ip_messages.entry(ip).or_default();
2028 *ctn = ctn.saturating_add(1);
2029 *ctn
2030 }
2031
2032 fn contains_packet(&mut self, hash: B256) -> bool {
2034 !self.unique_packets.insert(hash, ())
2035 }
2036}
2037
2038impl Default for ReceiveCache {
2039 fn default() -> Self {
2040 Self {
2041 ip_messages: Default::default(),
2042 unique_packets: schnellru::LruMap::new(schnellru::ByLength::new(32)),
2043 }
2044 }
2045}
2046
2047enum Discv4Command {
2049 Add(NodeRecord),
2050 SetTcpPort(u16),
2051 SetEIP868RLPPair { key: Vec<u8>, rlp: Bytes },
2052 Ban(PeerId, IpAddr),
2053 BanPeer(PeerId),
2054 BanIp(IpAddr),
2055 Remove(PeerId),
2056 Lookup { node_id: Option<PeerId>, tx: Option<NodeRecordSender> },
2057 SetLookupInterval(Duration),
2058 Updates(OneshotSender<ReceiverStream<DiscoveryUpdate>>),
2059 Terminated,
2060}
2061
2062#[derive(Debug)]
2064pub(crate) enum IngressEvent {
2065 RecvError(io::Error),
2067 BadPacket(SocketAddr, DecodePacketError, Vec<u8>),
2069 Packet(SocketAddr, Packet),
2071}
2072
2073#[derive(Debug)]
2075struct PingRequest {
2076 sent_at: Instant,
2078 node: NodeRecord,
2080 echo_hash: B256,
2082 reason: PingReason,
2084}
2085
2086#[derive(Debug)]
2090struct LookupTargetRotator {
2091 interval: usize,
2092 counter: usize,
2093}
2094
2095impl LookupTargetRotator {
2098 const fn local_only() -> Self {
2100 Self { interval: 1, counter: 0 }
2101 }
2102}
2103
2104impl Default for LookupTargetRotator {
2105 fn default() -> Self {
2106 Self {
2107 interval: 4,
2109 counter: 3,
2110 }
2111 }
2112}
2113
2114impl LookupTargetRotator {
2115 fn next(&mut self, local: &PeerId) -> PeerId {
2117 self.counter += 1;
2118 self.counter %= self.interval;
2119 if self.counter == 0 {
2120 return *local
2121 }
2122 PeerId::random()
2123 }
2124}
2125
2126#[derive(Clone, Debug)]
2131struct LookupContext {
2132 inner: Rc<LookupContextInner>,
2133}
2134
2135impl LookupContext {
2136 fn new(
2138 target: discv5::Key<NodeKey>,
2139 nearest_nodes: impl IntoIterator<Item = (Distance, NodeRecord)>,
2140 listener: Option<NodeRecordSender>,
2141 ) -> Self {
2142 let closest_nodes = nearest_nodes
2143 .into_iter()
2144 .map(|(distance, record)| {
2145 (distance, QueryNode { record, queried: false, responded: false })
2146 })
2147 .collect();
2148
2149 let inner = Rc::new(LookupContextInner {
2150 target,
2151 closest_nodes: RefCell::new(closest_nodes),
2152 listener,
2153 });
2154 Self { inner }
2155 }
2156
2157 fn target(&self) -> PeerId {
2159 self.inner.target.preimage().0
2160 }
2161
2162 fn closest(&self, num: usize) -> Vec<NodeRecord> {
2163 self.inner
2164 .closest_nodes
2165 .borrow()
2166 .iter()
2167 .filter(|(_, node)| !node.queried)
2168 .map(|(_, n)| n.record)
2169 .take(num)
2170 .collect()
2171 }
2172
2173 fn filter_closest<P>(&self, num: usize, filter: P) -> Vec<NodeRecord>
2175 where
2176 P: FnMut(&NodeRecord) -> bool,
2177 {
2178 self.inner
2179 .closest_nodes
2180 .borrow()
2181 .iter()
2182 .filter(|(_, node)| !node.queried)
2183 .map(|(_, n)| n.record)
2184 .filter(filter)
2185 .take(num)
2186 .collect()
2187 }
2188
2189 fn add_node(&self, record: NodeRecord) {
2191 let distance = self.inner.target.distance(&kad_key(record.id));
2192 let mut closest = self.inner.closest_nodes.borrow_mut();
2193 if let btree_map::Entry::Vacant(entry) = closest.entry(distance) {
2194 entry.insert(QueryNode { record, queried: false, responded: false });
2195 }
2196 }
2197
2198 fn set_queried(&self, id: PeerId, val: bool) {
2199 if let Some((_, node)) =
2200 self.inner.closest_nodes.borrow_mut().iter_mut().find(|(_, node)| node.record.id == id)
2201 {
2202 node.queried = val;
2203 }
2204 }
2205
2206 fn mark_queried(&self, id: PeerId) {
2208 self.set_queried(id, true)
2209 }
2210
2211 fn unmark_queried(&self, id: PeerId) {
2213 self.set_queried(id, false)
2214 }
2215
2216 fn mark_responded(&self, id: PeerId) {
2218 if let Some((_, node)) =
2219 self.inner.closest_nodes.borrow_mut().iter_mut().find(|(_, node)| node.record.id == id)
2220 {
2221 node.responded = true;
2222 }
2223 }
2224}
2225
2226unsafe impl Send for LookupContext {}
2233#[derive(Debug)]
2234struct LookupContextInner {
2235 target: discv5::Key<NodeKey>,
2237 closest_nodes: RefCell<BTreeMap<Distance, QueryNode>>,
2239 listener: Option<NodeRecordSender>,
2244}
2245
2246impl Drop for LookupContextInner {
2247 fn drop(&mut self) {
2248 if let Some(tx) = self.listener.take() {
2249 let nodes = self
2252 .closest_nodes
2253 .take()
2254 .into_values()
2255 .filter(|node| node.responded)
2256 .map(|node| node.record)
2257 .collect();
2258 let _ = tx.send(nodes);
2259 }
2260 }
2261}
2262
2263#[derive(Debug, Clone, Copy)]
2265struct QueryNode {
2266 record: NodeRecord,
2267 queried: bool,
2268 responded: bool,
2269}
2270
2271#[derive(Debug)]
2272struct FindNodeRequest {
2273 sent_at: Instant,
2275 response_count: usize,
2277 answered: bool,
2279 lookup_context: LookupContext,
2281}
2282
2283impl FindNodeRequest {
2286 fn new(resp: LookupContext) -> Self {
2287 Self { sent_at: Instant::now(), response_count: 0, answered: false, lookup_context: resp }
2288 }
2289}
2290
2291#[derive(Debug)]
2292struct EnrRequestState {
2293 sent_at: Instant,
2295 echo_hash: B256,
2297}
2298
2299#[derive(Debug, Clone, Eq, PartialEq)]
2301struct NodeEntry {
2302 record: NodeRecord,
2304 last_seen: Instant,
2306 last_enr_seq: Option<u64>,
2308 fork_id: Option<ForkId>,
2310 find_node_failures: u8,
2312 has_endpoint_proof: bool,
2314}
2315
2316impl NodeEntry {
2319 fn new(record: NodeRecord) -> Self {
2321 Self {
2322 record,
2323 last_seen: Instant::now(),
2324 last_enr_seq: None,
2325 fork_id: None,
2326 find_node_failures: 0,
2327 has_endpoint_proof: false,
2328 }
2329 }
2330
2331 #[cfg(test)]
2332 fn new_proven(record: NodeRecord) -> Self {
2333 let mut node = Self::new(record);
2334 node.has_endpoint_proof = true;
2335 node
2336 }
2337
2338 const fn establish_proof(&mut self) {
2340 self.has_endpoint_proof = true;
2341 self.find_node_failures = 0;
2342 }
2343
2344 const fn exceeds_find_node_failures(&self, max_failures: u8) -> bool {
2346 self.find_node_failures >= max_failures
2347 }
2348
2349 fn update_with_enr(&mut self, last_enr_seq: Option<u64>) -> Option<u64> {
2351 self.update_now(|s| std::mem::replace(&mut s.last_enr_seq, last_enr_seq))
2352 }
2353
2354 const fn inc_failed_request(&mut self) {
2356 self.find_node_failures += 1;
2357 }
2358
2359 fn update_with_fork_id(&mut self, fork_id: Option<ForkId>) -> Option<ForkId> {
2361 self.update_now(|s| std::mem::replace(&mut s.fork_id, fork_id))
2362 }
2363
2364 fn update_now<F, R>(&mut self, f: F) -> R
2366 where
2367 F: FnOnce(&mut Self) -> R,
2368 {
2369 self.last_seen = Instant::now();
2370 f(self)
2371 }
2372}
2373
2374impl NodeEntry {
2377 fn is_expired(&self) -> bool {
2379 self.last_seen.elapsed() > (ENDPOINT_PROOF_EXPIRATION / 2)
2380 }
2381}
2382
2383#[derive(Debug)]
2385enum PingReason {
2386 InitialInsert,
2388 EstablishBond,
2390 RePing,
2392 Lookup(NodeRecord, LookupContext),
2394}
2395
2396#[derive(Debug, Clone)]
2398pub enum DiscoveryUpdate {
2399 Added(NodeRecord),
2401 DiscoveredAtCapacity(NodeRecord),
2403 EnrForkId(NodeRecord, ForkId),
2405 Removed(PeerId),
2407 Batch(Vec<Self>),
2409}
2410
2411#[cfg(test)]
2412mod tests {
2413 use super::*;
2414 use crate::test_utils::{create_discv4, create_discv4_with_config, rng_endpoint, rng_record};
2415 use alloy_primitives::hex;
2416 use alloy_rlp::{Decodable, Encodable};
2417 use rand_08::Rng;
2418 use reth_ethereum_forks::{EnrForkIdEntry, ForkHash};
2419 use reth_network_peers::mainnet_nodes;
2420 use std::future::poll_fn;
2421
2422 #[tokio::test]
2423 async fn test_configured_enr_forkid_entry() {
2424 let fork: ForkId = ForkId { hash: ForkHash([220, 233, 108, 45]), next: 0u64 };
2425 let mut disc_conf = Discv4Config::default();
2426 disc_conf.add_eip868_pair("eth", EnrForkIdEntry::from(fork));
2427 let (_discv4, service) = create_discv4_with_config(disc_conf).await;
2428 let eth = service.local_eip_868_enr.get_raw_rlp(b"eth").unwrap();
2429 let fork_entry_id = EnrForkIdEntry::decode(&mut ð[..]).unwrap();
2430
2431 let raw: [u8; 8] = [0xc7, 0xc6, 0x84, 0xdc, 0xe9, 0x6c, 0x2d, 0x80];
2432 let decoded = EnrForkIdEntry::decode(&mut &raw[..]).unwrap();
2433 let expected = EnrForkIdEntry {
2434 fork_id: ForkId { hash: ForkHash([0xdc, 0xe9, 0x6c, 0x2d]), next: 0 },
2435 };
2436 assert_eq!(expected, fork_entry_id);
2437 assert_eq!(expected, decoded);
2438 }
2439
2440 #[test]
2441 fn test_enr_forkid_entry_decode() {
2442 let raw: [u8; 8] = [0xc7, 0xc6, 0x84, 0xdc, 0xe9, 0x6c, 0x2d, 0x80];
2443 let decoded = EnrForkIdEntry::decode(&mut &raw[..]).unwrap();
2444 let expected = EnrForkIdEntry {
2445 fork_id: ForkId { hash: ForkHash([0xdc, 0xe9, 0x6c, 0x2d]), next: 0 },
2446 };
2447 assert_eq!(expected, decoded);
2448 }
2449
2450 #[test]
2451 fn test_enr_forkid_entry_encode() {
2452 let original = EnrForkIdEntry {
2453 fork_id: ForkId { hash: ForkHash([0xdc, 0xe9, 0x6c, 0x2d]), next: 0 },
2454 };
2455 let expected: [u8; 8] = [0xc7, 0xc6, 0x84, 0xdc, 0xe9, 0x6c, 0x2d, 0x80];
2456 let mut encoded = Vec::with_capacity(expected.len());
2457 original.encode(&mut encoded);
2458 assert_eq!(&expected[..], encoded.as_slice());
2459 }
2460
2461 #[test]
2462 fn test_local_rotator() {
2463 let id = PeerId::random();
2464 let mut rotator = LookupTargetRotator::local_only();
2465 assert_eq!(rotator.next(&id), id);
2466 assert_eq!(rotator.next(&id), id);
2467 }
2468
2469 #[test]
2470 fn test_rotator() {
2471 let id = PeerId::random();
2472 let mut rotator = LookupTargetRotator::default();
2473 assert_eq!(rotator.next(&id), id);
2474 assert_ne!(rotator.next(&id), id);
2475 assert_ne!(rotator.next(&id), id);
2476 assert_ne!(rotator.next(&id), id);
2477 assert_eq!(rotator.next(&id), id);
2478 }
2479
2480 #[tokio::test]
2481 async fn test_pending_ping() {
2482 let (_, mut service) = create_discv4().await;
2483
2484 let local_addr = service.local_addr();
2485
2486 let mut num_inserted = 0;
2487 loop {
2488 let node = NodeRecord::new(local_addr, PeerId::random());
2489 if service.add_node(node) {
2490 num_inserted += 1;
2491 assert!(service.pending_pings.contains_key(&node.id));
2492 assert_eq!(service.pending_pings.len(), num_inserted);
2493 if num_inserted == MAX_NODES_PING {
2494 break
2495 }
2496 }
2497 }
2498
2499 num_inserted = 0;
2501 for _ in 0..MAX_NODES_PING {
2502 let node = NodeRecord::new(local_addr, PeerId::random());
2503 if service.add_node(node) {
2504 num_inserted += 1;
2505 assert!(!service.pending_pings.contains_key(&node.id));
2506 assert_eq!(service.pending_pings.len(), MAX_NODES_PING);
2507 assert_eq!(service.queued_pings.len(), num_inserted);
2508 }
2509 }
2510 }
2511
2512 #[tokio::test(flavor = "multi_thread")]
2514 #[ignore]
2515 async fn test_mainnet_lookup() {
2516 reth_tracing::init_test_tracing();
2517 let fork_id = ForkId { hash: ForkHash(hex!("743f3d89")), next: 16191202 };
2518
2519 let all_nodes = mainnet_nodes();
2520 let config = Discv4Config::builder()
2521 .add_boot_nodes(all_nodes)
2522 .lookup_interval(Duration::from_secs(1))
2523 .add_eip868_pair("eth", fork_id)
2524 .build();
2525 let (_discv4, mut service) = create_discv4_with_config(config).await;
2526
2527 let mut updates = service.update_stream();
2528
2529 let _handle = service.spawn();
2530
2531 let mut table = HashMap::new();
2532 while let Some(update) = updates.next().await {
2533 match update {
2534 DiscoveryUpdate::EnrForkId(record, fork_id) => {
2535 println!("{record:?}, {fork_id:?}");
2536 }
2537 DiscoveryUpdate::Added(record) => {
2538 table.insert(record.id, record);
2539 }
2540 DiscoveryUpdate::Removed(id) => {
2541 table.remove(&id);
2542 }
2543 _ => {}
2544 }
2545 println!("total peers {}", table.len());
2546 }
2547 }
2548
2549 #[tokio::test]
2550 async fn test_mapped_ipv4() {
2551 reth_tracing::init_test_tracing();
2552 let mut rng = rand_08::thread_rng();
2553 let config = Discv4Config::builder().build();
2554 let (_discv4, mut service) = create_discv4_with_config(config).await;
2555
2556 let v4: Ipv4Addr = "0.0.0.0".parse().unwrap();
2557 let v6 = v4.to_ipv6_mapped();
2558 let addr: SocketAddr = (v6, DEFAULT_DISCOVERY_PORT).into();
2559
2560 let ping = Ping {
2561 from: rng_endpoint(&mut rng),
2562 to: rng_endpoint(&mut rng),
2563 expire: service.ping_expiration(),
2564 enr_sq: Some(rng.r#gen()),
2565 };
2566
2567 let id = PeerId::random();
2568 service.on_ping(ping, addr, id, B256::random());
2569
2570 let key = kad_key(id);
2571 match service.kbuckets.entry(&key) {
2572 kbucket::Entry::Present(entry, _) => {
2573 let node_addr = entry.value().record.address;
2574 assert!(node_addr.is_ipv4());
2575 assert_eq!(node_addr, IpAddr::from(v4));
2576 }
2577 _ => unreachable!(),
2578 };
2579 }
2580
2581 #[tokio::test]
2582 async fn test_respect_ping_expiration() {
2583 reth_tracing::init_test_tracing();
2584 let mut rng = rand_08::thread_rng();
2585 let config = Discv4Config::builder().build();
2586 let (_discv4, mut service) = create_discv4_with_config(config).await;
2587
2588 let v4: Ipv4Addr = "0.0.0.0".parse().unwrap();
2589 let v6 = v4.to_ipv6_mapped();
2590 let addr: SocketAddr = (v6, DEFAULT_DISCOVERY_PORT).into();
2591
2592 let ping = Ping {
2593 from: rng_endpoint(&mut rng),
2594 to: rng_endpoint(&mut rng),
2595 expire: SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs() - 1,
2596 enr_sq: Some(rng.r#gen()),
2597 };
2598
2599 let id = PeerId::random();
2600 service.on_ping(ping, addr, id, B256::random());
2601
2602 let key = kad_key(id);
2603 match service.kbuckets.entry(&key) {
2604 kbucket::Entry::Absent(_) => {}
2605 _ => unreachable!(),
2606 };
2607 }
2608
2609 #[tokio::test]
2610 async fn test_single_lookups() {
2611 reth_tracing::init_test_tracing();
2612
2613 let config = Discv4Config::builder().build();
2614 let (_discv4, mut service) = create_discv4_with_config(config.clone()).await;
2615
2616 let id = PeerId::random();
2617 let key = kad_key(id);
2618 let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), id);
2619
2620 let _ = service.kbuckets.insert_or_update(
2621 &key,
2622 NodeEntry::new_proven(record),
2623 NodeStatus {
2624 direction: ConnectionDirection::Incoming,
2625 state: ConnectionState::Connected,
2626 },
2627 );
2628
2629 service.lookup_self();
2630 assert_eq!(service.pending_find_nodes.len(), 1);
2631
2632 poll_fn(|cx| {
2633 let _ = service.poll(cx);
2634 assert_eq!(service.pending_find_nodes.len(), 1);
2635
2636 Poll::Ready(())
2637 })
2638 .await;
2639 }
2640
2641 #[tokio::test]
2642 async fn test_on_neighbours_recursive_lookup() {
2643 reth_tracing::init_test_tracing();
2644
2645 let config = Discv4Config::builder().build();
2646 let (_discv4, mut service) = create_discv4_with_config(config.clone()).await;
2647 let (_discv4, mut service2) = create_discv4_with_config(config).await;
2648
2649 let id = PeerId::random();
2650 let key = kad_key(id);
2651 let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), id);
2652
2653 let _ = service.kbuckets.insert_or_update(
2654 &key,
2655 NodeEntry::new_proven(record),
2656 NodeStatus {
2657 direction: ConnectionDirection::Incoming,
2658 state: ConnectionState::Connected,
2659 },
2660 );
2661 service.lookup_self();
2664 assert_eq!(service.pending_find_nodes.len(), 1);
2665
2666 poll_fn(|cx| {
2667 let _ = service.poll(cx);
2668 assert_eq!(service.pending_find_nodes.len(), 1);
2669
2670 Poll::Ready(())
2671 })
2672 .await;
2673
2674 let expiry = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_secs() +
2675 10000000000000;
2676 let msg = Neighbours { nodes: vec![service2.local_node_record], expire: expiry };
2677 service.on_neighbours(msg, record.tcp_addr(), id);
2678 let event = poll_fn(|cx| service2.poll(cx)).await;
2680 assert_eq!(event, Discv4Event::Ping);
2681 assert_eq!(service.pending_find_nodes.len(), 1);
2684 let event = poll_fn(|cx| service.poll(cx)).await;
2686 assert_eq!(event, Discv4Event::Pong);
2687 let event = poll_fn(|cx| service.poll(cx)).await;
2692 assert_eq!(event, Discv4Event::Ping);
2693 assert_eq!(service.pending_find_nodes.len(), 2);
2696 }
2697
2698 #[tokio::test]
2699 async fn test_no_local_in_closest() {
2700 reth_tracing::init_test_tracing();
2701
2702 let config = Discv4Config::builder().build();
2703 let (_discv4, mut service) = create_discv4_with_config(config).await;
2704
2705 let target_key = kad_key(PeerId::random());
2706
2707 let id = PeerId::random();
2708 let key = kad_key(id);
2709 let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), id);
2710
2711 let _ = service.kbuckets.insert_or_update(
2712 &key,
2713 NodeEntry::new(record),
2714 NodeStatus {
2715 direction: ConnectionDirection::Incoming,
2716 state: ConnectionState::Connected,
2717 },
2718 );
2719
2720 let closest = service
2721 .kbuckets
2722 .closest_values(&target_key)
2723 .map(|n| n.value.record)
2724 .take(MAX_NODES_PER_BUCKET)
2725 .collect::<Vec<_>>();
2726
2727 assert_eq!(closest.len(), 1);
2728 assert!(!closest.iter().any(|r| r.id == *service.local_peer_id()));
2729 }
2730
2731 #[tokio::test]
2732 async fn test_random_lookup() {
2733 reth_tracing::init_test_tracing();
2734
2735 let config = Discv4Config::builder().build();
2736 let (_discv4, mut service) = create_discv4_with_config(config).await;
2737
2738 let target = PeerId::random();
2739
2740 let id = PeerId::random();
2741 let key = kad_key(id);
2742 let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), id);
2743
2744 let _ = service.kbuckets.insert_or_update(
2745 &key,
2746 NodeEntry::new_proven(record),
2747 NodeStatus {
2748 direction: ConnectionDirection::Incoming,
2749 state: ConnectionState::Connected,
2750 },
2751 );
2752
2753 service.lookup(target);
2754 assert_eq!(service.pending_find_nodes.len(), 1);
2755
2756 let ctx = service.pending_find_nodes.values().next().unwrap().lookup_context.clone();
2757
2758 assert_eq!(ctx.target(), target);
2759 assert_eq!(ctx.inner.closest_nodes.borrow().len(), 1);
2760
2761 ctx.add_node(record);
2762 assert_eq!(ctx.inner.closest_nodes.borrow().len(), 1);
2763 }
2764
2765 #[tokio::test]
2766 async fn test_reping_on_find_node_failures() {
2767 reth_tracing::init_test_tracing();
2768
2769 let config = Discv4Config::builder().build();
2770 let (_discv4, mut service) = create_discv4_with_config(config).await;
2771
2772 let target = PeerId::random();
2773
2774 let id = PeerId::random();
2775 let key = kad_key(id);
2776 let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), id);
2777
2778 let mut entry = NodeEntry::new_proven(record);
2779 entry.find_node_failures = u8::MAX;
2780 let _ = service.kbuckets.insert_or_update(
2781 &key,
2782 entry,
2783 NodeStatus {
2784 direction: ConnectionDirection::Incoming,
2785 state: ConnectionState::Connected,
2786 },
2787 );
2788
2789 service.lookup(target);
2790 assert_eq!(service.pending_find_nodes.len(), 0);
2791 assert_eq!(service.pending_pings.len(), 1);
2792
2793 service.update_on_pong(record, None);
2794
2795 service
2796 .on_entry(record.id, |entry| {
2797 assert_eq!(entry.find_node_failures, 0);
2799 assert!(entry.has_endpoint_proof);
2800 })
2801 .unwrap();
2802 }
2803
2804 #[tokio::test]
2805 async fn test_service_commands() {
2806 reth_tracing::init_test_tracing();
2807
2808 let config = Discv4Config::builder().build();
2809 let (discv4, mut service) = create_discv4_with_config(config).await;
2810
2811 service.lookup_self();
2812
2813 let _handle = service.spawn();
2814 discv4.send_lookup_self();
2815 let _ = discv4.lookup_self().await;
2816 }
2817
2818 #[tokio::test]
2819 async fn test_requests_timeout() {
2820 reth_tracing::init_test_tracing();
2821 let fork_id = ForkId { hash: ForkHash(hex!("743f3d89")), next: 16191202 };
2822
2823 let config = Discv4Config::builder()
2824 .request_timeout(Duration::from_millis(200))
2825 .ping_expiration(Duration::from_millis(200))
2826 .lookup_neighbours_expiration(Duration::from_millis(200))
2827 .add_eip868_pair("eth", fork_id)
2828 .build();
2829 let (_disv4, mut service) = create_discv4_with_config(config).await;
2830
2831 let id = PeerId::random();
2832 let key = kad_key(id);
2833 let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), id);
2834
2835 let _ = service.kbuckets.insert_or_update(
2836 &key,
2837 NodeEntry::new_proven(record),
2838 NodeStatus {
2839 direction: ConnectionDirection::Incoming,
2840 state: ConnectionState::Connected,
2841 },
2842 );
2843
2844 service.lookup_self();
2845 assert_eq!(service.pending_find_nodes.len(), 1);
2846
2847 let ctx = service.pending_find_nodes.values().next().unwrap().lookup_context.clone();
2848
2849 service.pending_lookup.insert(record.id, (Instant::now(), ctx));
2850
2851 assert_eq!(service.pending_lookup.len(), 1);
2852
2853 let ping = Ping {
2854 from: service.local_node_record.into(),
2855 to: record.into(),
2856 expire: service.ping_expiration(),
2857 enr_sq: service.enr_seq(),
2858 };
2859 let echo_hash = service.send_packet(Message::Ping(ping), record.udp_addr());
2860 let ping_request = PingRequest {
2861 sent_at: Instant::now(),
2862 node: record,
2863 echo_hash,
2864 reason: PingReason::InitialInsert,
2865 };
2866 service.pending_pings.insert(record.id, ping_request);
2867
2868 assert_eq!(service.pending_pings.len(), 1);
2869
2870 tokio::time::sleep(Duration::from_secs(1)).await;
2871
2872 poll_fn(|cx| {
2873 let _ = service.poll(cx);
2874
2875 assert_eq!(service.pending_find_nodes.len(), 0);
2876 assert_eq!(service.pending_lookup.len(), 0);
2877 assert_eq!(service.pending_pings.len(), 0);
2878
2879 Poll::Ready(())
2880 })
2881 .await;
2882 }
2883
2884 #[tokio::test(flavor = "multi_thread")]
2886 async fn test_check_wrong_to() {
2887 reth_tracing::init_test_tracing();
2888
2889 let config = Discv4Config::builder().external_ip_resolver(None).build();
2890 let (_discv4, mut service_1) = create_discv4_with_config(config.clone()).await;
2891 let (_discv4, mut service_2) = create_discv4_with_config(config).await;
2892
2893 let mut ping = Ping {
2895 from: service_1.local_node_record.into(),
2896 to: service_2.local_node_record.into(),
2897 expire: service_1.ping_expiration(),
2898 enr_sq: service_1.enr_seq(),
2899 };
2900 ping.to.address = "192.0.2.0".parse().unwrap();
2901
2902 let echo_hash = service_1.send_packet(Message::Ping(ping), service_2.local_addr());
2903 let ping_request = PingRequest {
2904 sent_at: Instant::now(),
2905 node: service_2.local_node_record,
2906 echo_hash,
2907 reason: PingReason::InitialInsert,
2908 };
2909 service_1.pending_pings.insert(*service_2.local_peer_id(), ping_request);
2910
2911 let event = poll_fn(|cx| service_2.poll(cx)).await;
2913 assert_eq!(event, Discv4Event::Ping);
2914
2915 let event = poll_fn(|cx| service_1.poll(cx)).await;
2917 assert_eq!(event, Discv4Event::Pong);
2918 let event = poll_fn(|cx| service_1.poll(cx)).await;
2920 assert_eq!(event, Discv4Event::Ping);
2921 }
2922
2923 #[tokio::test(flavor = "multi_thread")]
2924 async fn test_check_ping_pong() {
2925 reth_tracing::init_test_tracing();
2926
2927 let config = Discv4Config::builder().external_ip_resolver(None).build();
2928 let (_discv4, mut service_1) = create_discv4_with_config(config.clone()).await;
2929 let (_discv4, mut service_2) = create_discv4_with_config(config).await;
2930
2931 service_1.add_node(service_2.local_node_record);
2933
2934 let event = poll_fn(|cx| service_2.poll(cx)).await;
2936 assert_eq!(event, Discv4Event::Ping);
2937
2938 let key1 = kad_key(*service_1.local_peer_id());
2940 match service_2.kbuckets.entry(&key1) {
2941 kbucket::Entry::Present(_entry, status) => {
2942 assert!(!status.is_connected());
2943 }
2944 _ => unreachable!(),
2945 }
2946
2947 let event = poll_fn(|cx| service_1.poll(cx)).await;
2949 assert_eq!(event, Discv4Event::Pong);
2950
2951 let key2 = kad_key(*service_2.local_peer_id());
2953 match service_1.kbuckets.entry(&key2) {
2954 kbucket::Entry::Present(_entry, status) => {
2955 assert!(status.is_connected());
2956 }
2957 _ => unreachable!(),
2958 }
2959
2960 let event = poll_fn(|cx| service_1.poll(cx)).await;
2962 assert_eq!(event, Discv4Event::Ping);
2963
2964 let event = poll_fn(|cx| service_2.poll(cx)).await;
2966
2967 match event {
2968 Discv4Event::EnrRequest => {
2969 let event = poll_fn(|cx| service_2.poll(cx)).await;
2971 match event {
2972 Discv4Event::EnrRequest => {
2973 let event = poll_fn(|cx| service_2.poll(cx)).await;
2974 assert_eq!(event, Discv4Event::Pong);
2975 }
2976 Discv4Event::Pong => {}
2977 _ => {
2978 unreachable!()
2979 }
2980 }
2981 }
2982 Discv4Event::Pong => {}
2983 ev => unreachable!("{ev:?}"),
2984 }
2985
2986 match service_2.kbuckets.entry(&key1) {
2988 kbucket::Entry::Present(_entry, status) => {
2989 assert!(status.is_connected());
2990 }
2991 ev => unreachable!("{ev:?}"),
2992 }
2993 }
2994
2995 #[test]
2996 fn test_insert() {
2997 let local_node_record = rng_record(&mut rand_08::thread_rng());
2998 let mut kbuckets: KBucketsTable<NodeKey, NodeEntry> = KBucketsTable::new(
2999 NodeKey::from(&local_node_record).into(),
3000 Duration::from_secs(60),
3001 MAX_NODES_PER_BUCKET,
3002 None,
3003 None,
3004 );
3005
3006 let new_record = rng_record(&mut rand_08::thread_rng());
3007 let key = kad_key(new_record.id);
3008 match kbuckets.entry(&key) {
3009 kbucket::Entry::Absent(entry) => {
3010 let node = NodeEntry::new(new_record);
3011 let _ = entry.insert(
3012 node,
3013 NodeStatus {
3014 direction: ConnectionDirection::Outgoing,
3015 state: ConnectionState::Disconnected,
3016 },
3017 );
3018 }
3019 _ => {
3020 unreachable!()
3021 }
3022 };
3023 match kbuckets.entry(&key) {
3024 kbucket::Entry::Present(_, _) => {}
3025 _ => {
3026 unreachable!()
3027 }
3028 }
3029 }
3030
3031 #[tokio::test]
3032 async fn test_bootnode_not_in_update_stream() {
3033 reth_tracing::init_test_tracing();
3034 let (_, service_1) = create_discv4().await;
3035 let peerid_1 = *service_1.local_peer_id();
3036
3037 let config = Discv4Config::builder().add_boot_node(service_1.local_node_record).build();
3038 service_1.spawn();
3039
3040 let (_, mut service_2) = create_discv4_with_config(config).await;
3041
3042 let mut updates = service_2.update_stream();
3043
3044 service_2.spawn();
3045
3046 let mut bootnode_appeared = false;
3048 let timeout = tokio::time::sleep(Duration::from_secs(1));
3049 tokio::pin!(timeout);
3050
3051 loop {
3052 tokio::select! {
3053 Some(update) = updates.next() => {
3054 if let DiscoveryUpdate::Added(record) = update
3055 && record.id == peerid_1 {
3056 bootnode_appeared = true;
3057 break;
3058 }
3059 }
3060 _ = &mut timeout => break,
3061 }
3062 }
3063
3064 assert!(bootnode_appeared, "Bootnode should appear in update stream");
3066 }
3067}