1#![doc(
20 html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
21 html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
22 issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/"
23)]
24#![cfg_attr(not(test), warn(unused_crate_dependencies))]
25#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
26
27use crate::{
28 error::{DecodePacketError, Discv4Error},
29 proto::{FindNode, Message, Neighbours, Packet, Ping, Pong},
30};
31use alloy_primitives::{bytes::Bytes, hex, B256};
32use discv5::{
33 kbucket,
34 kbucket::{
35 BucketInsertResult, Distance, Entry as BucketEntry, InsertResult, KBucketsTable,
36 NodeStatus, MAX_NODES_PER_BUCKET,
37 },
38 ConnectionDirection, ConnectionState,
39};
40use enr::Enr;
41use itertools::Itertools;
42use parking_lot::Mutex;
43use proto::{EnrRequest, EnrResponse};
44use reth_ethereum_forks::ForkId;
45use reth_network_peers::{pk2id, PeerId};
46use secp256k1::SecretKey;
47use std::{
48 cell::RefCell,
49 collections::{btree_map, hash_map::Entry, BTreeMap, HashMap, VecDeque},
50 fmt,
51 future::poll_fn,
52 io,
53 net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4},
54 pin::Pin,
55 rc::Rc,
56 sync::Arc,
57 task::{ready, Context, Poll},
58 time::{Duration, Instant, SystemTime, UNIX_EPOCH},
59};
60use tokio::{
61 net::UdpSocket,
62 sync::{mpsc, mpsc::error::TrySendError, oneshot, oneshot::Sender as OneshotSender},
63 task::{JoinHandle, JoinSet},
64 time::Interval,
65};
66use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt};
67use tracing::{debug, trace};
68
69pub mod error;
70pub mod proto;
71
72mod config;
73pub use config::{Discv4Config, Discv4ConfigBuilder};
74
75mod node;
76use node::{kad_key, NodeKey};
77
78mod table;
79
80pub use reth_network_peers::NodeRecord;
82
83#[cfg(any(test, feature = "test-utils"))]
84pub mod test_utils;
85
86use crate::table::PongTable;
87use reth_net_nat::ResolveNatInterval;
88pub use reth_net_nat::{external_ip, NatResolver};
90
/// Default IP address used for discovery: the unspecified IPv4 address (`0.0.0.0`).
pub const DEFAULT_DISCOVERY_ADDR: IpAddr = IpAddr::V4(Ipv4Addr::UNSPECIFIED);

/// Default port used for discovery: `30303`.
pub const DEFAULT_DISCOVERY_PORT: u16 = 30303;

/// Default socket address used for discovery: the unspecified IPv4 address combined with
/// [`DEFAULT_DISCOVERY_PORT`].
pub const DEFAULT_DISCOVERY_ADDRESS: SocketAddr =
    SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, DEFAULT_DISCOVERY_PORT));
106
/// Maximum size in bytes of a discovery packet.
const MAX_PACKET_SIZE: usize = 1280;

/// Minimum size in bytes of a discovery packet.
///
/// NOTE(review): the three summands presumably are hash (32) + signature (65) + packet type
/// (1), i.e. the packet header; confirm against the packet decoder.
const MIN_PACKET_SIZE: usize = 32 + 65 + 1;

/// Number of closest nodes that are queried when a lookup is started or continued.
const ALPHA: usize = 3;

/// Maximum number of pings that may be in flight at the same time; further pings are queued.
const MAX_NODES_PING: usize = 2 * MAX_NODES_PER_BUCKET;

/// Maximum number of pings kept in the queue while the in-flight limit is reached. Pings that
/// arrive when the queue is full are discarded.
const MAX_QUEUED_PINGS: usize = 2 * MAX_NODES_PER_BUCKET;

/// Number of node records put into a single `Neighbours` datagram (evaluates to `12`).
///
/// NOTE(review): `109` and `91` presumably are the fixed per-packet overhead and the worst-case
/// encoded size of one record; confirm against the wire format.
const SAFE_MAX_DATAGRAM_NEIGHBOUR_RECORDS: usize = (MAX_PACKET_SIZE - 109) / 91;

/// 24 hours.
///
/// NOTE(review): presumably the age after which an endpoint proof is considered expired; the
/// use site is outside this part of the file.
const ENDPOINT_PROOF_EXPIRATION: Duration = Duration::from_secs(24 * 60 * 60);

/// Period (1 hour) of the service's `expire_interval` timer.
const EXPIRE_DURATION: Duration = Duration::from_secs(60 * 60);

/// NOTE(review): presumably caps how many UDP messages are handled per poll of the service;
/// the use site is outside this part of the file.
const UDP_MESSAGE_POLL_LOOP_BUDGET: i32 = 4;
151
/// Sending half of the channel carrying encoded outgoing packets and their destination.
type EgressSender = mpsc::Sender<(Bytes, SocketAddr)>;
/// Receiving half of the channel carrying encoded outgoing packets and their destination.
type EgressReceiver = mpsc::Receiver<(Bytes, SocketAddr)>;

/// Sending half of the channel carrying events produced from incoming UDP traffic.
pub(crate) type IngressSender = mpsc::Sender<IngressEvent>;
/// Receiving half of the channel carrying events produced from incoming UDP traffic.
pub(crate) type IngressReceiver = mpsc::Receiver<IngressEvent>;

/// One-shot sender used to deliver the node records collected by a lookup.
type NodeRecordSender = OneshotSender<Vec<NodeRecord>>;
159
/// Cloneable frontend handle of the discovery service.
///
/// Requests are forwarded as [`Discv4Command`]s over an unbounded channel to the
/// [`Discv4Service`] the handle was created from (see [`Discv4Service::handle`]).
#[derive(Debug, Clone)]
pub struct Discv4 {
    /// Address the service's UDP socket is bound to.
    local_addr: SocketAddr,
    /// Command channel to the service.
    to_service: mpsc::UnboundedSender<Discv4Command>,
    /// Node record shared with the service; the service overwrites it when its external IP
    /// address changes.
    node_record: Arc<Mutex<NodeRecord>>,
}
177
178impl Discv4 {
179 pub async fn spawn(
183 local_address: SocketAddr,
184 local_enr: NodeRecord,
185 secret_key: SecretKey,
186 config: Discv4Config,
187 ) -> io::Result<Self> {
188 let (discv4, service) = Self::bind(local_address, local_enr, secret_key, config).await?;
189
190 service.spawn();
191
192 Ok(discv4)
193 }
194
195 #[cfg(feature = "test-utils")]
199 pub fn noop() -> Self {
200 let (to_service, _rx) = mpsc::unbounded_channel();
201 let local_addr =
202 (IpAddr::from(std::net::Ipv4Addr::UNSPECIFIED), DEFAULT_DISCOVERY_PORT).into();
203 Self {
204 local_addr,
205 to_service,
206 node_record: Arc::new(Mutex::new(NodeRecord::new(
207 "127.0.0.1:3030".parse().unwrap(),
208 PeerId::random(),
209 ))),
210 }
211 }
212
213 pub async fn bind(
245 local_address: SocketAddr,
246 mut local_node_record: NodeRecord,
247 secret_key: SecretKey,
248 config: Discv4Config,
249 ) -> io::Result<(Self, Discv4Service)> {
250 let socket = UdpSocket::bind(local_address).await?;
251 let local_addr = socket.local_addr()?;
252 local_node_record.udp_port = local_addr.port();
253 trace!(target: "discv4", ?local_addr,"opened UDP socket");
254
255 let mut service =
256 Discv4Service::new(socket, local_addr, local_node_record, secret_key, config);
257
258 service.resolve_external_ip();
260
261 let discv4 = service.handle();
262 Ok((discv4, service))
263 }
264
265 pub const fn local_addr(&self) -> SocketAddr {
267 self.local_addr
268 }
269
270 pub fn node_record(&self) -> NodeRecord {
274 *self.node_record.lock()
275 }
276
277 pub fn external_ip(&self) -> IpAddr {
279 self.node_record.lock().address
280 }
281
282 pub fn set_lookup_interval(&self, duration: Duration) {
284 self.send_to_service(Discv4Command::SetLookupInterval(duration))
285 }
286
287 pub async fn lookup_self(&self) -> Result<Vec<NodeRecord>, Discv4Error> {
302 self.lookup_node(None).await
303 }
304
305 pub async fn lookup(&self, node_id: PeerId) -> Result<Vec<NodeRecord>, Discv4Error> {
309 self.lookup_node(Some(node_id)).await
310 }
311
312 pub async fn lookup_random(&self) -> Result<Vec<NodeRecord>, Discv4Error> {
314 let target = PeerId::random();
315 self.lookup_node(Some(target)).await
316 }
317
318 pub fn send_lookup(&self, node_id: PeerId) {
320 let cmd = Discv4Command::Lookup { node_id: Some(node_id), tx: None };
321 self.send_to_service(cmd);
322 }
323
324 async fn lookup_node(&self, node_id: Option<PeerId>) -> Result<Vec<NodeRecord>, Discv4Error> {
325 let (tx, rx) = oneshot::channel();
326 let cmd = Discv4Command::Lookup { node_id, tx: Some(tx) };
327 self.to_service.send(cmd)?;
328 Ok(rx.await?)
329 }
330
331 pub fn send_lookup_self(&self) {
333 let cmd = Discv4Command::Lookup { node_id: None, tx: None };
334 self.send_to_service(cmd);
335 }
336
337 pub fn remove_peer(&self, node_id: PeerId) {
339 let cmd = Discv4Command::Remove(node_id);
340 self.send_to_service(cmd);
341 }
342
343 pub fn add_node(&self, node_record: NodeRecord) {
345 let cmd = Discv4Command::Add(node_record);
346 self.send_to_service(cmd);
347 }
348
349 pub fn ban(&self, node_id: PeerId, ip: IpAddr) {
353 let cmd = Discv4Command::Ban(node_id, ip);
354 self.send_to_service(cmd);
355 }
356
357 pub fn ban_ip(&self, ip: IpAddr) {
361 let cmd = Discv4Command::BanIp(ip);
362 self.send_to_service(cmd);
363 }
364
365 pub fn ban_node(&self, node_id: PeerId) {
369 let cmd = Discv4Command::BanPeer(node_id);
370 self.send_to_service(cmd);
371 }
372
373 pub fn set_tcp_port(&self, port: u16) {
377 let cmd = Discv4Command::SetTcpPort(port);
378 self.send_to_service(cmd);
379 }
380
381 pub fn set_eip868_rlp_pair(&self, key: Vec<u8>, rlp: Bytes) {
387 let cmd = Discv4Command::SetEIP868RLPPair { key, rlp };
388 self.send_to_service(cmd);
389 }
390
391 pub fn set_eip868_rlp(&self, key: Vec<u8>, value: impl alloy_rlp::Encodable) {
395 self.set_eip868_rlp_pair(key, Bytes::from(alloy_rlp::encode(&value)))
396 }
397
398 #[inline]
399 fn send_to_service(&self, cmd: Discv4Command) {
400 let _ = self.to_service.send(cmd).map_err(|err| {
401 debug!(
402 target: "discv4",
403 %err,
404 "channel capacity reached, dropping command",
405 )
406 });
407 }
408
409 pub async fn update_stream(&self) -> Result<ReceiverStream<DiscoveryUpdate>, Discv4Error> {
411 let (tx, rx) = oneshot::channel();
412 let cmd = Discv4Command::Updates(tx);
413 self.to_service.send(cmd)?;
414 Ok(rx.await?)
415 }
416
417 pub fn terminate(&self) {
419 self.send_to_service(Discv4Command::Terminated);
420 }
421}
422
/// State of the discovery service: the routing table, all in-flight requests, the timers and
/// the channels to the socket tasks and to the [`Discv4`] handles.
#[must_use = "Stream does nothing unless polled"]
pub struct Discv4Service {
    /// Address the UDP socket is bound to.
    local_address: SocketAddr,
    /// Local ENR, kept in sync with the external IP address.
    local_eip_868_enr: Enr<SecretKey>,
    /// Local node record.
    local_node_record: NodeRecord,
    /// Copy of the local node record shared with all handles.
    shared_node_record: Arc<Mutex<NodeRecord>>,
    /// Key used to encode outgoing messages and to sign the local ENR.
    secret_key: SecretKey,
    /// The UDP socket, shared with the receive and send tasks.
    _socket: Arc<UdpSocket>,
    /// The spawned socket receive and send tasks.
    _tasks: JoinSet<()>,
    /// The routing table.
    kbuckets: KBucketsTable<NodeKey, NodeEntry>,
    /// Events produced by the socket receive task.
    ingress: IngressReceiver,
    /// Encoded packets handed to the socket send task.
    egress: EgressSender,
    /// Pings waiting to be sent because [`MAX_NODES_PING`] pings are already in flight; holds
    /// at most [`MAX_QUEUED_PINGS`] entries.
    queued_pings: VecDeque<(NodeRecord, PingReason)>,
    /// Pings that were sent and not yet answered, keyed by the pinged peer.
    pending_pings: HashMap<PeerId, PingRequest>,
    /// Lookups parked after a peer answered our ping; resumed with a `FindNode` request once
    /// that peer pings us and its endpoint is proven.
    pending_lookup: HashMap<PeerId, (Instant, LookupContext)>,
    /// `FindNode` requests that were sent and not yet fully answered, keyed by the queried peer.
    pending_find_nodes: HashMap<PeerId, FindNodeRequest>,
    /// ENR requests that were sent and not yet answered, keyed by the queried peer.
    pending_enr_requests: HashMap<PeerId, EnrRequestState>,
    /// Sender half of the command channel, cloned into every handle.
    to_service: mpsc::UnboundedSender<Discv4Command>,
    /// Commands sent by the handles.
    commands_rx: mpsc::UnboundedReceiver<Discv4Command>,
    /// Subscribers that receive [`DiscoveryUpdate`]s.
    update_listeners: Vec<mpsc::Sender<DiscoveryUpdate>>,
    /// Timer created from the configured lookup interval.
    lookup_interval: Interval,
    /// Chosen from `config.enable_dht_random_walk`.
    // NOTE(review): presumably decides the target of each periodic lookup; confirm at use site.
    lookup_rotator: LookupTargetRotator,
    /// Timer created from the configured request timeout.
    evict_expired_requests_interval: Interval,
    /// Timer created from the configured ping interval.
    ping_interval: Interval,
    /// Optional interval obtained from the config for resolving the external IP address.
    resolve_external_ip_interval: Option<ResolveNatInterval>,
    /// The service configuration, including the ban list.
    config: Discv4Config,
    /// Buffered service events.
    // NOTE(review): presumably drained when the service is polled; confirm at use site.
    queued_events: VecDeque<Discv4Event>,
    /// Records when a pong was last received from a peer; consulted for bond checks.
    received_pongs: PongTable,
    /// Timer ticking every [`EXPIRE_DURATION`].
    expire_interval: Interval,
}
507
508impl Discv4Service {
509 pub(crate) fn new(
511 socket: UdpSocket,
512 local_address: SocketAddr,
513 local_node_record: NodeRecord,
514 secret_key: SecretKey,
515 config: Discv4Config,
516 ) -> Self {
517 let socket = Arc::new(socket);
518 let (ingress_tx, ingress_rx) = mpsc::channel(config.udp_ingress_message_buffer);
519 let (egress_tx, egress_rx) = mpsc::channel(config.udp_egress_message_buffer);
520 let mut tasks = JoinSet::<()>::new();
521
522 let udp = Arc::clone(&socket);
523 tasks.spawn(receive_loop(udp, ingress_tx, local_node_record.id));
524
525 let udp = Arc::clone(&socket);
526 tasks.spawn(send_loop(udp, egress_rx));
527
528 let kbuckets = KBucketsTable::new(
529 NodeKey::from(&local_node_record).into(),
530 Duration::from_secs(60),
531 MAX_NODES_PER_BUCKET,
532 None,
533 None,
534 );
535
536 let self_lookup_interval = tokio::time::interval(config.lookup_interval);
537
538 let ping_interval = tokio::time::interval_at(
541 tokio::time::Instant::now() + config.ping_interval,
542 config.ping_interval,
543 );
544
545 let evict_expired_requests_interval = tokio::time::interval_at(
546 tokio::time::Instant::now() + config.request_timeout,
547 config.request_timeout,
548 );
549
550 let lookup_rotator = if config.enable_dht_random_walk {
551 LookupTargetRotator::default()
552 } else {
553 LookupTargetRotator::local_only()
554 };
555
556 let local_eip_868_enr = {
558 let mut builder = Enr::builder();
559 builder.ip(local_node_record.address);
560 if local_node_record.address.is_ipv4() {
561 builder.udp4(local_node_record.udp_port);
562 builder.tcp4(local_node_record.tcp_port);
563 } else {
564 builder.udp6(local_node_record.udp_port);
565 builder.tcp6(local_node_record.tcp_port);
566 }
567
568 for (key, val) in &config.additional_eip868_rlp_pairs {
569 builder.add_value_rlp(key, val.clone());
570 }
571 builder.build(&secret_key).expect("v4 is set")
572 };
573
574 let (to_service, commands_rx) = mpsc::unbounded_channel();
575
576 let shared_node_record = Arc::new(Mutex::new(local_node_record));
577
578 Self {
579 local_address,
580 local_eip_868_enr,
581 local_node_record,
582 shared_node_record,
583 _socket: socket,
584 kbuckets,
585 secret_key,
586 _tasks: tasks,
587 ingress: ingress_rx,
588 egress: egress_tx,
589 queued_pings: VecDeque::with_capacity(MAX_QUEUED_PINGS),
590 pending_pings: Default::default(),
591 pending_lookup: Default::default(),
592 pending_find_nodes: Default::default(),
593 pending_enr_requests: Default::default(),
594 commands_rx,
595 to_service,
596 update_listeners: Vec::with_capacity(1),
597 lookup_interval: self_lookup_interval,
598 ping_interval,
599 evict_expired_requests_interval,
600 lookup_rotator,
601 resolve_external_ip_interval: config.resolve_external_ip_interval(),
602 config,
603 queued_events: Default::default(),
604 received_pongs: Default::default(),
605 expire_interval: tokio::time::interval(EXPIRE_DURATION),
606 }
607 }
608
609 pub fn handle(&self) -> Discv4 {
611 Discv4 {
612 local_addr: self.local_address,
613 to_service: self.to_service.clone(),
614 node_record: self.shared_node_record.clone(),
615 }
616 }
617
618 fn enr_seq(&self) -> Option<u64> {
620 self.config.enable_eip868.then(|| self.local_eip_868_enr.seq())
621 }
622
/// Replaces the lookup timer with a fresh interval of the given period.
///
/// # Panics
///
/// `tokio::time::interval` panics if `duration` is zero.
pub fn set_lookup_interval(&mut self, duration: Duration) {
    self.lookup_interval = tokio::time::interval(duration);
}
627
628 fn resolve_external_ip(&mut self) {
630 if let Some(r) = &self.resolve_external_ip_interval {
631 if let Some(external_ip) = r.resolver().as_external_ip() {
632 self.set_external_ip_addr(external_ip);
633 }
634 }
635 }
636
637 pub fn set_external_ip_addr(&mut self, external_ip: IpAddr) {
640 if self.local_node_record.address != external_ip {
641 debug!(target: "discv4", ?external_ip, "Updating external ip");
642 self.local_node_record.address = external_ip;
643 let _ = self.local_eip_868_enr.set_ip(external_ip, &self.secret_key);
644 let mut lock = self.shared_node_record.lock();
645 *lock = self.local_node_record;
646 debug!(target: "discv4", enr=?self.local_eip_868_enr, "Updated local ENR");
647 }
648 }
649
/// Returns the peer id of the local node.
pub const fn local_peer_id(&self) -> &PeerId {
    &self.local_node_record.id
}
654
/// Returns the address the UDP socket is bound to.
pub const fn local_addr(&self) -> SocketAddr {
    self.local_address
}
659
/// Returns a copy of the local node record.
///
/// Despite the name this is the [`NodeRecord`], not the ENR.
pub const fn local_enr(&self) -> NodeRecord {
    self.local_node_record
}
666
/// Returns mutable access to the local node record (tests only).
///
/// Changes made through this reference are not propagated to the record shared with the
/// handles.
#[cfg(test)]
pub const fn local_enr_mut(&mut self) -> &mut NodeRecord {
    &mut self.local_node_record
}
672
673 pub fn contains_node(&self, id: PeerId) -> bool {
675 let key = kad_key(id);
676 self.kbuckets.get_index(&key).is_some()
677 }
678
679 pub fn bootstrap(&mut self) {
694 for record in self.config.bootstrap_nodes.clone() {
695 debug!(target: "discv4", ?record, "pinging boot node");
696 let key = kad_key(record.id);
697 let entry = NodeEntry::new(record);
698
699 match self.kbuckets.insert_or_update(
701 &key,
702 entry,
703 NodeStatus {
704 state: ConnectionState::Disconnected,
705 direction: ConnectionDirection::Outgoing,
706 },
707 ) {
708 InsertResult::Failed(_) => {}
709 _ => {
710 self.try_ping(record, PingReason::InitialInsert);
711 }
712 }
713 }
714 }
715
716 pub fn spawn(mut self) -> JoinHandle<()> {
720 tokio::task::spawn(async move {
721 self.bootstrap();
722
723 while let Some(event) = self.next().await {
724 trace!(target: "discv4", ?event, "processed");
725 }
726 trace!(target: "discv4", "service terminated");
727 })
728 }
729
730 pub fn update_stream(&mut self) -> ReceiverStream<DiscoveryUpdate> {
732 let (tx, rx) = mpsc::channel(512);
733 self.update_listeners.push(tx);
734 ReceiverStream::new(rx)
735 }
736
737 pub fn lookup_self(&mut self) {
739 self.lookup(self.local_node_record.id)
740 }
741
/// Starts a lookup for `target` without a channel for the result.
///
/// See [`Self::lookup_with`] for how the lookup is started.
pub fn lookup(&mut self, target: PeerId) {
    self.lookup_with(target, None)
}
754
755 fn lookup_with(&mut self, target: PeerId, tx: Option<NodeRecordSender>) {
765 trace!(target: "discv4", ?target, "Starting lookup");
766 let target_key = kad_key(target);
767
768 let ctx = LookupContext::new(
771 target_key.clone(),
772 self.kbuckets
773 .closest_values(&target_key)
774 .filter(|node| {
775 node.value.has_endpoint_proof &&
776 !self.pending_find_nodes.contains_key(&node.key.preimage().0)
777 })
778 .take(MAX_NODES_PER_BUCKET)
779 .map(|n| (target_key.distance(&n.key), n.value.record)),
780 tx,
781 );
782
783 let closest = ctx.closest(ALPHA);
785
786 if closest.is_empty() && self.pending_find_nodes.is_empty() {
787 self.bootstrap();
792 return
793 }
794
795 trace!(target: "discv4", ?target, num = closest.len(), "Start lookup closest nodes");
796
797 for node in closest {
798 self.find_node_checked(&node, ctx.clone());
802 }
803 }
804
805 fn find_node(&mut self, node: &NodeRecord, ctx: LookupContext) {
809 trace!(target: "discv4", ?node, lookup=?ctx.target(), "Sending FindNode");
810 ctx.mark_queried(node.id);
811 let id = ctx.target();
812 let msg = Message::FindNode(FindNode { id, expire: self.find_node_expiration() });
813 self.send_packet(msg, node.udp_addr());
814 self.pending_find_nodes.insert(node.id, FindNodeRequest::new(ctx));
815 }
816
817 fn find_node_checked(&mut self, node: &NodeRecord, ctx: LookupContext) {
822 let max_failures = self.config.max_find_node_failures;
823 let needs_ping = self
824 .on_entry(node.id, |entry| entry.exceeds_find_node_failures(max_failures))
825 .unwrap_or(true);
826 if needs_ping {
827 self.try_ping(*node, PingReason::Lookup(*node, ctx))
828 } else {
829 self.find_node(node, ctx)
830 }
831 }
832
833 fn notify(&mut self, update: DiscoveryUpdate) {
837 self.update_listeners.retain_mut(|listener| match listener.try_send(update.clone()) {
838 Ok(()) => true,
839 Err(err) => match err {
840 TrySendError::Full(_) => true,
841 TrySendError::Closed(_) => false,
842 },
843 });
844 }
845
/// Adds the IP address to the ban list of the config.
pub fn ban_ip(&mut self, ip: IpAddr) {
    self.config.ban_list.ban_ip(ip);
}
850
/// Removes the node from the routing table and adds its id to the ban list of the config.
pub fn ban_node(&mut self, node_id: PeerId) {
    self.remove_node(node_id);
    self.config.ban_list.ban_peer(node_id);
}
856
/// Adds the IP address to the ban list of the config until the given instant.
pub fn ban_ip_until(&mut self, ip: IpAddr, until: Instant) {
    self.config.ban_list.ban_ip_until(ip, until);
}
861
/// Removes the node from the routing table and adds its id to the ban list of the config
/// until the given instant.
pub fn ban_node_until(&mut self, node_id: PeerId, until: Instant) {
    self.remove_node(node_id);
    self.config.ban_list.ban_peer_until(node_id, until);
}
867
868 pub fn remove_node(&mut self, node_id: PeerId) -> bool {
873 let key = kad_key(node_id);
874 self.remove_key(node_id, key)
875 }
876
877 pub fn soft_remove_node(&mut self, node_id: PeerId) -> bool {
882 let key = kad_key(node_id);
883 let Some(bucket) = self.kbuckets.get_bucket(&key) else { return false };
884 if bucket.num_entries() < MAX_NODES_PER_BUCKET / 2 {
885 return false
887 }
888 self.remove_key(node_id, key)
889 }
890
891 fn remove_key(&mut self, node_id: PeerId, key: discv5::Key<NodeKey>) -> bool {
892 let removed = self.kbuckets.remove(&key);
893 if removed {
894 trace!(target: "discv4", ?node_id, "removed node");
895 self.notify(DiscoveryUpdate::Removed(node_id));
896 }
897 removed
898 }
899
900 pub fn num_connected(&self) -> usize {
902 self.kbuckets.buckets_iter().fold(0, |count, bucket| count + bucket.num_connected())
903 }
904
905 fn has_bond(&self, remote_id: PeerId, remote_ip: IpAddr) -> bool {
907 if let Some(timestamp) = self.received_pongs.last_pong(remote_id, remote_ip) {
908 if timestamp.elapsed() < self.config.bond_expiration {
909 return true
910 }
911 }
912 false
913 }
914
915 fn on_entry<F, R>(&mut self, peer_id: PeerId, f: F) -> Option<R>
917 where
918 F: FnOnce(&NodeEntry) -> R,
919 {
920 let key = kad_key(peer_id);
921 match self.kbuckets.entry(&key) {
922 BucketEntry::Present(entry, _) => Some(f(entry.value())),
923 BucketEntry::Pending(mut entry, _) => Some(f(entry.value())),
924 _ => None,
925 }
926 }
927
928 fn update_on_reping(&mut self, record: NodeRecord, mut last_enr_seq: Option<u64>) {
935 if record.id == self.local_node_record.id {
936 return
937 }
938
939 if !self.config.enable_eip868 {
941 last_enr_seq = None;
942 }
943
944 let key = kad_key(record.id);
945 let old_enr = match self.kbuckets.entry(&key) {
946 kbucket::Entry::Present(mut entry, _) => {
947 entry.value_mut().update_with_enr(last_enr_seq)
948 }
949 kbucket::Entry::Pending(mut entry, _) => entry.value().update_with_enr(last_enr_seq),
950 _ => return,
951 };
952
953 match (last_enr_seq, old_enr) {
955 (Some(new), Some(old)) => {
956 if new > old {
957 self.send_enr_request(record);
958 }
959 }
960 (Some(_), None) => {
961 self.send_enr_request(record);
963 }
964 _ => {}
965 };
966 }
967
/// Updates the table entry of `record` after it answered a ping with a valid pong.
///
/// The entry (present or pending) gets its endpoint proof established and its ENR sequence
/// number updated. If the entry was not connected yet, it is switched to connected, the
/// listeners are notified with [`DiscoveryUpdate::Added`] and, if the pong carried an ENR
/// sequence number, the peer's ENR is requested. Does nothing for the local node or for peers
/// without a table entry.
fn update_on_pong(&mut self, record: NodeRecord, mut last_enr_seq: Option<u64>) {
    if record.id == *self.local_peer_id() {
        return
    }

    // Without EIP-868 the reported sequence number is treated as absent.
    if !self.config.enable_eip868 {
        last_enr_seq = None;
    }

    let has_enr_seq = last_enr_seq.is_some();

    let key = kad_key(record.id);
    match self.kbuckets.entry(&key) {
        kbucket::Entry::Present(mut entry, old_status) => {
            entry.value_mut().establish_proof();
            entry.value_mut().update_with_enr(last_enr_seq);

            // Only the transition to connected is announced.
            if !old_status.is_connected() {
                let _ = entry.update(ConnectionState::Connected, Some(old_status.direction));
                trace!(target: "discv4", ?record, "added after successful endpoint proof");
                self.notify(DiscoveryUpdate::Added(record));

                if has_enr_seq {
                    self.send_enr_request(record);
                }
            }
        }
        kbucket::Entry::Pending(mut entry, mut status) => {
            entry.value().establish_proof();
            entry.value().update_with_enr(last_enr_seq);

            // Only the transition to connected is announced.
            if !status.is_connected() {
                status.state = ConnectionState::Connected;
                let _ = entry.update(status);
                trace!(target: "discv4", ?record, "added after successful endpoint proof");
                self.notify(DiscoveryUpdate::Added(record));

                if has_enr_seq {
                    self.send_enr_request(record);
                }
            }
        }
        _ => {}
    };
}
1021
1022 pub fn add_all_nodes(&mut self, records: impl IntoIterator<Item = NodeRecord>) {
1026 for record in records {
1027 self.add_node(record);
1028 }
1029 }
1030
1031 pub fn add_node(&mut self, record: NodeRecord) -> bool {
1037 let key = kad_key(record.id);
1038 match self.kbuckets.entry(&key) {
1039 kbucket::Entry::Absent(entry) => {
1040 let node = NodeEntry::new(record);
1041 match entry.insert(
1042 node,
1043 NodeStatus {
1044 direction: ConnectionDirection::Outgoing,
1045 state: ConnectionState::Disconnected,
1046 },
1047 ) {
1048 BucketInsertResult::Inserted | BucketInsertResult::Pending { .. } => {
1049 trace!(target: "discv4", ?record, "inserted new record");
1050 }
1051 _ => return false,
1052 }
1053 }
1054 _ => return false,
1055 }
1056
1057 self.try_ping(record, PingReason::InitialInsert);
1059 true
1060 }
1061
1062 pub(crate) fn send_packet(&self, msg: Message, to: SocketAddr) -> B256 {
1064 let (payload, hash) = msg.encode(&self.secret_key);
1065 trace!(target: "discv4", r#type=?msg.msg_type(), ?to, ?hash, "sending packet");
1066 let _ = self.egress.try_send((payload, to)).map_err(|err| {
1067 debug!(
1068 target: "discv4",
1069 %err,
1070 "dropped outgoing packet",
1071 );
1072 });
1073 hash
1074 }
1075
/// Handles an incoming ping.
///
/// Expired pings are dropped. Otherwise the sender is looked up in the routing table (and
/// inserted if absent), a pong echoing `hash` is sent back, and one follow-up action is taken:
///
/// - a newly inserted node is pinged to start the endpoint proof from our side,
/// - a node whose entry is expired, or that could not be inserted (bucket full, too many
///   incoming entries, already existing), is pinged to (re-)establish the bond,
/// - a node with a proven endpoint gets the `FindNode` request of a lookup that was parked
///   for it, if any,
/// - otherwise the node's ENR is requested if the ping carried an ENR sequence number that is
///   newer than the stored one, or none was stored.
///
/// No pong is sent if the sender is the local node or the table's filter rejects the insertion.
fn on_ping(&mut self, ping: Ping, remote_addr: SocketAddr, remote_id: PeerId, hash: B256) {
    if self.is_expired(ping.expire) {
        return
    }

    // The record uses the address the packet actually came from; only the TCP port is taken
    // from the sender's self-reported endpoint.
    let record = NodeRecord {
        address: remote_addr.ip(),
        udp_port: remote_addr.port(),
        tcp_port: ping.from.tcp_port,
        id: remote_id,
    }
    .into_ipv4_mapped();

    let key = kad_key(record.id);

    // At most one of these flags ends up set; it selects the follow-up action below.
    let mut is_new_insert = false;
    let mut needs_bond = false;
    let mut is_proven = false;

    let old_enr = match self.kbuckets.entry(&key) {
        kbucket::Entry::Present(mut entry, _) => {
            if entry.value().is_expired() {
                needs_bond = true;
            } else {
                is_proven = entry.value().has_endpoint_proof;
            }
            entry.value_mut().update_with_enr(ping.enr_sq)
        }
        kbucket::Entry::Pending(mut entry, _) => {
            if entry.value().is_expired() {
                needs_bond = true;
            } else {
                is_proven = entry.value().has_endpoint_proof;
            }
            entry.value().update_with_enr(ping.enr_sq)
        }
        kbucket::Entry::Absent(entry) => {
            let mut node = NodeEntry::new(record);
            node.last_enr_seq = ping.enr_sq;

            match entry.insert(
                node,
                NodeStatus {
                    direction: ConnectionDirection::Incoming,
                    state: ConnectionState::Disconnected,
                },
            ) {
                BucketInsertResult::Inserted | BucketInsertResult::Pending { .. } => {
                    is_new_insert = true;
                }
                BucketInsertResult::Full => {
                    trace!(target: "discv4", ?record, "discovered new record but bucket is full");
                    self.notify(DiscoveryUpdate::DiscoveredAtCapacity(record));
                    needs_bond = true;
                }
                BucketInsertResult::TooManyIncoming | BucketInsertResult::NodeExists => {
                    needs_bond = true;
                }
                // Rejected by the table's filter: ignore the ping entirely.
                BucketInsertResult::FailedFilter => return,
            }

            None
        }
        // A ping carrying our own id is ignored.
        kbucket::Entry::SelfEntry => return,
    };

    // The pong echoes the ping's hash and expiration and is addressed to the observed endpoint.
    let pong = Message::Pong(Pong {
        to: record.into(),
        echo: hash,
        expire: ping.expire,
        enr_sq: self.enr_seq(),
    });
    self.send_packet(pong, remote_addr);

    if is_new_insert {
        self.try_ping(record, PingReason::InitialInsert);
    } else if needs_bond {
        self.try_ping(record, PingReason::EstablishBond);
    } else if is_proven {
        // Resume a lookup that was parked until this peer pinged us.
        if let Some((_, ctx)) = self.pending_lookup.remove(&record.id) {
            if self.pending_find_nodes.contains_key(&record.id) {
                // A request to this peer is already in flight; unmark the peer in the lookup
                // context instead of sending a second one.
                ctx.unmark_queried(record.id);
            } else {
                self.find_node(&record, ctx);
            }
        }
    } else {
        // Request the ENR if the ping advertised a sequence number we have not seen yet.
        match (ping.enr_sq, old_enr) {
            (Some(new), Some(old)) => {
                if new > old {
                    self.send_enr_request(record);
                }
            }
            (Some(_), None) => {
                self.send_enr_request(record);
            }
            _ => {}
        };
    }
}
1207
1208 fn try_ping(&mut self, node: NodeRecord, reason: PingReason) {
1210 if node.id == *self.local_peer_id() {
1211 return
1213 }
1214
1215 if self.pending_pings.contains_key(&node.id) ||
1216 self.pending_find_nodes.contains_key(&node.id)
1217 {
1218 return
1219 }
1220
1221 if self.queued_pings.iter().any(|(n, _)| n.id == node.id) {
1222 return
1223 }
1224
1225 if self.pending_pings.len() < MAX_NODES_PING {
1226 self.send_ping(node, reason);
1227 } else if self.queued_pings.len() < MAX_QUEUED_PINGS {
1228 self.queued_pings.push_back((node, reason));
1229 }
1230 }
1231
1232 pub(crate) fn send_ping(&mut self, node: NodeRecord, reason: PingReason) -> B256 {
1236 let remote_addr = node.udp_addr();
1237 let id = node.id;
1238 let ping = Ping {
1239 from: self.local_node_record.into(),
1240 to: node.into(),
1241 expire: self.ping_expiration(),
1242 enr_sq: self.enr_seq(),
1243 };
1244 trace!(target: "discv4", ?ping, "sending ping");
1245 let echo_hash = self.send_packet(Message::Ping(ping), remote_addr);
1246
1247 self.pending_pings
1248 .insert(id, PingRequest { sent_at: Instant::now(), node, echo_hash, reason });
1249 echo_hash
1250 }
1251
1252 pub(crate) fn send_enr_request(&mut self, node: NodeRecord) {
1256 if !self.config.enable_eip868 {
1257 return
1258 }
1259 let remote_addr = node.udp_addr();
1260 let enr_request = EnrRequest { expire: self.enr_request_expiration() };
1261
1262 trace!(target: "discv4", ?enr_request, "sending enr request");
1263 let echo_hash = self.send_packet(Message::EnrRequest(enr_request), remote_addr);
1264
1265 self.pending_enr_requests
1266 .insert(node.id, EnrRequestState { sent_at: Instant::now(), echo_hash });
1267 }
1268
1269 fn on_pong(&mut self, pong: Pong, remote_addr: SocketAddr, remote_id: PeerId) {
1271 if self.is_expired(pong.expire) {
1272 return
1273 }
1274
1275 let PingRequest { node, reason, .. } = match self.pending_pings.entry(remote_id) {
1276 Entry::Occupied(entry) => {
1277 {
1278 let request = entry.get();
1279 if request.echo_hash != pong.echo {
1280 trace!(target: "discv4", from=?remote_addr, expected=?request.echo_hash, echo_hash=?pong.echo,"Got unexpected Pong");
1281 return
1282 }
1283 }
1284 entry.remove()
1285 }
1286 Entry::Vacant(_) => return,
1287 };
1288
1289 self.received_pongs.on_pong(remote_id, remote_addr.ip());
1291
1292 match reason {
1293 PingReason::InitialInsert => {
1294 self.update_on_pong(node, pong.enr_sq);
1295 }
1296 PingReason::EstablishBond => {
1297 self.update_on_pong(node, pong.enr_sq);
1299 }
1300 PingReason::RePing => {
1301 self.update_on_reping(node, pong.enr_sq);
1302 }
1303 PingReason::Lookup(node, ctx) => {
1304 self.update_on_pong(node, pong.enr_sq);
1305 self.pending_lookup.insert(node.id, (Instant::now(), ctx));
1310 }
1311 }
1312 }
1313
1314 fn on_find_node(&mut self, msg: FindNode, remote_addr: SocketAddr, node_id: PeerId) {
1316 if self.is_expired(msg.expire) {
1317 return
1319 }
1320 if node_id == *self.local_peer_id() {
1321 return
1323 }
1324
1325 if self.has_bond(node_id, remote_addr.ip()) {
1326 self.respond_closest(msg.id, remote_addr)
1327 }
1328 }
1329
1330 fn on_enr_response(&mut self, msg: EnrResponse, remote_addr: SocketAddr, id: PeerId) {
1332 trace!(target: "discv4", ?remote_addr, ?msg, "received ENR response");
1333 if let Some(resp) = self.pending_enr_requests.remove(&id) {
1334 let enr_id = pk2id(&msg.enr.public_key());
1336 if id != enr_id {
1337 return
1338 }
1339
1340 if resp.echo_hash == msg.request_hash {
1341 let key = kad_key(id);
1342 let fork_id = msg.eth_fork_id();
1343 let (record, old_fork_id) = match self.kbuckets.entry(&key) {
1344 kbucket::Entry::Present(mut entry, _) => {
1345 let id = entry.value_mut().update_with_fork_id(fork_id);
1346 (entry.value().record, id)
1347 }
1348 kbucket::Entry::Pending(mut entry, _) => {
1349 let id = entry.value().update_with_fork_id(fork_id);
1350 (entry.value().record, id)
1351 }
1352 _ => return,
1353 };
1354 match (fork_id, old_fork_id) {
1355 (Some(new), Some(old)) => {
1356 if new != old {
1357 self.notify(DiscoveryUpdate::EnrForkId(record, new))
1358 }
1359 }
1360 (Some(new), None) => self.notify(DiscoveryUpdate::EnrForkId(record, new)),
1361 _ => {}
1362 }
1363 }
1364 }
1365 }
1366
1367 fn on_enr_request(
1369 &self,
1370 msg: EnrRequest,
1371 remote_addr: SocketAddr,
1372 id: PeerId,
1373 request_hash: B256,
1374 ) {
1375 if !self.config.enable_eip868 || self.is_expired(msg.expire) {
1376 return
1377 }
1378
1379 if self.has_bond(id, remote_addr.ip()) {
1380 self.send_packet(
1381 Message::EnrResponse(EnrResponse {
1382 request_hash,
1383 enr: self.local_eip_868_enr.clone(),
1384 }),
1385 remote_addr,
1386 );
1387 }
1388 }
1389
/// Handles an incoming `Neighbours` message.
///
/// The message is ignored if it is expired, if no `FindNode` request to the sender is
/// pending, or if it would push the number of records received for that request above
/// [`MAX_NODES_PER_BUCKET`]. Otherwise the received records (minus banned ones) are added to
/// the lookup, and the lookup continues with up to [`ALPHA`] of its closest nodes that have no
/// `FindNode` request in flight: unknown nodes are inserted into the table and pinged, known
/// nodes with an endpoint proof are queried (or pinged first if they exceeded the failure
/// limit).
fn on_neighbours(&mut self, msg: Neighbours, remote_addr: SocketAddr, node_id: PeerId) {
    if self.is_expired(msg.expire) {
        return
    }
    let ctx = match self.pending_find_nodes.entry(node_id) {
        Entry::Occupied(mut entry) => {
            {
                let request = entry.get_mut();
                // Any response counts as an answer, even one rejected by the size check below.
                request.answered = true;
                let total = request.response_count + msg.nodes.len();

                // The records of one request may be spread over several packets, but in total
                // they must not exceed one bucket.
                if total <= MAX_NODES_PER_BUCKET {
                    request.response_count = total;
                } else {
                    trace!(target: "discv4", total, from=?remote_addr, "Received neighbors packet entries exceeds max nodes per bucket");
                    return
                }
            };

            if entry.get().response_count == MAX_NODES_PER_BUCKET {
                // A full bucket was received: the request is complete.
                let ctx = entry.remove().lookup_context;
                ctx.mark_responded(node_id);
                ctx
            } else {
                // More records may follow; keep the request pending.
                entry.get().lookup_context.clone()
            }
        }
        Entry::Vacant(_) => {
            trace!(target: "discv4", from=?remote_addr, "Received unsolicited Neighbours");
            return
        }
    };

    trace!(target: "discv4",
        target=format!("{:#?}", node_id),
        peers_count=msg.nodes.len(),
        peers=format!("[{:#}]", msg.nodes.iter()
            .map(|node_rec| node_rec.id
        ).format(", ")),
        "Received peers from Neighbours packet"
    );

    for node in msg.nodes.into_iter().map(NodeRecord::into_ipv4_mapped) {
        // Banned records never enter the lookup.
        if self.config.ban_list.is_banned(&node.id, &node.address) {
            trace!(target: "discv4", peer_id=?node.id, ip=?node.address, "ignoring banned record");
            continue
        }

        ctx.add_node(node);
    }

    // Continue with the closest nodes that have no `FindNode` request in flight.
    let closest =
        ctx.filter_closest(ALPHA, |node| !self.pending_find_nodes.contains_key(&node.id));

    for closest in closest {
        let key = kad_key(closest.id);
        match self.kbuckets.entry(&key) {
            BucketEntry::Absent(entry) => {
                // Unknown node: mark it as queried before trying to insert it, so the
                // lookup does not select it again even if the insert or ping is dropped.
                ctx.mark_queried(closest.id);
                let node = NodeEntry::new(closest);
                match entry.insert(
                    node,
                    NodeStatus {
                        direction: ConnectionDirection::Outgoing,
                        state: ConnectionState::Disconnected,
                    },
                ) {
                    BucketInsertResult::Inserted | BucketInsertResult::Pending { .. } => {
                        // The lookup travels with the ping and continues after the bond.
                        self.try_ping(closest, PingReason::Lookup(closest, ctx.clone()))
                    }
                    BucketInsertResult::Full => {
                        self.notify(DiscoveryUpdate::DiscoveredAtCapacity(closest))
                    }
                    _ => {}
                }
            }
            BucketEntry::SelfEntry => {
                // The local node is never queried.
            }
            BucketEntry::Present(entry, _) => {
                // Only nodes with an endpoint proof are queried.
                if entry.value().has_endpoint_proof {
                    if entry
                        .value()
                        .exceeds_find_node_failures(self.config.max_find_node_failures)
                    {
                        self.try_ping(closest, PingReason::Lookup(closest, ctx.clone()))
                    } else {
                        self.find_node(&closest, ctx.clone());
                    }
                }
            }
            BucketEntry::Pending(mut entry, _) => {
                // Only nodes with an endpoint proof are queried.
                if entry.value().has_endpoint_proof {
                    if entry
                        .value()
                        .exceeds_find_node_failures(self.config.max_find_node_failures)
                    {
                        self.try_ping(closest, PingReason::Lookup(closest, ctx.clone()))
                    } else {
                        self.find_node(&closest, ctx.clone());
                    }
                }
            }
        }
    }
}
1516
1517 fn respond_closest(&mut self, target: PeerId, to: SocketAddr) {
1519 let key = kad_key(target);
1520 let expire = self.send_neighbours_expiration();
1521
1522 let closest_nodes =
1524 self.kbuckets.closest_values(&key).take(MAX_NODES_PER_BUCKET).collect::<Vec<_>>();
1525
1526 for nodes in closest_nodes.chunks(SAFE_MAX_DATAGRAM_NEIGHBOUR_RECORDS) {
1527 let nodes = nodes.iter().map(|node| node.value.record).collect::<Vec<NodeRecord>>();
1528 trace!(target: "discv4", len = nodes.len(), to=?to,"Sent neighbours packet");
1529 let msg = Message::Neighbours(Neighbours { nodes, expire });
1530 self.send_packet(msg, to);
1531 }
1532 }
1533
1534 fn evict_expired_requests(&mut self, now: Instant) {
1535 self.pending_enr_requests.retain(|_node_id, enr_request| {
1536 now.duration_since(enr_request.sent_at) < self.config.enr_expiration
1537 });
1538
1539 let mut failed_pings = Vec::new();
1540 self.pending_pings.retain(|node_id, ping_request| {
1541 if now.duration_since(ping_request.sent_at) > self.config.ping_expiration {
1542 failed_pings.push(*node_id);
1543 return false
1544 }
1545 true
1546 });
1547
1548 if !failed_pings.is_empty() {
1549 trace!(target: "discv4", num=%failed_pings.len(), "evicting nodes due to failed pong");
1551 for node_id in failed_pings {
1552 self.remove_node(node_id);
1553 }
1554 }
1555
1556 let mut failed_lookups = Vec::new();
1557 self.pending_lookup.retain(|node_id, (lookup_sent_at, _)| {
1558 if now.duration_since(*lookup_sent_at) > self.config.request_timeout {
1559 failed_lookups.push(*node_id);
1560 return false
1561 }
1562 true
1563 });
1564
1565 if !failed_lookups.is_empty() {
1566 trace!(target: "discv4", num=%failed_lookups.len(), "evicting nodes due to failed lookup");
1568 for node_id in failed_lookups {
1569 self.remove_node(node_id);
1570 }
1571 }
1572
1573 self.evict_failed_find_nodes(now);
1574 }
1575
1576 fn evict_failed_find_nodes(&mut self, now: Instant) {
1578 let mut failed_find_nodes = Vec::new();
1579 self.pending_find_nodes.retain(|node_id, find_node_request| {
1580 if now.duration_since(find_node_request.sent_at) > self.config.neighbours_expiration {
1581 if !find_node_request.answered {
1582 failed_find_nodes.push(*node_id);
1585 }
1586 return false
1587 }
1588 true
1589 });
1590
1591 if failed_find_nodes.is_empty() {
1592 return
1593 }
1594
1595 trace!(target: "discv4", num=%failed_find_nodes.len(), "processing failed find nodes");
1596
1597 for node_id in failed_find_nodes {
1598 let key = kad_key(node_id);
1599 let failures = match self.kbuckets.entry(&key) {
1600 kbucket::Entry::Present(mut entry, _) => {
1601 entry.value_mut().inc_failed_request();
1602 entry.value().find_node_failures
1603 }
1604 kbucket::Entry::Pending(mut entry, _) => {
1605 entry.value().inc_failed_request();
1606 entry.value().find_node_failures
1607 }
1608 _ => continue,
1609 };
1610
1611 if failures > self.config.max_find_node_failures {
1615 self.soft_remove_node(node_id);
1616 }
1617 }
1618 }
1619
1620 fn re_ping_oldest(&mut self) {
1625 let mut nodes = self
1626 .kbuckets
1627 .iter_ref()
1628 .filter(|entry| entry.node.value.is_expired())
1629 .map(|n| n.node.value)
1630 .collect::<Vec<_>>();
1631 nodes.sort_by(|a, b| a.last_seen.cmp(&b.last_seen));
1632 let to_ping = nodes.into_iter().map(|n| n.record).take(MAX_NODES_PING).collect::<Vec<_>>();
1633 for node in to_ping {
1634 self.try_ping(node, PingReason::RePing)
1635 }
1636 }
1637
1638 fn is_expired(&self, expiration: u64) -> bool {
1640 self.ensure_not_expired(expiration).is_err()
1641 }
1642
1643 fn ensure_not_expired(&self, timestamp: u64) -> Result<(), ()> {
1653 let _ = i64::try_from(timestamp).map_err(drop)?;
1655
1656 let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_secs();
1657 if self.config.enforce_expiration_timestamps && timestamp < now {
1658 trace!(target: "discv4", "Expired packet");
1659 return Err(())
1660 }
1661 Ok(())
1662 }
1663
1664 fn ping_buffered(&mut self) {
1666 while self.pending_pings.len() < MAX_NODES_PING {
1667 match self.queued_pings.pop_front() {
1668 Some((next, reason)) => self.try_ping(next, reason),
1669 None => break,
1670 }
1671 }
1672 }
1673
1674 fn ping_expiration(&self) -> u64 {
1675 (SystemTime::now().duration_since(UNIX_EPOCH).unwrap() + self.config.ping_expiration)
1676 .as_secs()
1677 }
1678
1679 fn find_node_expiration(&self) -> u64 {
1680 (SystemTime::now().duration_since(UNIX_EPOCH).unwrap() + self.config.request_timeout)
1681 .as_secs()
1682 }
1683
1684 fn enr_request_expiration(&self) -> u64 {
1685 (SystemTime::now().duration_since(UNIX_EPOCH).unwrap() + self.config.enr_expiration)
1686 .as_secs()
1687 }
1688
1689 fn send_neighbours_expiration(&self) -> u64 {
1690 (SystemTime::now().duration_since(UNIX_EPOCH).unwrap() + self.config.neighbours_expiration)
1691 .as_secs()
1692 }
1693
    /// Drives the service: polls all timers and channels and returns the next [`Discv4Event`].
    ///
    /// Each pass of the loop
    /// 1. returns a buffered event if there is one,
    /// 2. starts lookups and re-pings when their intervals fire,
    /// 3. applies a newly resolved external IP, if a resolver is configured,
    /// 4. handles all commands that are ready on the command channel,
    /// 5. handles incoming datagram events, limited by `UDP_MESSAGE_POLL_LOOP_BUDGET`,
    /// 6. sends buffered pings and evicts expired requests and pongs.
    ///
    /// Returns `Poll::Pending` once a full pass ends without any queued event.
    pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<Discv4Event> {
        loop {
            // hand out buffered events first
            if let Some(event) = self.queued_events.pop_front() {
                return Poll::Ready(event)
            }

            // periodic lookups (only if enabled); the rotator picks the lookup target
            if self.config.enable_lookup {
                while self.lookup_interval.poll_tick(cx).is_ready() {
                    let target = self.lookup_rotator.next(&self.local_node_record.id);
                    self.lookup_with(target, None);
                }
            }

            // periodic re-ping of the entries that were not seen for the longest time
            while self.ping_interval.poll_tick(cx).is_ready() {
                self.re_ping_oldest();
            }

            // poll the optional external IP resolver once and apply a newly resolved address
            if let Some(Poll::Ready(Some(ip))) =
                self.resolve_external_ip_interval.as_mut().map(|r| r.poll_tick(cx))
            {
                self.set_external_ip_addr(ip);
            }

            // handle every command that is ready on the command channel
            while let Poll::Ready(Some(cmd)) = self.commands_rx.poll_recv(cx) {
                match cmd {
                    Discv4Command::Add(enr) => {
                        self.add_node(enr);
                    }
                    Discv4Command::Lookup { node_id, tx } => {
                        // without an explicit target the lookup is done for the local node id
                        let node_id = node_id.unwrap_or(self.local_node_record.id);
                        self.lookup_with(node_id, tx);
                    }
                    Discv4Command::SetLookupInterval(duration) => {
                        self.set_lookup_interval(duration);
                    }
                    Discv4Command::Updates(tx) => {
                        // the requester may be gone already, a failed send is ignored
                        let rx = self.update_stream();
                        let _ = tx.send(rx);
                    }
                    Discv4Command::BanPeer(node_id) => self.ban_node(node_id),
                    Discv4Command::Remove(node_id) => {
                        self.remove_node(node_id);
                    }
                    Discv4Command::Ban(node_id, ip) => {
                        self.ban_node(node_id);
                        self.ban_ip(ip);
                    }
                    Discv4Command::BanIp(ip) => {
                        self.ban_ip(ip);
                    }
                    Discv4Command::SetEIP868RLPPair { key, rlp } => {
                        debug!(target: "discv4", key=%String::from_utf8_lossy(&key), "Update EIP-868 extension pair");

                        // the result of updating the local ENR is ignored
                        let _ = self.local_eip_868_enr.insert_raw_rlp(key, rlp, &self.secret_key);
                    }
                    Discv4Command::SetTcpPort(port) => {
                        debug!(target: "discv4", %port, "Update tcp port");
                        self.local_node_record.tcp_port = port;
                        // update the ENR field that matches the local address family
                        if self.local_node_record.address.is_ipv4() {
                            let _ = self.local_eip_868_enr.set_tcp4(port, &self.secret_key);
                        } else {
                            let _ = self.local_eip_868_enr.set_tcp6(port, &self.secret_key);
                        }
                    }

                    Discv4Command::Terminated => {
                        // queue the event, it is returned at the start of the next loop pass
                        self.queued_events.push_back(Discv4Event::Terminated);
                    }
                }
            }

            // limits how many ingress events are handled in this pass
            let mut udp_message_budget = UDP_MESSAGE_POLL_LOOP_BUDGET;

            // handle incoming datagram events
            while let Poll::Ready(Some(event)) = self.ingress.poll_recv(cx) {
                match event {
                    IngressEvent::RecvError(err) => {
                        debug!(target: "discv4", %err, "failed to read datagram");
                    }
                    IngressEvent::BadPacket(from, err, data) => {
                        trace!(target: "discv4", ?from, %err, packet=?hex::encode(&data), "bad packet");
                    }
                    IngressEvent::Packet(remote_addr, Packet { msg, node_id, hash }) => {
                        trace!(target: "discv4", r#type=?msg.msg_type(), from=?remote_addr,"received packet");
                        // dispatch to the matching handler and record which kind was handled
                        let event = match msg {
                            Message::Ping(ping) => {
                                self.on_ping(ping, remote_addr, node_id, hash);
                                Discv4Event::Ping
                            }
                            Message::Pong(pong) => {
                                self.on_pong(pong, remote_addr, node_id);
                                Discv4Event::Pong
                            }
                            Message::FindNode(msg) => {
                                self.on_find_node(msg, remote_addr, node_id);
                                Discv4Event::FindNode
                            }
                            Message::Neighbours(msg) => {
                                self.on_neighbours(msg, remote_addr, node_id);
                                Discv4Event::Neighbours
                            }
                            Message::EnrRequest(msg) => {
                                self.on_enr_request(msg, remote_addr, node_id, hash);
                                Discv4Event::EnrRequest
                            }
                            Message::EnrResponse(msg) => {
                                self.on_enr_response(msg, remote_addr, node_id);
                                Discv4Event::EnrResponse
                            }
                        };

                        self.queued_events.push_back(event);
                    }
                }

                udp_message_budget -= 1;
                if udp_message_budget < 0 {
                    trace!(target: "discv4", budget=UDP_MESSAGE_POLL_LOOP_BUDGET, "exhausted message poll budget");
                    if self.queued_events.is_empty() {
                        // Nothing is queued, so this pass may end in `Poll::Pending` although
                        // the ingress channel may still hold events: request another poll.
                        cx.waker().wake_by_ref();
                    }
                    break
                }
            }

            // send pings that were buffered earlier, as far as there is room for them
            self.ping_buffered();

            // drop requests that timed out
            while self.evict_expired_requests_interval.poll_tick(cx).is_ready() {
                self.evict_expired_requests(Instant::now());
            }

            // drop recorded pongs that are older than `EXPIRE_DURATION`
            while self.expire_interval.poll_tick(cx).is_ready() {
                self.received_pongs.evict_expired(Instant::now(), EXPIRE_DURATION);
            }

            // with queued events the loop starts over and returns the first one
            if self.queued_events.is_empty() {
                return Poll::Pending
            }
        }
    }
1850}
1851
1852impl Stream for Discv4Service {
1854 type Item = Discv4Event;
1855
1856 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1857 match ready!(self.get_mut().poll(cx)) {
1859 Discv4Event::Terminated => Poll::Ready(None),
1861 ev => Poll::Ready(Some(ev)),
1863 }
1864 }
1865}
1866
1867impl fmt::Debug for Discv4Service {
1868 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1869 f.debug_struct("Discv4Service")
1870 .field("local_address", &self.local_address)
1871 .field("local_peer_id", &self.local_peer_id())
1872 .field("local_node_record", &self.local_node_record)
1873 .field("queued_pings", &self.queued_pings)
1874 .field("pending_lookup", &self.pending_lookup)
1875 .field("pending_find_nodes", &self.pending_find_nodes)
1876 .field("lookup_interval", &self.lookup_interval)
1877 .finish_non_exhaustive()
1878 }
1879}
1880
/// Events returned by `Discv4Service::poll`.
///
/// Except for `Terminated`, each variant reports which kind of message the service has just
/// handled.
#[derive(Debug, Eq, PartialEq)]
pub enum Discv4Event {
    /// A `Ping` message was handled.
    Ping,
    /// A `Pong` message was handled.
    Pong,
    /// A `FindNode` message was handled.
    FindNode,
    /// A `Neighbours` message was handled.
    Neighbours,
    /// An `EnrRequest` message was handled.
    EnrRequest,
    /// An `EnrResponse` message was handled.
    EnrResponse,
    /// The service received the terminate command. The `Stream` implementation of the service
    /// ends the stream instead of yielding this event.
    Terminated,
}
1901
1902pub(crate) async fn send_loop(udp: Arc<UdpSocket>, rx: EgressReceiver) {
1904 let mut stream = ReceiverStream::new(rx);
1905 while let Some((payload, to)) = stream.next().await {
1906 match udp.send_to(&payload, to).await {
1907 Ok(size) => {
1908 trace!(target: "discv4", ?to, ?size,"sent payload");
1909 }
1910 Err(err) => {
1911 debug!(target: "discv4", ?to, %err,"Failed to send datagram.");
1912 }
1913 }
1914 }
1915}
1916
/// Upper bound for the per-IP packet counter kept by `receive_loop`: packets from an IP whose
/// counter exceeds this value are dropped.
///
/// `receive_loop` also derives the period (in seconds) and the step of its counter decay from
/// this value, using half of it for both.
const MAX_INCOMING_PACKETS_PER_MINUTE_BY_IP: usize = 60usize;
1919
/// Reads datagrams from the UDP socket, filters them and forwards the rest to `tx`.
///
/// For every received datagram the loop
/// - drops it if the sender IP exceeded `MAX_INCOMING_PACKETS_PER_MINUTE_BY_IP`,
/// - decodes it and forwards a decode failure as `IngressEvent::BadPacket`,
/// - drops packets that carry `local_id` as sender or whose hash was already seen,
/// - forwards everything else as `IngressEvent::Packet`.
///
/// Socket read errors are forwarded as `IngressEvent::RecvError`. The function never returns.
pub(crate) async fn receive_loop(udp: Arc<UdpSocket>, tx: IngressSender, local_id: PeerId) {
    // forwards an event to the receiver side, a failed send is only logged
    let send = |event: IngressEvent| async {
        let _ = tx.send(event).await.map_err(|err| {
            debug!(
                target: "discv4",
                %err,
                "failed send incoming packet",
            )
        });
    };

    let mut cache = ReceiveCache::default();

    // the per-IP counters are decayed by `tick` every `tick` seconds (half of the limit)
    let tick = MAX_INCOMING_PACKETS_PER_MINUTE_BY_IP / 2;
    let mut interval = tokio::time::interval(Duration::from_secs(tick as u64));

    let mut buf = [0; MAX_PACKET_SIZE];
    loop {
        let res = udp.recv_from(&mut buf).await;
        match res {
            Err(err) => {
                debug!(target: "discv4", %err, "Failed to read datagram.");
                send(IngressEvent::RecvError(err)).await;
            }
            Ok((read, remote_addr)) => {
                // rate limit by sender IP
                // NOTE(review): this `continue` (like the two below) also skips the interval
                // check at the end of the loop body for this iteration.
                if cache.inc_ip(remote_addr.ip()) > MAX_INCOMING_PACKETS_PER_MINUTE_BY_IP {
                    trace!(target: "discv4", ?remote_addr, "Too many incoming packets from IP.");
                    continue
                }

                let packet = &buf[..read];
                match Message::decode(packet) {
                    Ok(packet) => {
                        // the packet carries our own node id as sender
                        if packet.node_id == local_id {
                            debug!(target: "discv4", ?remote_addr, "Received own packet.");
                            continue
                        }

                        // skip packets whose hash the cache reports as already recorded
                        if cache.contains_packet(packet.hash) {
                            debug!(target: "discv4", ?remote_addr, "Received duplicate packet.");
                            continue
                        }

                        send(IngressEvent::Packet(remote_addr, packet)).await;
                    }
                    Err(err) => {
                        trace!(target: "discv4", %err,"Failed to decode packet");
                        // forward the raw bytes together with the decode error
                        send(IngressEvent::BadPacket(remote_addr, err, packet.to_vec())).await
                    }
                }
            }
        }

        // Check, without waiting, whether the interval has fired and decay the counters if so.
        if poll_fn(|cx| match interval.poll_tick(cx) {
            Poll::Ready(_) => Poll::Ready(true),
            Poll::Pending => Poll::Ready(false),
        })
        .await
        {
            cache.tick_ips(tick);
        }
    }
}
1992
/// Bookkeeping used by `receive_loop` to filter incoming datagrams.
struct ReceiveCache {
    /// Number of packets counted per sender IP. Incremented by `inc_ip` and reduced by
    /// `tick_ips`.
    ip_messages: HashMap<IpAddr, usize>,
    /// Hashes of recently received packets, kept in a bounded LRU map and used by
    /// `contains_packet` to recognize duplicates.
    unique_packets: schnellru::LruMap<B256, ()>,
}
2006
2007impl ReceiveCache {
2008 fn tick_ips(&mut self, tick: usize) {
2012 self.ip_messages.retain(|_, count| {
2013 if let Some(reset) = count.checked_sub(tick) {
2014 *count = reset;
2015 true
2016 } else {
2017 false
2018 }
2019 });
2020 }
2021
2022 fn inc_ip(&mut self, ip: IpAddr) -> usize {
2024 let ctn = self.ip_messages.entry(ip).or_default();
2025 *ctn = ctn.saturating_add(1);
2026 *ctn
2027 }
2028
2029 fn contains_packet(&mut self, hash: B256) -> bool {
2031 !self.unique_packets.insert(hash, ())
2032 }
2033}
2034
2035impl Default for ReceiveCache {
2036 fn default() -> Self {
2037 Self {
2038 ip_messages: Default::default(),
2039 unique_packets: schnellru::LruMap::new(schnellru::ByLength::new(32)),
2040 }
2041 }
2042}
2043
/// Commands that are handled by the service inside `Discv4Service::poll`.
enum Discv4Command {
    /// Add the given node record via `add_node`.
    Add(NodeRecord),
    /// Set the TCP port of the local node record and of the local ENR.
    SetTcpPort(u16),
    /// Insert a raw RLP key/value pair into the local ENR.
    SetEIP868RLPPair { key: Vec<u8>, rlp: Bytes },
    /// Ban both the peer id and the IP address.
    Ban(PeerId, IpAddr),
    /// Ban the peer id.
    BanPeer(PeerId),
    /// Ban the IP address.
    BanIp(IpAddr),
    /// Remove the node with the given id.
    Remove(PeerId),
    /// Start a lookup for `node_id`, or for the local node id if it is `None`. `tx` is an
    /// optional listener that is passed on to the lookup.
    Lookup { node_id: Option<PeerId>, tx: Option<NodeRecordSender> },
    /// Change the interval of the periodic lookups.
    SetLookupInterval(Duration),
    /// Request a new stream of [`DiscoveryUpdate`]s, which is sent back through the sender.
    Updates(OneshotSender<ReceiverStream<DiscoveryUpdate>>),
    /// Makes the service queue a `Discv4Event::Terminated` event.
    Terminated,
}
2058
/// Events that `receive_loop` sends to the service.
#[derive(Debug)]
pub(crate) enum IngressEvent {
    /// Reading from the UDP socket failed.
    RecvError(io::Error),
    /// A datagram could not be decoded: sender address, decode error and the raw bytes.
    BadPacket(SocketAddr, DecodePacketError, Vec<u8>),
    /// A successfully decoded packet together with the sender address.
    Packet(SocketAddr, Packet),
}
2069
/// State of a ping that is tracked in `pending_pings`.
#[derive(Debug)]
struct PingRequest {
    /// Timestamp used by `evict_expired_requests` to decide whether the ping timed out.
    sent_at: Instant,
    /// The node record stored for this request.
    // NOTE(review): presumably the node the ping was sent to; confirm at the place where the
    // request is created.
    node: NodeRecord,
    /// Hash stored for this request.
    // NOTE(review): presumably the hash of the sent ping that the pong is expected to echo;
    // confirm against the pong handling.
    echo_hash: B256,
    /// Why the ping was sent.
    reason: PingReason,
}
2082
/// Decides whether the next periodic lookup targets the local node id or a random id.
///
/// `next` returns the local id on every `interval`-th call and a random id on all other calls.
#[derive(Debug)]
struct LookupTargetRotator {
    /// Number of calls that make up one rotation.
    interval: usize,
    /// Position inside the current rotation, always reduced modulo `interval` by `next`.
    counter: usize,
}
2091
2092impl LookupTargetRotator {
2095 const fn local_only() -> Self {
2097 Self { interval: 1, counter: 0 }
2098 }
2099}
2100
2101impl Default for LookupTargetRotator {
2102 fn default() -> Self {
2103 Self {
2104 interval: 4,
2106 counter: 3,
2107 }
2108 }
2109}
2110
2111impl LookupTargetRotator {
2112 fn next(&mut self, local: &PeerId) -> PeerId {
2114 self.counter += 1;
2115 self.counter %= self.interval;
2116 if self.counter == 0 {
2117 return *local
2118 }
2119 PeerId::random()
2120 }
2121}
2122
/// Shared state of a single lookup.
///
/// Cloning is cheap: all clones point to the same [`LookupContextInner`].
#[derive(Clone, Debug)]
struct LookupContext {
    /// The reference-counted state shared by all clones.
    inner: Rc<LookupContextInner>,
}
2131
2132impl LookupContext {
2133 fn new(
2135 target: discv5::Key<NodeKey>,
2136 nearest_nodes: impl IntoIterator<Item = (Distance, NodeRecord)>,
2137 listener: Option<NodeRecordSender>,
2138 ) -> Self {
2139 let closest_nodes = nearest_nodes
2140 .into_iter()
2141 .map(|(distance, record)| {
2142 (distance, QueryNode { record, queried: false, responded: false })
2143 })
2144 .collect();
2145
2146 let inner = Rc::new(LookupContextInner {
2147 target,
2148 closest_nodes: RefCell::new(closest_nodes),
2149 listener,
2150 });
2151 Self { inner }
2152 }
2153
2154 fn target(&self) -> PeerId {
2156 self.inner.target.preimage().0
2157 }
2158
2159 fn closest(&self, num: usize) -> Vec<NodeRecord> {
2160 self.inner
2161 .closest_nodes
2162 .borrow()
2163 .iter()
2164 .filter(|(_, node)| !node.queried)
2165 .map(|(_, n)| n.record)
2166 .take(num)
2167 .collect()
2168 }
2169
2170 fn filter_closest<P>(&self, num: usize, filter: P) -> Vec<NodeRecord>
2172 where
2173 P: FnMut(&NodeRecord) -> bool,
2174 {
2175 self.inner
2176 .closest_nodes
2177 .borrow()
2178 .iter()
2179 .filter(|(_, node)| !node.queried)
2180 .map(|(_, n)| n.record)
2181 .filter(filter)
2182 .take(num)
2183 .collect()
2184 }
2185
2186 fn add_node(&self, record: NodeRecord) {
2188 let distance = self.inner.target.distance(&kad_key(record.id));
2189 let mut closest = self.inner.closest_nodes.borrow_mut();
2190 if let btree_map::Entry::Vacant(entry) = closest.entry(distance) {
2191 entry.insert(QueryNode { record, queried: false, responded: false });
2192 }
2193 }
2194
2195 fn set_queried(&self, id: PeerId, val: bool) {
2196 if let Some((_, node)) =
2197 self.inner.closest_nodes.borrow_mut().iter_mut().find(|(_, node)| node.record.id == id)
2198 {
2199 node.queried = val;
2200 }
2201 }
2202
2203 fn mark_queried(&self, id: PeerId) {
2205 self.set_queried(id, true)
2206 }
2207
2208 fn unmark_queried(&self, id: PeerId) {
2210 self.set_queried(id, false)
2211 }
2212
2213 fn mark_responded(&self, id: PeerId) {
2215 if let Some((_, node)) =
2216 self.inner.closest_nodes.borrow_mut().iter_mut().find(|(_, node)| node.record.id == id)
2217 {
2218 node.responded = true;
2219 }
2220 }
2221}
2222
// SAFETY: `LookupContext` wraps an `Rc<LookupContextInner>` (which contains a `RefCell`), so it
// is not `Send` on its own.
// NOTE(review): this impl is presumably only sound because all clones of a context are owned by
// the one service that created them and thus always move between threads together. Confirm that
// no clone ever escapes that owner.
unsafe impl Send for LookupContext {}
/// The state behind a [`LookupContext`].
///
/// When it is dropped and a `listener` is set, the records of all nodes that responded are sent
/// to the listener.
#[derive(Debug)]
struct LookupContextInner {
    /// The key of the node the lookup is looking for.
    target: discv5::Key<NodeKey>,
    /// The candidates of this lookup, ordered by their distance to the target.
    closest_nodes: RefCell<BTreeMap<Distance, QueryNode>>,
    /// Optional receiver of the lookup result.
    listener: Option<NodeRecordSender>,
}
2242
2243impl Drop for LookupContextInner {
2244 fn drop(&mut self) {
2245 if let Some(tx) = self.listener.take() {
2246 let nodes = self
2249 .closest_nodes
2250 .take()
2251 .into_values()
2252 .filter(|node| node.responded)
2253 .map(|node| node.record)
2254 .collect();
2255 let _ = tx.send(nodes);
2256 }
2257 }
2258}
2259
/// A candidate node of a lookup, tracked in [`LookupContextInner`].
#[derive(Debug, Clone, Copy)]
struct QueryNode {
    /// The record of the node.
    record: NodeRecord,
    /// Whether the node was marked as queried; such nodes are skipped when selecting the
    /// closest candidates.
    queried: bool,
    /// Whether the node was marked as responded; only such nodes are reported to the listener.
    responded: bool,
}
2267
/// State of a `FindNode` request that is tracked in `pending_find_nodes`.
#[derive(Debug)]
struct FindNodeRequest {
    /// Creation time, used to decide whether the request expired.
    sent_at: Instant,
    /// Number of node records received for this request so far.
    response_count: usize,
    /// Whether the request was answered. Expired requests that were not answered count as a
    /// failure of the node.
    answered: bool,
    /// The lookup this request belongs to.
    lookup_context: LookupContext,
}
2279
2280impl FindNodeRequest {
2283 fn new(resp: LookupContext) -> Self {
2284 Self { sent_at: Instant::now(), response_count: 0, answered: false, lookup_context: resp }
2285 }
2286}
2287
/// State of an ENR request that is tracked in `pending_enr_requests`.
#[derive(Debug)]
struct EnrRequestState {
    /// Timestamp used by `evict_expired_requests` to decide whether the request expired.
    sent_at: Instant,
    /// Hash stored for this request.
    // NOTE(review): presumably the hash of the sent request that the response has to reference;
    // confirm against the ENR response handling.
    echo_hash: B256,
}
2295
/// The value stored for a node in the routing table.
#[derive(Debug, Clone, Eq, PartialEq)]
struct NodeEntry {
    /// The record of the node.
    record: NodeRecord,
    /// When the entry was created or last updated through `update_now`.
    last_seen: Instant,
    /// The last ENR sequence number stored for the node, if any.
    last_enr_seq: Option<u64>,
    /// The fork id stored for the node, if any.
    fork_id: Option<ForkId>,
    /// Counter of failed `FindNode` requests, reset when an endpoint proof is established.
    find_node_failures: u8,
    /// Whether an endpoint proof was established for the node.
    has_endpoint_proof: bool,
}
2312
2313impl NodeEntry {
2316 fn new(record: NodeRecord) -> Self {
2318 Self {
2319 record,
2320 last_seen: Instant::now(),
2321 last_enr_seq: None,
2322 fork_id: None,
2323 find_node_failures: 0,
2324 has_endpoint_proof: false,
2325 }
2326 }
2327
2328 #[cfg(test)]
2329 fn new_proven(record: NodeRecord) -> Self {
2330 let mut node = Self::new(record);
2331 node.has_endpoint_proof = true;
2332 node
2333 }
2334
2335 const fn establish_proof(&mut self) {
2337 self.has_endpoint_proof = true;
2338 self.find_node_failures = 0;
2339 }
2340
2341 const fn exceeds_find_node_failures(&self, max_failures: u8) -> bool {
2343 self.find_node_failures >= max_failures
2344 }
2345
2346 fn update_with_enr(&mut self, last_enr_seq: Option<u64>) -> Option<u64> {
2348 self.update_now(|s| std::mem::replace(&mut s.last_enr_seq, last_enr_seq))
2349 }
2350
2351 const fn inc_failed_request(&mut self) {
2353 self.find_node_failures += 1;
2354 }
2355
2356 fn update_with_fork_id(&mut self, fork_id: Option<ForkId>) -> Option<ForkId> {
2358 self.update_now(|s| std::mem::replace(&mut s.fork_id, fork_id))
2359 }
2360
2361 fn update_now<F, R>(&mut self, f: F) -> R
2363 where
2364 F: FnOnce(&mut Self) -> R,
2365 {
2366 self.last_seen = Instant::now();
2367 f(self)
2368 }
2369}
2370
2371impl NodeEntry {
2374 fn is_expired(&self) -> bool {
2376 self.last_seen.elapsed() > (ENDPOINT_PROOF_EXPIRATION / 2)
2377 }
2378}
2379
/// The reason why a ping is sent to a node.
#[derive(Debug)]
enum PingReason {
    // NOTE(review): the places where the first two variants are used are outside of this part
    // of the file; the descriptions below are derived from the names only.
    /// Presumably used when a node is pinged because it is newly inserted into the table.
    InitialInsert,
    /// Presumably used when a ping is sent in order to establish an endpoint proof.
    EstablishBond,
    /// The node is re-pinged because it was not seen for a while.
    RePing,
    /// The node is pinged as part of a lookup, before it is queried itself. Carries the node
    /// and the context of the lookup.
    Lookup(NodeRecord, LookupContext),
}
2392
/// Updates that the service emits for listeners, see `Discv4Command::Updates`.
#[derive(Debug, Clone)]
pub enum DiscoveryUpdate {
    /// A node was added.
    // NOTE(review): presumably emitted when a node is inserted into the table; the emitting
    // code is outside of this part of the file.
    Added(NodeRecord),
    /// A node was discovered but not inserted, because its bucket is full.
    DiscoveredAtCapacity(NodeRecord),
    /// A [`ForkId`] that belongs to the given node.
    // NOTE(review): presumably taken from the node's ENR response; confirm at the emitter.
    EnrForkId(NodeRecord, ForkId),
    /// The node with the given id was removed.
    Removed(PeerId),
    /// Several updates at once.
    Batch(Vec<DiscoveryUpdate>),
}
2407
2408#[cfg(test)]
2409mod tests {
2410 use super::*;
2411 use crate::test_utils::{create_discv4, create_discv4_with_config, rng_endpoint, rng_record};
2412 use alloy_primitives::hex;
2413 use alloy_rlp::{Decodable, Encodable};
2414 use rand_08::Rng;
2415 use reth_ethereum_forks::{EnrForkIdEntry, ForkHash};
2416 use reth_network_peers::mainnet_nodes;
2417 use std::future::poll_fn;
2418
2419 #[tokio::test]
2420 async fn test_configured_enr_forkid_entry() {
2421 let fork: ForkId = ForkId { hash: ForkHash([220, 233, 108, 45]), next: 0u64 };
2422 let mut disc_conf = Discv4Config::default();
2423 disc_conf.add_eip868_pair("eth", EnrForkIdEntry::from(fork));
2424 let (_discv4, service) = create_discv4_with_config(disc_conf).await;
2425 let eth = service.local_eip_868_enr.get_raw_rlp(b"eth").unwrap();
2426 let fork_entry_id = EnrForkIdEntry::decode(&mut ð[..]).unwrap();
2427
2428 let raw: [u8; 8] = [0xc7, 0xc6, 0x84, 0xdc, 0xe9, 0x6c, 0x2d, 0x80];
2429 let decoded = EnrForkIdEntry::decode(&mut &raw[..]).unwrap();
2430 let expected = EnrForkIdEntry {
2431 fork_id: ForkId { hash: ForkHash([0xdc, 0xe9, 0x6c, 0x2d]), next: 0 },
2432 };
2433 assert_eq!(expected, fork_entry_id);
2434 assert_eq!(expected, decoded);
2435 }
2436
2437 #[test]
2438 fn test_enr_forkid_entry_decode() {
2439 let raw: [u8; 8] = [0xc7, 0xc6, 0x84, 0xdc, 0xe9, 0x6c, 0x2d, 0x80];
2440 let decoded = EnrForkIdEntry::decode(&mut &raw[..]).unwrap();
2441 let expected = EnrForkIdEntry {
2442 fork_id: ForkId { hash: ForkHash([0xdc, 0xe9, 0x6c, 0x2d]), next: 0 },
2443 };
2444 assert_eq!(expected, decoded);
2445 }
2446
2447 #[test]
2448 fn test_enr_forkid_entry_encode() {
2449 let original = EnrForkIdEntry {
2450 fork_id: ForkId { hash: ForkHash([0xdc, 0xe9, 0x6c, 0x2d]), next: 0 },
2451 };
2452 let expected: [u8; 8] = [0xc7, 0xc6, 0x84, 0xdc, 0xe9, 0x6c, 0x2d, 0x80];
2453 let mut encoded = Vec::with_capacity(expected.len());
2454 original.encode(&mut encoded);
2455 assert_eq!(&expected[..], encoded.as_slice());
2456 }
2457
2458 #[test]
2459 fn test_local_rotator() {
2460 let id = PeerId::random();
2461 let mut rotator = LookupTargetRotator::local_only();
2462 assert_eq!(rotator.next(&id), id);
2463 assert_eq!(rotator.next(&id), id);
2464 }
2465
2466 #[test]
2467 fn test_rotator() {
2468 let id = PeerId::random();
2469 let mut rotator = LookupTargetRotator::default();
2470 assert_eq!(rotator.next(&id), id);
2471 assert_ne!(rotator.next(&id), id);
2472 assert_ne!(rotator.next(&id), id);
2473 assert_ne!(rotator.next(&id), id);
2474 assert_eq!(rotator.next(&id), id);
2475 }
2476
2477 #[tokio::test]
2478 async fn test_pending_ping() {
2479 let (_, mut service) = create_discv4().await;
2480
2481 let local_addr = service.local_addr();
2482
2483 let mut num_inserted = 0;
2484 loop {
2485 let node = NodeRecord::new(local_addr, PeerId::random());
2486 if service.add_node(node) {
2487 num_inserted += 1;
2488 assert!(service.pending_pings.contains_key(&node.id));
2489 assert_eq!(service.pending_pings.len(), num_inserted);
2490 if num_inserted == MAX_NODES_PING {
2491 break
2492 }
2493 }
2494 }
2495
2496 num_inserted = 0;
2498 for _ in 0..MAX_NODES_PING {
2499 let node = NodeRecord::new(local_addr, PeerId::random());
2500 if service.add_node(node) {
2501 num_inserted += 1;
2502 assert!(!service.pending_pings.contains_key(&node.id));
2503 assert_eq!(service.pending_pings.len(), MAX_NODES_PING);
2504 assert_eq!(service.queued_pings.len(), num_inserted);
2505 }
2506 }
2507 }
2508
2509 #[tokio::test(flavor = "multi_thread")]
2511 #[ignore]
2512 async fn test_mainnet_lookup() {
2513 reth_tracing::init_test_tracing();
2514 let fork_id = ForkId { hash: ForkHash(hex!("743f3d89")), next: 16191202 };
2515
2516 let all_nodes = mainnet_nodes();
2517 let config = Discv4Config::builder()
2518 .add_boot_nodes(all_nodes)
2519 .lookup_interval(Duration::from_secs(1))
2520 .add_eip868_pair("eth", fork_id)
2521 .build();
2522 let (_discv4, mut service) = create_discv4_with_config(config).await;
2523
2524 let mut updates = service.update_stream();
2525
2526 let _handle = service.spawn();
2527
2528 let mut table = HashMap::new();
2529 while let Some(update) = updates.next().await {
2530 match update {
2531 DiscoveryUpdate::EnrForkId(record, fork_id) => {
2532 println!("{record:?}, {fork_id:?}");
2533 }
2534 DiscoveryUpdate::Added(record) => {
2535 table.insert(record.id, record);
2536 }
2537 DiscoveryUpdate::Removed(id) => {
2538 table.remove(&id);
2539 }
2540 _ => {}
2541 }
2542 println!("total peers {}", table.len());
2543 }
2544 }
2545
2546 #[tokio::test]
2547 async fn test_mapped_ipv4() {
2548 reth_tracing::init_test_tracing();
2549 let mut rng = rand_08::thread_rng();
2550 let config = Discv4Config::builder().build();
2551 let (_discv4, mut service) = create_discv4_with_config(config).await;
2552
2553 let v4: Ipv4Addr = "0.0.0.0".parse().unwrap();
2554 let v6 = v4.to_ipv6_mapped();
2555 let addr: SocketAddr = (v6, DEFAULT_DISCOVERY_PORT).into();
2556
2557 let ping = Ping {
2558 from: rng_endpoint(&mut rng),
2559 to: rng_endpoint(&mut rng),
2560 expire: service.ping_expiration(),
2561 enr_sq: Some(rng.r#gen()),
2562 };
2563
2564 let id = PeerId::random();
2565 service.on_ping(ping, addr, id, B256::random());
2566
2567 let key = kad_key(id);
2568 match service.kbuckets.entry(&key) {
2569 kbucket::Entry::Present(entry, _) => {
2570 let node_addr = entry.value().record.address;
2571 assert!(node_addr.is_ipv4());
2572 assert_eq!(node_addr, IpAddr::from(v4));
2573 }
2574 _ => unreachable!(),
2575 };
2576 }
2577
2578 #[tokio::test]
2579 async fn test_respect_ping_expiration() {
2580 reth_tracing::init_test_tracing();
2581 let mut rng = rand_08::thread_rng();
2582 let config = Discv4Config::builder().build();
2583 let (_discv4, mut service) = create_discv4_with_config(config).await;
2584
2585 let v4: Ipv4Addr = "0.0.0.0".parse().unwrap();
2586 let v6 = v4.to_ipv6_mapped();
2587 let addr: SocketAddr = (v6, DEFAULT_DISCOVERY_PORT).into();
2588
2589 let ping = Ping {
2590 from: rng_endpoint(&mut rng),
2591 to: rng_endpoint(&mut rng),
2592 expire: SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs() - 1,
2593 enr_sq: Some(rng.r#gen()),
2594 };
2595
2596 let id = PeerId::random();
2597 service.on_ping(ping, addr, id, B256::random());
2598
2599 let key = kad_key(id);
2600 match service.kbuckets.entry(&key) {
2601 kbucket::Entry::Absent(_) => {}
2602 _ => unreachable!(),
2603 };
2604 }
2605
2606 #[tokio::test]
2607 async fn test_single_lookups() {
2608 reth_tracing::init_test_tracing();
2609
2610 let config = Discv4Config::builder().build();
2611 let (_discv4, mut service) = create_discv4_with_config(config.clone()).await;
2612
2613 let id = PeerId::random();
2614 let key = kad_key(id);
2615 let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), id);
2616
2617 let _ = service.kbuckets.insert_or_update(
2618 &key,
2619 NodeEntry::new_proven(record),
2620 NodeStatus {
2621 direction: ConnectionDirection::Incoming,
2622 state: ConnectionState::Connected,
2623 },
2624 );
2625
2626 service.lookup_self();
2627 assert_eq!(service.pending_find_nodes.len(), 1);
2628
2629 poll_fn(|cx| {
2630 let _ = service.poll(cx);
2631 assert_eq!(service.pending_find_nodes.len(), 1);
2632
2633 Poll::Ready(())
2634 })
2635 .await;
2636 }
2637
2638 #[tokio::test]
2639 async fn test_on_neighbours_recursive_lookup() {
2640 reth_tracing::init_test_tracing();
2641
2642 let config = Discv4Config::builder().build();
2643 let (_discv4, mut service) = create_discv4_with_config(config.clone()).await;
2644 let (_discv4, mut service2) = create_discv4_with_config(config).await;
2645
2646 let id = PeerId::random();
2647 let key = kad_key(id);
2648 let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), id);
2649
2650 let _ = service.kbuckets.insert_or_update(
2651 &key,
2652 NodeEntry::new_proven(record),
2653 NodeStatus {
2654 direction: ConnectionDirection::Incoming,
2655 state: ConnectionState::Connected,
2656 },
2657 );
2658 service.lookup_self();
2661 assert_eq!(service.pending_find_nodes.len(), 1);
2662
2663 poll_fn(|cx| {
2664 let _ = service.poll(cx);
2665 assert_eq!(service.pending_find_nodes.len(), 1);
2666
2667 Poll::Ready(())
2668 })
2669 .await;
2670
2671 let expiry = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_secs() +
2672 10000000000000;
2673 let msg = Neighbours { nodes: vec![service2.local_node_record], expire: expiry };
2674 service.on_neighbours(msg, record.tcp_addr(), id);
2675 let event = poll_fn(|cx| service2.poll(cx)).await;
2677 assert_eq!(event, Discv4Event::Ping);
2678 assert_eq!(service.pending_find_nodes.len(), 1);
2681 let event = poll_fn(|cx| service.poll(cx)).await;
2683 assert_eq!(event, Discv4Event::Pong);
2684 let event = poll_fn(|cx| service.poll(cx)).await;
2689 assert_eq!(event, Discv4Event::Ping);
2690 assert_eq!(service.pending_find_nodes.len(), 2);
2693 }
2694
2695 #[tokio::test]
2696 async fn test_no_local_in_closest() {
2697 reth_tracing::init_test_tracing();
2698
2699 let config = Discv4Config::builder().build();
2700 let (_discv4, mut service) = create_discv4_with_config(config).await;
2701
2702 let target_key = kad_key(PeerId::random());
2703
2704 let id = PeerId::random();
2705 let key = kad_key(id);
2706 let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), id);
2707
2708 let _ = service.kbuckets.insert_or_update(
2709 &key,
2710 NodeEntry::new(record),
2711 NodeStatus {
2712 direction: ConnectionDirection::Incoming,
2713 state: ConnectionState::Connected,
2714 },
2715 );
2716
2717 let closest = service
2718 .kbuckets
2719 .closest_values(&target_key)
2720 .map(|n| n.value.record)
2721 .take(MAX_NODES_PER_BUCKET)
2722 .collect::<Vec<_>>();
2723
2724 assert_eq!(closest.len(), 1);
2725 assert!(!closest.iter().any(|r| r.id == *service.local_peer_id()));
2726 }
2727
2728 #[tokio::test]
2729 async fn test_random_lookup() {
2730 reth_tracing::init_test_tracing();
2731
2732 let config = Discv4Config::builder().build();
2733 let (_discv4, mut service) = create_discv4_with_config(config).await;
2734
2735 let target = PeerId::random();
2736
2737 let id = PeerId::random();
2738 let key = kad_key(id);
2739 let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), id);
2740
2741 let _ = service.kbuckets.insert_or_update(
2742 &key,
2743 NodeEntry::new_proven(record),
2744 NodeStatus {
2745 direction: ConnectionDirection::Incoming,
2746 state: ConnectionState::Connected,
2747 },
2748 );
2749
2750 service.lookup(target);
2751 assert_eq!(service.pending_find_nodes.len(), 1);
2752
2753 let ctx = service.pending_find_nodes.values().next().unwrap().lookup_context.clone();
2754
2755 assert_eq!(ctx.target(), target);
2756 assert_eq!(ctx.inner.closest_nodes.borrow().len(), 1);
2757
2758 ctx.add_node(record);
2759 assert_eq!(ctx.inner.closest_nodes.borrow().len(), 1);
2760 }
2761
2762 #[tokio::test]
2763 async fn test_reping_on_find_node_failures() {
2764 reth_tracing::init_test_tracing();
2765
2766 let config = Discv4Config::builder().build();
2767 let (_discv4, mut service) = create_discv4_with_config(config).await;
2768
2769 let target = PeerId::random();
2770
2771 let id = PeerId::random();
2772 let key = kad_key(id);
2773 let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), id);
2774
2775 let mut entry = NodeEntry::new_proven(record);
2776 entry.find_node_failures = u8::MAX;
2777 let _ = service.kbuckets.insert_or_update(
2778 &key,
2779 entry,
2780 NodeStatus {
2781 direction: ConnectionDirection::Incoming,
2782 state: ConnectionState::Connected,
2783 },
2784 );
2785
2786 service.lookup(target);
2787 assert_eq!(service.pending_find_nodes.len(), 0);
2788 assert_eq!(service.pending_pings.len(), 1);
2789
2790 service.update_on_pong(record, None);
2791
2792 service
2793 .on_entry(record.id, |entry| {
2794 assert_eq!(entry.find_node_failures, 0);
2796 assert!(entry.has_endpoint_proof);
2797 })
2798 .unwrap();
2799 }
2800
2801 #[tokio::test]
2802 async fn test_service_commands() {
2803 reth_tracing::init_test_tracing();
2804
2805 let config = Discv4Config::builder().build();
2806 let (discv4, mut service) = create_discv4_with_config(config).await;
2807
2808 service.lookup_self();
2809
2810 let _handle = service.spawn();
2811 discv4.send_lookup_self();
2812 let _ = discv4.lookup_self().await;
2813 }
2814
2815 #[tokio::test]
2816 async fn test_requests_timeout() {
2817 reth_tracing::init_test_tracing();
2818 let fork_id = ForkId { hash: ForkHash(hex!("743f3d89")), next: 16191202 };
2819
2820 let config = Discv4Config::builder()
2821 .request_timeout(Duration::from_millis(200))
2822 .ping_expiration(Duration::from_millis(200))
2823 .lookup_neighbours_expiration(Duration::from_millis(200))
2824 .add_eip868_pair("eth", fork_id)
2825 .build();
2826 let (_disv4, mut service) = create_discv4_with_config(config).await;
2827
2828 let id = PeerId::random();
2829 let key = kad_key(id);
2830 let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), id);
2831
2832 let _ = service.kbuckets.insert_or_update(
2833 &key,
2834 NodeEntry::new_proven(record),
2835 NodeStatus {
2836 direction: ConnectionDirection::Incoming,
2837 state: ConnectionState::Connected,
2838 },
2839 );
2840
2841 service.lookup_self();
2842 assert_eq!(service.pending_find_nodes.len(), 1);
2843
2844 let ctx = service.pending_find_nodes.values().next().unwrap().lookup_context.clone();
2845
2846 service.pending_lookup.insert(record.id, (Instant::now(), ctx));
2847
2848 assert_eq!(service.pending_lookup.len(), 1);
2849
2850 let ping = Ping {
2851 from: service.local_node_record.into(),
2852 to: record.into(),
2853 expire: service.ping_expiration(),
2854 enr_sq: service.enr_seq(),
2855 };
2856 let echo_hash = service.send_packet(Message::Ping(ping), record.udp_addr());
2857 let ping_request = PingRequest {
2858 sent_at: Instant::now(),
2859 node: record,
2860 echo_hash,
2861 reason: PingReason::InitialInsert,
2862 };
2863 service.pending_pings.insert(record.id, ping_request);
2864
2865 assert_eq!(service.pending_pings.len(), 1);
2866
2867 tokio::time::sleep(Duration::from_secs(1)).await;
2868
2869 poll_fn(|cx| {
2870 let _ = service.poll(cx);
2871
2872 assert_eq!(service.pending_find_nodes.len(), 0);
2873 assert_eq!(service.pending_lookup.len(), 0);
2874 assert_eq!(service.pending_pings.len(), 0);
2875
2876 Poll::Ready(())
2877 })
2878 .await;
2879 }
2880
2881 #[tokio::test(flavor = "multi_thread")]
2883 async fn test_check_wrong_to() {
2884 reth_tracing::init_test_tracing();
2885
2886 let config = Discv4Config::builder().external_ip_resolver(None).build();
2887 let (_discv4, mut service_1) = create_discv4_with_config(config.clone()).await;
2888 let (_discv4, mut service_2) = create_discv4_with_config(config).await;
2889
2890 let mut ping = Ping {
2892 from: service_1.local_node_record.into(),
2893 to: service_2.local_node_record.into(),
2894 expire: service_1.ping_expiration(),
2895 enr_sq: service_1.enr_seq(),
2896 };
2897 ping.to.address = "192.0.2.0".parse().unwrap();
2898
2899 let echo_hash = service_1.send_packet(Message::Ping(ping), service_2.local_addr());
2900 let ping_request = PingRequest {
2901 sent_at: Instant::now(),
2902 node: service_2.local_node_record,
2903 echo_hash,
2904 reason: PingReason::InitialInsert,
2905 };
2906 service_1.pending_pings.insert(*service_2.local_peer_id(), ping_request);
2907
2908 let event = poll_fn(|cx| service_2.poll(cx)).await;
2910 assert_eq!(event, Discv4Event::Ping);
2911
2912 let event = poll_fn(|cx| service_1.poll(cx)).await;
2914 assert_eq!(event, Discv4Event::Pong);
2915 let event = poll_fn(|cx| service_1.poll(cx)).await;
2917 assert_eq!(event, Discv4Event::Ping);
2918 }
2919
2920 #[tokio::test(flavor = "multi_thread")]
2921 async fn test_check_ping_pong() {
2922 reth_tracing::init_test_tracing();
2923
2924 let config = Discv4Config::builder().external_ip_resolver(None).build();
2925 let (_discv4, mut service_1) = create_discv4_with_config(config.clone()).await;
2926 let (_discv4, mut service_2) = create_discv4_with_config(config).await;
2927
2928 service_1.add_node(service_2.local_node_record);
2930
2931 let event = poll_fn(|cx| service_2.poll(cx)).await;
2933 assert_eq!(event, Discv4Event::Ping);
2934
2935 let key1 = kad_key(*service_1.local_peer_id());
2937 match service_2.kbuckets.entry(&key1) {
2938 kbucket::Entry::Present(_entry, status) => {
2939 assert!(!status.is_connected());
2940 }
2941 _ => unreachable!(),
2942 }
2943
2944 let event = poll_fn(|cx| service_1.poll(cx)).await;
2946 assert_eq!(event, Discv4Event::Pong);
2947
2948 let key2 = kad_key(*service_2.local_peer_id());
2950 match service_1.kbuckets.entry(&key2) {
2951 kbucket::Entry::Present(_entry, status) => {
2952 assert!(status.is_connected());
2953 }
2954 _ => unreachable!(),
2955 }
2956
2957 let event = poll_fn(|cx| service_1.poll(cx)).await;
2959 assert_eq!(event, Discv4Event::Ping);
2960
2961 let event = poll_fn(|cx| service_2.poll(cx)).await;
2963
2964 match event {
2965 Discv4Event::EnrRequest => {
2966 let event = poll_fn(|cx| service_2.poll(cx)).await;
2968 match event {
2969 Discv4Event::EnrRequest => {
2970 let event = poll_fn(|cx| service_2.poll(cx)).await;
2971 assert_eq!(event, Discv4Event::Pong);
2972 }
2973 Discv4Event::Pong => {}
2974 _ => {
2975 unreachable!()
2976 }
2977 }
2978 }
2979 Discv4Event::Pong => {}
2980 ev => unreachable!("{ev:?}"),
2981 }
2982
2983 match service_2.kbuckets.entry(&key1) {
2985 kbucket::Entry::Present(_entry, status) => {
2986 assert!(status.is_connected());
2987 }
2988 ev => unreachable!("{ev:?}"),
2989 }
2990 }
2991
/// Inserting a record through an absent k-bucket entry makes a later lookup of the same key
/// return a present entry.
#[test]
fn test_insert() {
    let mut rng = rand_08::thread_rng();

    let local_node_record = rng_record(&mut rng);
    let mut kbuckets: KBucketsTable<NodeKey, NodeEntry> = KBucketsTable::new(
        NodeKey::from(&local_node_record).into(),
        Duration::from_secs(60),
        MAX_NODES_PER_BUCKET,
        None,
        None,
    );

    let new_record = rng_record(&mut rng);
    let key = kad_key(new_record.id);

    // A freshly generated record must not be in the table yet.
    let kbucket::Entry::Absent(vacant) = kbuckets.entry(&key) else { unreachable!() };
    let _ = vacant.insert(
        NodeEntry::new(new_record),
        NodeStatus {
            direction: ConnectionDirection::Outgoing,
            state: ConnectionState::Disconnected,
        },
    );

    // After the insert the same key must resolve to a present entry.
    if !matches!(kbuckets.entry(&key), kbucket::Entry::Present(_, _)) {
        unreachable!()
    }
}
3027
3028 #[tokio::test]
3029 async fn test_bootnode_not_in_update_stream() {
3030 reth_tracing::init_test_tracing();
3031 let (_, service_1) = create_discv4().await;
3032 let peerid_1 = *service_1.local_peer_id();
3033
3034 let config = Discv4Config::builder().add_boot_node(service_1.local_node_record).build();
3035 service_1.spawn();
3036
3037 let (_, mut service_2) = create_discv4_with_config(config).await;
3038
3039 let mut updates = service_2.update_stream();
3040
3041 service_2.spawn();
3042
3043 let mut bootnode_appeared = false;
3045 let timeout = tokio::time::sleep(Duration::from_secs(1));
3046 tokio::pin!(timeout);
3047
3048 loop {
3049 tokio::select! {
3050 Some(update) = updates.next() => {
3051 if let DiscoveryUpdate::Added(record) = update {
3052 if record.id == peerid_1 {
3053 bootnode_appeared = true;
3054 break;
3055 }
3056 }
3057 }
3058 _ = &mut timeout => break,
3059 }
3060 }
3061
3062 assert!(bootnode_appeared, "Bootnode should appear in update stream");
3064 }
3065}