1#![doc(
20 html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
21 html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
22 issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/"
23)]
24#![cfg_attr(not(test), warn(unused_crate_dependencies))]
25#![cfg_attr(docsrs, feature(doc_cfg))]
26
27use crate::{
28 error::{DecodePacketError, Discv4Error},
29 proto::{FindNode, Message, Neighbours, Packet, Ping, Pong},
30};
31use alloy_primitives::{bytes::Bytes, hex, B256};
32use discv5::{
33 kbucket,
34 kbucket::{
35 BucketInsertResult, Distance, Entry as BucketEntry, InsertResult, KBucketsTable,
36 NodeStatus, MAX_NODES_PER_BUCKET,
37 },
38 ConnectionDirection, ConnectionState,
39};
40use enr::Enr;
41use itertools::Itertools;
42use parking_lot::Mutex;
43use proto::{EnrRequest, EnrResponse};
44use reth_ethereum_forks::ForkId;
45use reth_network_peers::{pk2id, PeerId};
46use secp256k1::SecretKey;
47use std::{
48 cell::RefCell,
49 collections::{btree_map, hash_map::Entry, BTreeMap, HashMap, VecDeque},
50 fmt,
51 future::poll_fn,
52 io,
53 net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4},
54 pin::Pin,
55 rc::Rc,
56 sync::Arc,
57 task::{ready, Context, Poll},
58 time::{Duration, Instant, SystemTime, UNIX_EPOCH},
59};
60use tokio::{
61 net::UdpSocket,
62 sync::{mpsc, mpsc::error::TrySendError, oneshot, oneshot::Sender as OneshotSender},
63 task::{JoinHandle, JoinSet},
64 time::Interval,
65};
66use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt};
67use tracing::{debug, trace};
68
69pub mod error;
70pub mod proto;
71
72mod config;
73pub use config::{Discv4Config, Discv4ConfigBuilder};
74
75mod node;
76use node::{kad_key, NodeKey};
77
78mod table;
79
80pub use reth_network_peers::NodeRecord;
82
83#[cfg(any(test, feature = "test-utils"))]
84pub mod test_utils;
85
86use crate::table::PongTable;
87use reth_net_nat::ResolveNatInterval;
88pub use reth_net_nat::{external_ip, NatResolver};
90
91pub const DEFAULT_DISCOVERY_ADDR: IpAddr = IpAddr::V4(Ipv4Addr::UNSPECIFIED);
95
96pub const DEFAULT_DISCOVERY_PORT: u16 = 30303;
100
101pub const DEFAULT_DISCOVERY_ADDRESS: SocketAddr =
105 SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, DEFAULT_DISCOVERY_PORT));
106
107const MAX_PACKET_SIZE: usize = 1280;
109
110const MIN_PACKET_SIZE: usize = 32 + 65 + 1;
112
113const ALPHA: usize = 3;
115
116const MAX_NODES_PING: usize = 2 * MAX_NODES_PER_BUCKET;
121
122const MAX_QUEUED_PINGS: usize = 2 * MAX_NODES_PER_BUCKET;
130
131const SAFE_MAX_DATAGRAM_NEIGHBOUR_RECORDS: usize = (MAX_PACKET_SIZE - 109) / 91;
137
138const ENDPOINT_PROOF_EXPIRATION: Duration = Duration::from_secs(24 * 60 * 60);
142
143const EXPIRE_DURATION: Duration = Duration::from_secs(60 * 60);
145
146const UDP_MESSAGE_POLL_LOOP_BUDGET: i32 = 4;
151
152type EgressSender = mpsc::Sender<(Bytes, SocketAddr)>;
153type EgressReceiver = mpsc::Receiver<(Bytes, SocketAddr)>;
154
155pub(crate) type IngressSender = mpsc::Sender<IngressEvent>;
156pub(crate) type IngressReceiver = mpsc::Receiver<IngressEvent>;
157
158type NodeRecordSender = OneshotSender<Vec<NodeRecord>>;
159
160#[derive(Debug, Clone)]
167pub struct Discv4 {
168 local_addr: SocketAddr,
170 to_service: mpsc::UnboundedSender<Discv4Command>,
172 node_record: Arc<Mutex<NodeRecord>>,
176}
177
178impl Discv4 {
179 pub async fn spawn(
183 local_address: SocketAddr,
184 local_enr: NodeRecord,
185 secret_key: SecretKey,
186 config: Discv4Config,
187 ) -> io::Result<Self> {
188 let (discv4, service) = Self::bind(local_address, local_enr, secret_key, config).await?;
189
190 service.spawn();
191
192 Ok(discv4)
193 }
194
195 #[cfg(feature = "test-utils")]
199 pub fn noop() -> Self {
200 let (to_service, _rx) = mpsc::unbounded_channel();
201 let local_addr =
202 (IpAddr::from(std::net::Ipv4Addr::UNSPECIFIED), DEFAULT_DISCOVERY_PORT).into();
203 Self {
204 local_addr,
205 to_service,
206 node_record: Arc::new(Mutex::new(NodeRecord::new(
207 "127.0.0.1:3030".parse().unwrap(),
208 PeerId::random(),
209 ))),
210 }
211 }
212
213 pub async fn bind(
245 local_address: SocketAddr,
246 mut local_node_record: NodeRecord,
247 secret_key: SecretKey,
248 config: Discv4Config,
249 ) -> io::Result<(Self, Discv4Service)> {
250 let socket = UdpSocket::bind(local_address).await?;
251 let local_addr = socket.local_addr()?;
252 local_node_record.udp_port = local_addr.port();
253 trace!(target: "discv4", ?local_addr,"opened UDP socket");
254
255 let mut service =
256 Discv4Service::new(socket, local_addr, local_node_record, secret_key, config);
257
258 service.resolve_external_ip();
260
261 let discv4 = service.handle();
262 Ok((discv4, service))
263 }
264
265 pub const fn local_addr(&self) -> SocketAddr {
267 self.local_addr
268 }
269
270 pub fn node_record(&self) -> NodeRecord {
274 *self.node_record.lock()
275 }
276
277 pub fn external_ip(&self) -> IpAddr {
279 self.node_record.lock().address
280 }
281
282 pub fn set_lookup_interval(&self, duration: Duration) {
284 self.send_to_service(Discv4Command::SetLookupInterval(duration))
285 }
286
287 pub async fn lookup_self(&self) -> Result<Vec<NodeRecord>, Discv4Error> {
302 self.lookup_node(None).await
303 }
304
305 pub async fn lookup(&self, node_id: PeerId) -> Result<Vec<NodeRecord>, Discv4Error> {
309 self.lookup_node(Some(node_id)).await
310 }
311
312 pub async fn lookup_random(&self) -> Result<Vec<NodeRecord>, Discv4Error> {
314 let target = PeerId::random();
315 self.lookup_node(Some(target)).await
316 }
317
318 pub fn send_lookup(&self, node_id: PeerId) {
320 let cmd = Discv4Command::Lookup { node_id: Some(node_id), tx: None };
321 self.send_to_service(cmd);
322 }
323
324 async fn lookup_node(&self, node_id: Option<PeerId>) -> Result<Vec<NodeRecord>, Discv4Error> {
325 let (tx, rx) = oneshot::channel();
326 let cmd = Discv4Command::Lookup { node_id, tx: Some(tx) };
327 self.to_service.send(cmd)?;
328 Ok(rx.await?)
329 }
330
331 pub fn send_lookup_self(&self) {
333 let cmd = Discv4Command::Lookup { node_id: None, tx: None };
334 self.send_to_service(cmd);
335 }
336
337 pub fn remove_peer(&self, node_id: PeerId) {
339 let cmd = Discv4Command::Remove(node_id);
340 self.send_to_service(cmd);
341 }
342
343 pub fn add_node(&self, node_record: NodeRecord) {
345 let cmd = Discv4Command::Add(node_record);
346 self.send_to_service(cmd);
347 }
348
349 pub fn ban(&self, node_id: PeerId, ip: IpAddr) {
353 let cmd = Discv4Command::Ban(node_id, ip);
354 self.send_to_service(cmd);
355 }
356
357 pub fn ban_ip(&self, ip: IpAddr) {
361 let cmd = Discv4Command::BanIp(ip);
362 self.send_to_service(cmd);
363 }
364
365 pub fn ban_node(&self, node_id: PeerId) {
369 let cmd = Discv4Command::BanPeer(node_id);
370 self.send_to_service(cmd);
371 }
372
373 pub fn set_tcp_port(&self, port: u16) {
377 let cmd = Discv4Command::SetTcpPort(port);
378 self.send_to_service(cmd);
379 }
380
381 pub fn set_eip868_rlp_pair(&self, key: Vec<u8>, rlp: Bytes) {
387 let cmd = Discv4Command::SetEIP868RLPPair { key, rlp };
388 self.send_to_service(cmd);
389 }
390
391 pub fn set_eip868_rlp(&self, key: Vec<u8>, value: impl alloy_rlp::Encodable) {
395 self.set_eip868_rlp_pair(key, Bytes::from(alloy_rlp::encode(&value)))
396 }
397
398 #[inline]
399 fn send_to_service(&self, cmd: Discv4Command) {
400 let _ = self.to_service.send(cmd).map_err(|err| {
401 debug!(
402 target: "discv4",
403 %err,
404 "channel capacity reached, dropping command",
405 )
406 });
407 }
408
409 pub async fn update_stream(&self) -> Result<ReceiverStream<DiscoveryUpdate>, Discv4Error> {
411 let (tx, rx) = oneshot::channel();
412 let cmd = Discv4Command::Updates(tx);
413 self.to_service.send(cmd)?;
414 Ok(rx.await?)
415 }
416
417 pub fn terminate(&self) {
419 self.send_to_service(Discv4Command::Terminated);
420 }
421}
422
423#[must_use = "Stream does nothing unless polled"]
437pub struct Discv4Service {
438 local_address: SocketAddr,
440 local_eip_868_enr: Enr<SecretKey>,
442 local_node_record: NodeRecord,
444 shared_node_record: Arc<Mutex<NodeRecord>>,
446 secret_key: SecretKey,
448 _socket: Arc<UdpSocket>,
450 _tasks: JoinSet<()>,
454 kbuckets: KBucketsTable<NodeKey, NodeEntry>,
456 ingress: IngressReceiver,
460 egress: EgressSender,
464 queued_pings: VecDeque<(NodeRecord, PingReason)>,
471 pending_pings: HashMap<PeerId, PingRequest>,
473 pending_lookup: HashMap<PeerId, (Instant, LookupContext)>,
478 pending_find_nodes: HashMap<PeerId, FindNodeRequest>,
480 pending_enr_requests: HashMap<PeerId, EnrRequestState>,
482 to_service: mpsc::UnboundedSender<Discv4Command>,
484 commands_rx: mpsc::UnboundedReceiver<Discv4Command>,
486 update_listeners: Vec<mpsc::Sender<DiscoveryUpdate>>,
488 lookup_interval: Interval,
490 lookup_rotator: LookupTargetRotator,
492 pending_lookup_reset: bool,
494 evict_expired_requests_interval: Interval,
496 ping_interval: Interval,
498 resolve_external_ip_interval: Option<ResolveNatInterval>,
500 config: Discv4Config,
502 queued_events: VecDeque<Discv4Event>,
504 received_pongs: PongTable,
506 expire_interval: Interval,
508 cached_find_node: Option<CachedFindNode>,
510}
511
512impl Discv4Service {
513 pub(crate) fn new(
515 socket: UdpSocket,
516 local_address: SocketAddr,
517 local_node_record: NodeRecord,
518 secret_key: SecretKey,
519 config: Discv4Config,
520 ) -> Self {
521 let socket = Arc::new(socket);
522 let (ingress_tx, ingress_rx) = mpsc::channel(config.udp_ingress_message_buffer);
523 let (egress_tx, egress_rx) = mpsc::channel(config.udp_egress_message_buffer);
524 let mut tasks = JoinSet::<()>::new();
525
526 let udp = Arc::clone(&socket);
527 tasks.spawn(receive_loop(udp, ingress_tx, local_node_record.id));
528
529 let udp = Arc::clone(&socket);
530 tasks.spawn(send_loop(udp, egress_rx));
531
532 let kbuckets = KBucketsTable::new(
533 NodeKey::from(&local_node_record).into(),
534 Duration::from_secs(60),
535 MAX_NODES_PER_BUCKET,
536 None,
537 None,
538 );
539
540 let self_lookup_interval = tokio::time::interval(config.lookup_interval);
541
542 let ping_interval = tokio::time::interval_at(
545 tokio::time::Instant::now() + config.ping_interval,
546 config.ping_interval,
547 );
548
549 let evict_expired_requests_interval = tokio::time::interval_at(
550 tokio::time::Instant::now() + config.request_timeout,
551 config.request_timeout,
552 );
553
554 let lookup_rotator = if config.enable_dht_random_walk {
555 LookupTargetRotator::default()
556 } else {
557 LookupTargetRotator::local_only()
558 };
559
560 let local_eip_868_enr = {
562 let mut builder = Enr::builder();
563 builder.ip(local_node_record.address);
564 if local_node_record.address.is_ipv4() {
565 builder.udp4(local_node_record.udp_port);
566 builder.tcp4(local_node_record.tcp_port);
567 } else {
568 builder.udp6(local_node_record.udp_port);
569 builder.tcp6(local_node_record.tcp_port);
570 }
571
572 for (key, val) in &config.additional_eip868_rlp_pairs {
573 builder.add_value_rlp(key, val.clone());
574 }
575 builder.build(&secret_key).expect("v4 is set")
576 };
577
578 let (to_service, commands_rx) = mpsc::unbounded_channel();
579
580 let shared_node_record = Arc::new(Mutex::new(local_node_record));
581
582 Self {
583 local_address,
584 local_eip_868_enr,
585 local_node_record,
586 shared_node_record,
587 _socket: socket,
588 kbuckets,
589 secret_key,
590 _tasks: tasks,
591 ingress: ingress_rx,
592 egress: egress_tx,
593 queued_pings: VecDeque::with_capacity(MAX_QUEUED_PINGS),
594 pending_pings: Default::default(),
595 pending_lookup: Default::default(),
596 pending_find_nodes: Default::default(),
597 pending_enr_requests: Default::default(),
598 commands_rx,
599 to_service,
600 update_listeners: Vec::with_capacity(1),
601 lookup_interval: self_lookup_interval,
602 ping_interval,
603 evict_expired_requests_interval,
604 lookup_rotator,
605 pending_lookup_reset: config.enable_lookup,
606 resolve_external_ip_interval: config.resolve_external_ip_interval(),
607 config,
608 queued_events: Default::default(),
609 received_pongs: Default::default(),
610 expire_interval: tokio::time::interval(EXPIRE_DURATION),
611 cached_find_node: None,
612 }
613 }
614
615 pub fn handle(&self) -> Discv4 {
617 Discv4 {
618 local_addr: self.local_address,
619 to_service: self.to_service.clone(),
620 node_record: self.shared_node_record.clone(),
621 }
622 }
623
624 fn enr_seq(&self) -> Option<u64> {
626 self.config.enable_eip868.then(|| self.local_eip_868_enr.seq())
627 }
628
629 pub fn set_lookup_interval(&mut self, duration: Duration) {
631 self.lookup_interval = tokio::time::interval(duration);
632 }
633
634 fn resolve_external_ip(&mut self) {
638 if let Some(r) = &self.resolve_external_ip_interval &&
639 let Some(external_ip) =
640 r.resolver().clone().as_external_ip(self.local_node_record.udp_port)
641 {
642 self.set_external_ip_addr(external_ip);
643 }
644 }
645
646 pub fn set_external_ip_addr(&mut self, external_ip: IpAddr) {
649 if self.local_node_record.address != external_ip {
650 debug!(target: "discv4", ?external_ip, "Updating external ip");
651 self.local_node_record.address = external_ip;
652 let _ = self.local_eip_868_enr.set_ip(external_ip, &self.secret_key);
653 let mut lock = self.shared_node_record.lock();
654 *lock = self.local_node_record;
655 debug!(target: "discv4", enr=?self.local_eip_868_enr, "Updated local ENR");
656 }
657 }
658
659 pub const fn local_peer_id(&self) -> &PeerId {
661 &self.local_node_record.id
662 }
663
664 pub const fn local_addr(&self) -> SocketAddr {
666 self.local_address
667 }
668
669 pub const fn local_enr(&self) -> NodeRecord {
673 self.local_node_record
674 }
675
676 #[cfg(test)]
678 pub const fn local_enr_mut(&mut self) -> &mut NodeRecord {
679 &mut self.local_node_record
680 }
681
682 pub fn contains_node(&self, id: PeerId) -> bool {
684 let key = kad_key(id);
685 self.kbuckets.get_index(&key).is_some()
686 }
687
688 pub fn bootstrap(&mut self) {
700 for record in self.config.bootstrap_nodes.clone() {
701 debug!(target: "discv4", ?record, "pinging boot node");
702 let key = kad_key(record.id);
703 let entry = NodeEntry::new(record);
704
705 match self.kbuckets.insert_or_update(
707 &key,
708 entry,
709 NodeStatus {
710 state: ConnectionState::Disconnected,
711 direction: ConnectionDirection::Outgoing,
712 },
713 ) {
714 InsertResult::Failed(_) => {}
715 _ => {
716 self.try_ping(record, PingReason::InitialInsert);
717 }
718 }
719 }
720 }
721
722 pub fn spawn(mut self) -> JoinHandle<()> {
726 tokio::task::spawn(async move {
727 self.bootstrap();
728
729 while let Some(event) = self.next().await {
730 trace!(target: "discv4", ?event, "processed");
731 }
732 trace!(target: "discv4", "service terminated");
733 })
734 }
735
736 pub fn update_stream(&mut self) -> ReceiverStream<DiscoveryUpdate> {
738 let (tx, rx) = mpsc::channel(512);
739 self.update_listeners.push(tx);
740 ReceiverStream::new(rx)
741 }
742
743 pub fn lookup_self(&mut self) {
745 self.lookup(self.local_node_record.id)
746 }
747
748 pub fn lookup(&mut self, target: PeerId) {
758 self.lookup_with(target, None)
759 }
760
761 fn lookup_with(&mut self, target: PeerId, tx: Option<NodeRecordSender>) {
771 trace!(target: "discv4", ?target, "Starting lookup");
772 let target_key = kad_key(target);
773
774 let ctx = LookupContext::new(
777 target_key.clone(),
778 self.kbuckets
779 .closest_values(&target_key)
780 .filter(|node| {
781 node.value.has_endpoint_proof &&
782 !self.pending_find_nodes.contains_key(&node.key.preimage().0)
783 })
784 .take(MAX_NODES_PER_BUCKET)
785 .map(|n| (target_key.distance(&n.key), n.value.record)),
786 tx,
787 );
788
789 let closest = ctx.closest(ALPHA);
791
792 if closest.is_empty() && self.pending_find_nodes.is_empty() {
793 self.bootstrap();
798 return
799 }
800
801 trace!(target: "discv4", ?target, num = closest.len(), "Start lookup closest nodes");
802
803 for node in closest {
804 self.find_node_checked(&node, ctx.clone());
808 }
809 }
810
811 fn find_node(&mut self, node: &NodeRecord, ctx: LookupContext) {
815 trace!(target: "discv4", ?node, lookup=?ctx.target(), "Sending FindNode");
816 ctx.mark_queried(node.id);
817 let (payload, hash) = self.find_node_packet(ctx.target());
818 let to = node.udp_addr();
819 trace!(target: "discv4", ?to, ?hash, "sending FindNode packet");
820 let _ = self.egress.try_send((payload, to)).map_err(|err| {
821 debug!(target: "discv4", %err, "dropped outgoing packet");
822 });
823 self.pending_find_nodes.insert(node.id, FindNodeRequest::new(ctx));
824 }
825
826 fn find_node_checked(&mut self, node: &NodeRecord, ctx: LookupContext) {
831 let max_failures = self.config.max_find_node_failures;
832 let needs_ping = self
833 .on_entry(node.id, |entry| entry.exceeds_find_node_failures(max_failures))
834 .unwrap_or(true);
835 if needs_ping {
836 self.try_ping(*node, PingReason::Lookup(*node, ctx))
837 } else {
838 self.find_node(node, ctx)
839 }
840 }
841
842 fn notify(&mut self, update: DiscoveryUpdate) {
846 self.update_listeners.retain_mut(|listener| match listener.try_send(update.clone()) {
847 Ok(()) => true,
848 Err(err) => match err {
849 TrySendError::Full(_) => true,
850 TrySendError::Closed(_) => false,
851 },
852 });
853 }
854
855 pub fn ban_ip(&mut self, ip: IpAddr) {
857 self.config.ban_list.ban_ip(ip);
858 }
859
860 pub fn ban_node(&mut self, node_id: PeerId) {
862 self.remove_node(node_id);
863 self.config.ban_list.ban_peer(node_id);
864 }
865
866 pub fn ban_ip_until(&mut self, ip: IpAddr, until: Instant) {
868 self.config.ban_list.ban_ip_until(ip, until);
869 }
870
871 pub fn ban_node_until(&mut self, node_id: PeerId, until: Instant) {
873 self.remove_node(node_id);
874 self.config.ban_list.ban_peer_until(node_id, until);
875 }
876
877 pub fn remove_node(&mut self, node_id: PeerId) -> bool {
882 let key = kad_key(node_id);
883 self.remove_key(node_id, key)
884 }
885
886 pub fn soft_remove_node(&mut self, node_id: PeerId) -> bool {
891 let key = kad_key(node_id);
892 let Some(bucket) = self.kbuckets.get_bucket(&key) else { return false };
893 if bucket.num_entries() < MAX_NODES_PER_BUCKET / 2 {
894 return false
896 }
897 self.remove_key(node_id, key)
898 }
899
900 fn remove_key(&mut self, node_id: PeerId, key: discv5::Key<NodeKey>) -> bool {
901 let removed = self.kbuckets.remove(&key);
902 if removed {
903 trace!(target: "discv4", ?node_id, "removed node");
904 self.notify(DiscoveryUpdate::Removed(node_id));
905 }
906 removed
907 }
908
909 pub fn num_connected(&self) -> usize {
911 self.kbuckets.buckets_iter().fold(0, |count, bucket| count + bucket.num_connected())
912 }
913
914 fn has_bond(&self, remote_id: PeerId, remote_ip: IpAddr) -> bool {
916 if let Some(timestamp) = self.received_pongs.last_pong(remote_id, remote_ip) &&
917 timestamp.elapsed() < self.config.bond_expiration
918 {
919 return true
920 }
921 false
922 }
923
924 fn on_entry<F, R>(&mut self, peer_id: PeerId, f: F) -> Option<R>
926 where
927 F: FnOnce(&NodeEntry) -> R,
928 {
929 let key = kad_key(peer_id);
930 match self.kbuckets.entry(&key) {
931 BucketEntry::Present(entry, _) => Some(f(entry.value())),
932 BucketEntry::Pending(mut entry, _) => Some(f(entry.value())),
933 _ => None,
934 }
935 }
936
937 fn update_on_reping(&mut self, record: NodeRecord, mut last_enr_seq: Option<u64>) {
944 if record.id == self.local_node_record.id {
945 return
946 }
947
948 if !self.config.enable_eip868 {
950 last_enr_seq = None;
951 }
952
953 let key = kad_key(record.id);
954 let old_enr = match self.kbuckets.entry(&key) {
955 kbucket::Entry::Present(mut entry, _) => {
956 entry.value_mut().update_with_enr(last_enr_seq)
957 }
958 kbucket::Entry::Pending(mut entry, _) => entry.value().update_with_enr(last_enr_seq),
959 _ => return,
960 };
961
962 match (last_enr_seq, old_enr) {
964 (Some(new), Some(old)) if new > old => {
965 self.send_enr_request(record);
966 }
967 (Some(_), None) => {
968 self.send_enr_request(record);
970 }
971 _ => {}
972 };
973 }
974
975 fn update_on_pong(&mut self, record: NodeRecord, mut last_enr_seq: Option<u64>) {
977 if record.id == *self.local_peer_id() {
978 return
979 }
980
981 if !self.config.enable_eip868 {
983 last_enr_seq = None;
984 }
985
986 let has_enr_seq = last_enr_seq.is_some();
989
990 let key = kad_key(record.id);
991 match self.kbuckets.entry(&key) {
992 kbucket::Entry::Present(mut entry, old_status) => {
993 entry.value_mut().establish_proof();
995 entry.value_mut().update_with_enr(last_enr_seq);
996
997 if !old_status.is_connected() {
998 let _ = entry.update(ConnectionState::Connected, Some(old_status.direction));
999 trace!(target: "discv4", ?record, "added after successful endpoint proof");
1000 self.notify(DiscoveryUpdate::Added(record));
1001
1002 if has_enr_seq {
1003 self.send_enr_request(record);
1005 }
1006 }
1007 }
1008 kbucket::Entry::Pending(mut entry, mut status) => {
1009 entry.value().establish_proof();
1011 entry.value().update_with_enr(last_enr_seq);
1012
1013 if !status.is_connected() {
1014 status.state = ConnectionState::Connected;
1015 let _ = entry.update(status);
1016 trace!(target: "discv4", ?record, "added after successful endpoint proof");
1017 self.notify(DiscoveryUpdate::Added(record));
1018
1019 if has_enr_seq {
1020 self.send_enr_request(record);
1022 }
1023 }
1024 }
1025 _ => {}
1026 };
1027 }
1028
1029 pub fn add_all_nodes(&mut self, records: impl IntoIterator<Item = NodeRecord>) {
1033 for record in records {
1034 self.add_node(record);
1035 }
1036 }
1037
1038 pub fn add_node(&mut self, record: NodeRecord) -> bool {
1044 let key = kad_key(record.id);
1045 match self.kbuckets.entry(&key) {
1046 kbucket::Entry::Absent(entry) => {
1047 let node = NodeEntry::new(record);
1048 match entry.insert(
1049 node,
1050 NodeStatus {
1051 direction: ConnectionDirection::Outgoing,
1052 state: ConnectionState::Disconnected,
1053 },
1054 ) {
1055 BucketInsertResult::Inserted | BucketInsertResult::Pending { .. } => {
1056 trace!(target: "discv4", ?record, "inserted new record");
1057 }
1058 _ => return false,
1059 }
1060 }
1061 _ => return false,
1062 }
1063
1064 self.try_ping(record, PingReason::InitialInsert);
1066 true
1067 }
1068
1069 pub(crate) fn send_packet(&self, msg: Message, to: SocketAddr) -> B256 {
1071 let (payload, hash) = msg.encode(&self.secret_key);
1072 trace!(target: "discv4", r#type=?msg.msg_type(), ?to, ?hash, "sending packet");
1073 let _ = self.egress.try_send((payload, to)).map_err(|err| {
1074 debug!(
1075 target: "discv4",
1076 %err,
1077 "dropped outgoing packet",
1078 );
1079 });
1080 hash
1081 }
1082
1083 fn find_node_packet(&mut self, target: PeerId) -> (Bytes, B256) {
1085 let expire = self.find_node_expiration();
1086 let cache_ttl = self.config.request_timeout / 4;
1087 CachedFindNode::get_or_sign(
1088 &mut self.cached_find_node,
1089 target,
1090 cache_ttl,
1091 &self.secret_key,
1092 expire,
1093 )
1094 }
1095
1096 fn on_ping(&mut self, ping: Ping, remote_addr: SocketAddr, remote_id: PeerId, hash: B256) {
1098 if self.is_expired(ping.expire) {
1099 return
1101 }
1102
1103 let record = NodeRecord {
1105 address: remote_addr.ip(),
1106 udp_port: remote_addr.port(),
1107 tcp_port: ping.from.tcp_port,
1108 id: remote_id,
1109 }
1110 .into_ipv4_mapped();
1111
1112 let key = kad_key(record.id);
1113
1114 let mut is_new_insert = false;
1121 let mut needs_bond = false;
1122 let mut is_proven = false;
1123
1124 let old_enr = match self.kbuckets.entry(&key) {
1125 kbucket::Entry::Present(mut entry, _) => {
1126 if entry.value().is_expired() {
1127 needs_bond = true;
1130 } else {
1131 is_proven = entry.value().has_endpoint_proof;
1132 }
1133 entry.value_mut().update_with_enr(ping.enr_sq)
1134 }
1135 kbucket::Entry::Pending(mut entry, _) => {
1136 if entry.value().is_expired() {
1137 needs_bond = true;
1140 } else {
1141 is_proven = entry.value().has_endpoint_proof;
1142 }
1143 entry.value().update_with_enr(ping.enr_sq)
1144 }
1145 kbucket::Entry::Absent(entry) => {
1146 let mut node = NodeEntry::new(record);
1147 node.last_enr_seq = ping.enr_sq;
1148
1149 match entry.insert(
1150 node,
1151 NodeStatus {
1152 direction: ConnectionDirection::Incoming,
1153 state: ConnectionState::Disconnected,
1155 },
1156 ) {
1157 BucketInsertResult::Inserted | BucketInsertResult::Pending { .. } => {
1158 is_new_insert = true;
1160 }
1161 BucketInsertResult::Full => {
1162 trace!(target: "discv4", ?record, "discovered new record but bucket is full");
1166 self.notify(DiscoveryUpdate::DiscoveredAtCapacity(record));
1167 needs_bond = true;
1168 }
1169 BucketInsertResult::TooManyIncoming | BucketInsertResult::NodeExists => {
1170 needs_bond = true;
1171 }
1173 BucketInsertResult::FailedFilter => return,
1174 }
1175
1176 None
1177 }
1178 kbucket::Entry::SelfEntry => return,
1179 };
1180
1181 let pong = Message::Pong(Pong {
1184 to: record.into(),
1186 echo: hash,
1187 expire: ping.expire,
1188 enr_sq: self.enr_seq(),
1189 });
1190 self.send_packet(pong, remote_addr);
1191
1192 if is_new_insert {
1194 self.try_ping(record, PingReason::InitialInsert);
1195 } else if needs_bond {
1196 self.try_ping(record, PingReason::EstablishBond);
1197 } else if is_proven {
1198 if let Some((_, ctx)) = self.pending_lookup.remove(&record.id) {
1202 if self.pending_find_nodes.contains_key(&record.id) {
1203 ctx.unmark_queried(record.id);
1206 } else {
1207 self.find_node(&record, ctx);
1210 }
1211 }
1212 } else {
1213 match (ping.enr_sq, old_enr) {
1215 (Some(new), Some(old)) if new > old => {
1216 self.send_enr_request(record);
1217 }
1218 (Some(_), None) => {
1219 self.send_enr_request(record);
1220 }
1221 _ => {}
1222 };
1223 }
1224 }
1225
1226 fn try_ping(&mut self, node: NodeRecord, reason: PingReason) {
1228 if node.id == *self.local_peer_id() {
1229 return
1231 }
1232
1233 if self.pending_pings.contains_key(&node.id) ||
1234 self.pending_find_nodes.contains_key(&node.id)
1235 {
1236 return
1237 }
1238
1239 if self.queued_pings.iter().any(|(n, _)| n.id == node.id) {
1240 return
1241 }
1242
1243 if self.pending_pings.len() < MAX_NODES_PING {
1244 self.send_ping(node, reason);
1245 } else if self.queued_pings.len() < MAX_QUEUED_PINGS {
1246 self.queued_pings.push_back((node, reason));
1247 }
1248 }
1249
1250 pub(crate) fn send_ping(&mut self, node: NodeRecord, reason: PingReason) -> B256 {
1254 let remote_addr = node.udp_addr();
1255 let id = node.id;
1256 let ping = Ping {
1257 from: self.local_node_record.into(),
1258 to: node.into(),
1259 expire: self.ping_expiration(),
1260 enr_sq: self.enr_seq(),
1261 };
1262 trace!(target: "discv4", ?ping, "sending ping");
1263 let echo_hash = self.send_packet(Message::Ping(ping), remote_addr);
1264
1265 self.pending_pings
1266 .insert(id, PingRequest { sent_at: Instant::now(), node, echo_hash, reason });
1267 echo_hash
1268 }
1269
1270 pub(crate) fn send_enr_request(&mut self, node: NodeRecord) {
1274 if !self.config.enable_eip868 {
1275 return
1276 }
1277 let remote_addr = node.udp_addr();
1278 let enr_request = EnrRequest { expire: self.enr_request_expiration() };
1279
1280 trace!(target: "discv4", ?enr_request, "sending enr request");
1281 let echo_hash = self.send_packet(Message::EnrRequest(enr_request), remote_addr);
1282
1283 self.pending_enr_requests
1284 .insert(node.id, EnrRequestState { sent_at: Instant::now(), echo_hash });
1285 }
1286
1287 fn on_pong(&mut self, pong: Pong, remote_addr: SocketAddr, remote_id: PeerId) {
1289 if self.is_expired(pong.expire) {
1290 return
1291 }
1292
1293 let PingRequest { node, reason, .. } = match self.pending_pings.entry(remote_id) {
1294 Entry::Occupied(entry) => {
1295 {
1296 let request = entry.get();
1297 if request.echo_hash != pong.echo {
1298 trace!(target: "discv4", from=?remote_addr, expected=?request.echo_hash, echo_hash=?pong.echo,"Got unexpected Pong");
1299 return
1300 }
1301 }
1302 entry.remove()
1303 }
1304 Entry::Vacant(_) => return,
1305 };
1306
1307 self.received_pongs.on_pong(remote_id, remote_addr.ip());
1309
1310 match reason {
1311 PingReason::InitialInsert => {
1312 self.update_on_pong(node, pong.enr_sq);
1313 if self.pending_lookup_reset && self.config.bootstrap_nodes.contains(&node) {
1316 self.pending_lookup_reset = false;
1317 self.lookup_interval.reset();
1318 }
1319 }
1320 PingReason::EstablishBond => {
1321 self.update_on_pong(node, pong.enr_sq);
1323 }
1324 PingReason::RePing => {
1325 self.update_on_reping(node, pong.enr_sq);
1326 }
1327 PingReason::Lookup(node, ctx) => {
1328 self.update_on_pong(node, pong.enr_sq);
1329 self.pending_lookup.insert(node.id, (Instant::now(), ctx));
1334 }
1335 }
1336 }
1337
1338 fn on_find_node(&mut self, msg: FindNode, remote_addr: SocketAddr, node_id: PeerId) {
1340 if self.is_expired(msg.expire) {
1341 return
1343 }
1344 if node_id == *self.local_peer_id() {
1345 return
1347 }
1348
1349 if self.has_bond(node_id, remote_addr.ip()) {
1350 self.respond_closest(msg.id, remote_addr)
1351 }
1352 }
1353
1354 fn on_enr_response(&mut self, msg: EnrResponse, remote_addr: SocketAddr, id: PeerId) {
1356 trace!(target: "discv4", ?remote_addr, ?msg, "received ENR response");
1357 if let Some(resp) = self.pending_enr_requests.remove(&id) {
1358 let enr_id = pk2id(&msg.enr.public_key());
1360 if id != enr_id {
1361 return
1362 }
1363
1364 if resp.echo_hash == msg.request_hash {
1365 let key = kad_key(id);
1366 let fork_id = msg.eth_fork_id();
1367 let (record, old_fork_id) = match self.kbuckets.entry(&key) {
1368 kbucket::Entry::Present(mut entry, _) => {
1369 let id = entry.value_mut().update_with_fork_id(fork_id);
1370 (entry.value().record, id)
1371 }
1372 kbucket::Entry::Pending(mut entry, _) => {
1373 let id = entry.value().update_with_fork_id(fork_id);
1374 (entry.value().record, id)
1375 }
1376 _ => return,
1377 };
1378 match (fork_id, old_fork_id) {
1379 (Some(new), Some(old)) if new != old => {
1380 self.notify(DiscoveryUpdate::EnrForkId(record, new))
1381 }
1382 (Some(new), None) => self.notify(DiscoveryUpdate::EnrForkId(record, new)),
1383 _ => {}
1384 }
1385 }
1386 }
1387 }
1388
1389 fn on_enr_request(
1391 &self,
1392 msg: EnrRequest,
1393 remote_addr: SocketAddr,
1394 id: PeerId,
1395 request_hash: B256,
1396 ) {
1397 if !self.config.enable_eip868 || self.is_expired(msg.expire) {
1398 return
1399 }
1400
1401 if self.has_bond(id, remote_addr.ip()) {
1402 self.send_packet(
1403 Message::EnrResponse(EnrResponse {
1404 request_hash,
1405 enr: self.local_eip_868_enr.clone(),
1406 }),
1407 remote_addr,
1408 );
1409 }
1410 }
1411
1412 fn on_neighbours(&mut self, msg: Neighbours, remote_addr: SocketAddr, node_id: PeerId) {
1415 if self.is_expired(msg.expire) {
1416 return
1418 }
1419 let ctx = match self.pending_find_nodes.entry(node_id) {
1421 Entry::Occupied(mut entry) => {
1422 {
1423 let request = entry.get_mut();
1424 request.answered = true;
1426 let total = request.response_count + msg.nodes.len();
1427
1428 if total <= MAX_NODES_PER_BUCKET {
1430 request.response_count = total;
1431 } else {
1432 trace!(target: "discv4", total, from=?remote_addr, "Received neighbors packet entries exceeds max nodes per bucket");
1433 return
1434 }
1435 };
1436
1437 if entry.get().response_count == MAX_NODES_PER_BUCKET {
1438 let ctx = entry.remove().lookup_context;
1440 ctx.mark_responded(node_id);
1441 ctx
1442 } else {
1443 entry.get().lookup_context.clone()
1444 }
1445 }
1446 Entry::Vacant(_) => {
1447 trace!(target: "discv4", from=?remote_addr, "Received unsolicited Neighbours");
1449 return
1450 }
1451 };
1452
1453 trace!(target: "discv4",
1455 target=format!("{:#?}", node_id),
1456 peers_count=msg.nodes.len(),
1457 peers=format!("[{:#}]", msg.nodes.iter()
1458 .map(|node_rec| node_rec.id
1459 ).format(", ")),
1460 "Received peers from Neighbours packet"
1461 );
1462
1463 for node in msg.nodes.into_iter().map(NodeRecord::into_ipv4_mapped) {
1466 if self.config.ban_list.is_banned(&node.id, &node.address) {
1468 trace!(target: "discv4", peer_id=?node.id, ip=?node.address, "ignoring banned record");
1469 continue
1470 }
1471
1472 ctx.add_node(node);
1473 }
1474
1475 let closest =
1477 ctx.filter_closest(ALPHA, |node| !self.pending_find_nodes.contains_key(&node.id));
1478
1479 for closest in closest {
1480 let key = kad_key(closest.id);
1481 match self.kbuckets.entry(&key) {
1482 BucketEntry::Absent(entry) => {
1483 ctx.mark_queried(closest.id);
1489 let node = NodeEntry::new(closest);
1490 match entry.insert(
1491 node,
1492 NodeStatus {
1493 direction: ConnectionDirection::Outgoing,
1494 state: ConnectionState::Disconnected,
1495 },
1496 ) {
1497 BucketInsertResult::Inserted | BucketInsertResult::Pending { .. } => {
1498 self.try_ping(closest, PingReason::Lookup(closest, ctx.clone()))
1500 }
1501 BucketInsertResult::Full => {
1502 self.notify(DiscoveryUpdate::DiscoveredAtCapacity(closest))
1504 }
1505 _ => {}
1506 }
1507 }
1508 BucketEntry::SelfEntry => {
1509 }
1511 BucketEntry::Present(entry, _) => {
1512 if entry.value().has_endpoint_proof {
1513 if entry
1514 .value()
1515 .exceeds_find_node_failures(self.config.max_find_node_failures)
1516 {
1517 self.try_ping(closest, PingReason::Lookup(closest, ctx.clone()))
1518 } else {
1519 self.find_node(&closest, ctx.clone());
1520 }
1521 }
1522 }
1523 BucketEntry::Pending(mut entry, _) => {
1524 if entry.value().has_endpoint_proof {
1525 if entry
1526 .value()
1527 .exceeds_find_node_failures(self.config.max_find_node_failures)
1528 {
1529 self.try_ping(closest, PingReason::Lookup(closest, ctx.clone()))
1530 } else {
1531 self.find_node(&closest, ctx.clone());
1532 }
1533 }
1534 }
1535 }
1536 }
1537 }
1538
1539 fn respond_closest(&mut self, target: PeerId, to: SocketAddr) {
1541 let key = kad_key(target);
1542 let expire = self.send_neighbours_expiration();
1543
1544 let closest_nodes =
1546 self.kbuckets.closest_values(&key).take(MAX_NODES_PER_BUCKET).collect::<Vec<_>>();
1547
1548 for nodes in closest_nodes.chunks(SAFE_MAX_DATAGRAM_NEIGHBOUR_RECORDS) {
1549 let nodes = nodes.iter().map(|node| node.value.record).collect::<Vec<NodeRecord>>();
1550 trace!(target: "discv4", len = nodes.len(), to=?to,"Sent neighbours packet");
1551 let msg = Message::Neighbours(Neighbours { nodes, expire });
1552 self.send_packet(msg, to);
1553 }
1554 }
1555
1556 fn evict_expired_requests(&mut self, now: Instant) {
1557 self.pending_enr_requests.retain(|_node_id, enr_request| {
1558 now.duration_since(enr_request.sent_at) < self.config.enr_expiration
1559 });
1560
1561 let mut failed_pings = Vec::new();
1562 self.pending_pings.retain(|node_id, ping_request| {
1563 if now.duration_since(ping_request.sent_at) > self.config.ping_expiration {
1564 failed_pings.push(*node_id);
1565 return false
1566 }
1567 true
1568 });
1569
1570 if !failed_pings.is_empty() {
1571 trace!(target: "discv4", num=%failed_pings.len(), "evicting nodes due to failed pong");
1573 for node_id in failed_pings {
1574 self.remove_node(node_id);
1575 }
1576 }
1577
1578 let mut failed_lookups = Vec::new();
1579 self.pending_lookup.retain(|node_id, (lookup_sent_at, _)| {
1580 if now.duration_since(*lookup_sent_at) > self.config.request_timeout {
1581 failed_lookups.push(*node_id);
1582 return false
1583 }
1584 true
1585 });
1586
1587 if !failed_lookups.is_empty() {
1588 trace!(target: "discv4", num=%failed_lookups.len(), "evicting nodes due to failed lookup");
1590 for node_id in failed_lookups {
1591 self.remove_node(node_id);
1592 }
1593 }
1594
1595 self.evict_failed_find_nodes(now);
1596 }
1597
1598 fn evict_failed_find_nodes(&mut self, now: Instant) {
1600 let mut failed_find_nodes = Vec::new();
1601 self.pending_find_nodes.retain(|node_id, find_node_request| {
1602 if now.duration_since(find_node_request.sent_at) > self.config.neighbours_expiration {
1603 if !find_node_request.answered {
1604 failed_find_nodes.push(*node_id);
1607 }
1608 return false
1609 }
1610 true
1611 });
1612
1613 if failed_find_nodes.is_empty() {
1614 return
1615 }
1616
1617 trace!(target: "discv4", num=%failed_find_nodes.len(), "processing failed find nodes");
1618
1619 for node_id in failed_find_nodes {
1620 let key = kad_key(node_id);
1621 let failures = match self.kbuckets.entry(&key) {
1622 kbucket::Entry::Present(mut entry, _) => {
1623 entry.value_mut().inc_failed_request();
1624 entry.value().find_node_failures
1625 }
1626 kbucket::Entry::Pending(mut entry, _) => {
1627 entry.value().inc_failed_request();
1628 entry.value().find_node_failures
1629 }
1630 _ => continue,
1631 };
1632
1633 if failures > self.config.max_find_node_failures {
1637 self.soft_remove_node(node_id);
1638 }
1639 }
1640 }
1641
1642 fn re_ping_oldest(&mut self) {
1647 let mut nodes = self
1648 .kbuckets
1649 .iter_ref()
1650 .filter(|entry| entry.node.value.is_expired())
1651 .map(|n| n.node.value)
1652 .collect::<Vec<_>>();
1653 nodes.sort_by_key(|a| a.last_seen);
1654 let to_ping = nodes.into_iter().map(|n| n.record).take(MAX_NODES_PING).collect::<Vec<_>>();
1655 for node in to_ping {
1656 self.try_ping(node, PingReason::RePing)
1657 }
1658 }
1659
1660 fn is_expired(&self, expiration: u64) -> bool {
1662 self.ensure_not_expired(expiration).is_err()
1663 }
1664
1665 fn ensure_not_expired(&self, timestamp: u64) -> Result<(), ()> {
1675 let _ = i64::try_from(timestamp).map_err(drop)?;
1677
1678 let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_secs();
1679 if self.config.enforce_expiration_timestamps && timestamp < now {
1680 trace!(target: "discv4", "Expired packet");
1681 return Err(())
1682 }
1683 Ok(())
1684 }
1685
1686 fn ping_buffered(&mut self) {
1688 while self.pending_pings.len() < MAX_NODES_PING {
1689 match self.queued_pings.pop_front() {
1690 Some((next, reason)) => self.try_ping(next, reason),
1691 None => break,
1692 }
1693 }
1694 }
1695
1696 fn ping_expiration(&self) -> u64 {
1697 (SystemTime::now().duration_since(UNIX_EPOCH).unwrap() + self.config.ping_expiration)
1698 .as_secs()
1699 }
1700
1701 fn find_node_expiration(&self) -> u64 {
1702 (SystemTime::now().duration_since(UNIX_EPOCH).unwrap() + self.config.request_timeout)
1703 .as_secs()
1704 }
1705
1706 fn enr_request_expiration(&self) -> u64 {
1707 (SystemTime::now().duration_since(UNIX_EPOCH).unwrap() + self.config.enr_expiration)
1708 .as_secs()
1709 }
1710
1711 fn send_neighbours_expiration(&self) -> u64 {
1712 (SystemTime::now().duration_since(UNIX_EPOCH).unwrap() + self.config.neighbours_expiration)
1713 .as_secs()
1714 }
1715
1716 pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<Discv4Event> {
1722 loop {
1723 if let Some(event) = self.queued_events.pop_front() {
1725 return Poll::Ready(event)
1726 }
1727
1728 if self.config.enable_lookup {
1730 while self.lookup_interval.poll_tick(cx).is_ready() {
1731 let target = self.lookup_rotator.next(&self.local_node_record.id);
1732 self.lookup_with(target, None);
1733 }
1734 }
1735
1736 while self.ping_interval.poll_tick(cx).is_ready() {
1738 self.re_ping_oldest();
1739 }
1740
1741 if let Some(Poll::Ready(Some(ip))) =
1742 self.resolve_external_ip_interval.as_mut().map(|r| r.poll_tick(cx))
1743 {
1744 self.set_external_ip_addr(ip);
1745 }
1746
1747 while let Poll::Ready(Some(cmd)) = self.commands_rx.poll_recv(cx) {
1749 match cmd {
1750 Discv4Command::Add(enr) => {
1751 self.add_node(enr);
1752 }
1753 Discv4Command::Lookup { node_id, tx } => {
1754 let node_id = node_id.unwrap_or(self.local_node_record.id);
1755 self.lookup_with(node_id, tx);
1756 }
1757 Discv4Command::SetLookupInterval(duration) => {
1758 self.set_lookup_interval(duration);
1759 }
1760 Discv4Command::Updates(tx) => {
1761 let rx = self.update_stream();
1762 let _ = tx.send(rx);
1763 }
1764 Discv4Command::BanPeer(node_id) => self.ban_node(node_id),
1765 Discv4Command::Remove(node_id) => {
1766 self.remove_node(node_id);
1767 }
1768 Discv4Command::Ban(node_id, ip) => {
1769 self.ban_node(node_id);
1770 self.ban_ip(ip);
1771 }
1772 Discv4Command::BanIp(ip) => {
1773 self.ban_ip(ip);
1774 }
1775 Discv4Command::SetEIP868RLPPair { key, rlp } => {
1776 debug!(target: "discv4", key=%String::from_utf8_lossy(&key), "Update EIP-868 extension pair");
1777
1778 let _ = self.local_eip_868_enr.insert_raw_rlp(key, rlp, &self.secret_key);
1779 }
1780 Discv4Command::SetTcpPort(port) => {
1781 debug!(target: "discv4", %port, "Update tcp port");
1782 self.local_node_record.tcp_port = port;
1783 if self.local_node_record.address.is_ipv4() {
1784 let _ = self.local_eip_868_enr.set_tcp4(port, &self.secret_key);
1785 } else {
1786 let _ = self.local_eip_868_enr.set_tcp6(port, &self.secret_key);
1787 }
1788 }
1789
1790 Discv4Command::Terminated => {
1791 self.queued_events.push_back(Discv4Event::Terminated);
1793 }
1794 }
1795 }
1796
1797 let mut udp_message_budget = UDP_MESSAGE_POLL_LOOP_BUDGET;
1799
1800 while let Poll::Ready(Some(event)) = self.ingress.poll_recv(cx) {
1802 match event {
1803 IngressEvent::RecvError(err) => {
1804 debug!(target: "discv4", %err, "failed to read datagram");
1805 }
1806 IngressEvent::BadPacket(from, err, data) => {
1807 trace!(target: "discv4", ?from, %err, packet=?hex::encode(&data), "bad packet");
1808 }
1809 IngressEvent::Packet(remote_addr, Packet { msg, node_id, hash }) => {
1810 trace!(target: "discv4", r#type=?msg.msg_type(), from=?remote_addr,"received packet");
1811 let event = match msg {
1812 Message::Ping(ping) => {
1813 self.on_ping(ping, remote_addr, node_id, hash);
1814 Discv4Event::Ping
1815 }
1816 Message::Pong(pong) => {
1817 self.on_pong(pong, remote_addr, node_id);
1818 Discv4Event::Pong
1819 }
1820 Message::FindNode(msg) => {
1821 self.on_find_node(msg, remote_addr, node_id);
1822 Discv4Event::FindNode
1823 }
1824 Message::Neighbours(msg) => {
1825 self.on_neighbours(msg, remote_addr, node_id);
1826 Discv4Event::Neighbours
1827 }
1828 Message::EnrRequest(msg) => {
1829 self.on_enr_request(msg, remote_addr, node_id, hash);
1830 Discv4Event::EnrRequest
1831 }
1832 Message::EnrResponse(msg) => {
1833 self.on_enr_response(msg, remote_addr, node_id);
1834 Discv4Event::EnrResponse
1835 }
1836 };
1837
1838 self.queued_events.push_back(event);
1839 }
1840 }
1841
1842 udp_message_budget -= 1;
1843 if udp_message_budget < 0 {
1844 trace!(target: "discv4", budget=UDP_MESSAGE_POLL_LOOP_BUDGET, "exhausted message poll budget");
1845 if self.queued_events.is_empty() {
1846 cx.waker().wake_by_ref();
1849 }
1850 break
1851 }
1852 }
1853
1854 self.ping_buffered();
1856
1857 while self.evict_expired_requests_interval.poll_tick(cx).is_ready() {
1859 self.evict_expired_requests(Instant::now());
1860 }
1861
1862 while self.expire_interval.poll_tick(cx).is_ready() {
1864 self.received_pongs.evict_expired(Instant::now(), EXPIRE_DURATION);
1865 }
1866
1867 if self.queued_events.is_empty() {
1868 return Poll::Pending
1869 }
1870 }
1871 }
1872}
1873
1874impl Stream for Discv4Service {
1876 type Item = Discv4Event;
1877
1878 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1879 match ready!(self.get_mut().poll(cx)) {
1881 Discv4Event::Terminated => Poll::Ready(None),
1883 ev => Poll::Ready(Some(ev)),
1885 }
1886 }
1887}
1888
1889impl fmt::Debug for Discv4Service {
1890 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1891 f.debug_struct("Discv4Service")
1892 .field("local_address", &self.local_address)
1893 .field("local_peer_id", &self.local_peer_id())
1894 .field("local_node_record", &self.local_node_record)
1895 .field("queued_pings", &self.queued_pings)
1896 .field("pending_lookup", &self.pending_lookup)
1897 .field("pending_find_nodes", &self.pending_find_nodes)
1898 .field("lookup_interval", &self.lookup_interval)
1899 .finish_non_exhaustive()
1900 }
1901}
1902
1903#[derive(Debug, Eq, PartialEq)]
1907pub enum Discv4Event {
1908 Ping,
1910 Pong,
1912 FindNode,
1914 Neighbours,
1916 EnrRequest,
1918 EnrResponse,
1920 Terminated,
1922}
1923
1924pub(crate) async fn send_loop(udp: Arc<UdpSocket>, rx: EgressReceiver) {
1926 let mut stream = ReceiverStream::new(rx);
1927 while let Some((payload, to)) = stream.next().await {
1928 match udp.send_to(&payload, to).await {
1929 Ok(size) => {
1930 trace!(target: "discv4", ?to, ?size,"sent payload");
1931 }
1932 Err(err) => {
1933 debug!(target: "discv4", ?to, %err,"Failed to send datagram.");
1934 }
1935 }
1936 }
1937}
1938
1939const MAX_INCOMING_PACKETS_PER_MINUTE_BY_IP: usize = 60usize;
1941
1942pub(crate) async fn receive_loop(udp: Arc<UdpSocket>, tx: IngressSender, local_id: PeerId) {
1947 let send = |event: IngressEvent| async {
1948 let _ = tx.send(event).await.map_err(|err| {
1949 debug!(
1950 target: "discv4",
1951 %err,
1952 "failed send incoming packet",
1953 )
1954 });
1955 };
1956
1957 let mut cache = ReceiveCache::default();
1958
1959 let tick = MAX_INCOMING_PACKETS_PER_MINUTE_BY_IP / 2;
1961 let mut interval = tokio::time::interval(Duration::from_secs(tick as u64));
1962
1963 let mut buf = [0; MAX_PACKET_SIZE];
1964 loop {
1965 let res = udp.recv_from(&mut buf).await;
1966 match res {
1967 Err(err) => {
1968 debug!(target: "discv4", %err, "Failed to read datagram.");
1969 send(IngressEvent::RecvError(err)).await;
1970 }
1971 Ok((read, remote_addr)) => {
1972 if cache.inc_ip(remote_addr.ip()) > MAX_INCOMING_PACKETS_PER_MINUTE_BY_IP {
1974 trace!(target: "discv4", ?remote_addr, "Too many incoming packets from IP.");
1975 continue
1976 }
1977
1978 let packet = &buf[..read];
1979 match Message::decode(packet) {
1980 Ok(packet) => {
1981 if packet.node_id == local_id {
1982 debug!(target: "discv4", ?remote_addr, "Received own packet.");
1984 continue
1985 }
1986
1987 if cache.contains_packet(packet.hash) {
1989 debug!(target: "discv4", ?remote_addr, "Received duplicate packet.");
1990 continue
1991 }
1992
1993 send(IngressEvent::Packet(remote_addr, packet)).await;
1994 }
1995 Err(err) => {
1996 trace!(target: "discv4", %err,"Failed to decode packet");
1997 send(IngressEvent::BadPacket(remote_addr, err, packet.to_vec())).await
1998 }
1999 }
2000 }
2001 }
2002
2003 if poll_fn(|cx| match interval.poll_tick(cx) {
2005 Poll::Ready(_) => Poll::Ready(true),
2006 Poll::Pending => Poll::Ready(false),
2007 })
2008 .await
2009 {
2010 cache.tick_ips(tick);
2011 }
2012 }
2013}
2014
2015struct ReceiveCache {
2019 ip_messages: HashMap<IpAddr, usize>,
2025 unique_packets: schnellru::LruMap<B256, ()>,
2027}
2028
2029impl ReceiveCache {
2030 fn tick_ips(&mut self, tick: usize) {
2034 self.ip_messages.retain(|_, count| {
2035 if let Some(reset) = count.checked_sub(tick) {
2036 *count = reset;
2037 true
2038 } else {
2039 false
2040 }
2041 });
2042 }
2043
2044 fn inc_ip(&mut self, ip: IpAddr) -> usize {
2046 let ctn = self.ip_messages.entry(ip).or_default();
2047 *ctn = ctn.saturating_add(1);
2048 *ctn
2049 }
2050
2051 fn contains_packet(&mut self, hash: B256) -> bool {
2053 !self.unique_packets.insert(hash, ())
2054 }
2055}
2056
2057impl Default for ReceiveCache {
2058 fn default() -> Self {
2059 Self {
2060 ip_messages: Default::default(),
2061 unique_packets: schnellru::LruMap::new(schnellru::ByLength::new(32)),
2062 }
2063 }
2064}
2065
2066enum Discv4Command {
2068 Add(NodeRecord),
2069 SetTcpPort(u16),
2070 SetEIP868RLPPair { key: Vec<u8>, rlp: Bytes },
2071 Ban(PeerId, IpAddr),
2072 BanPeer(PeerId),
2073 BanIp(IpAddr),
2074 Remove(PeerId),
2075 Lookup { node_id: Option<PeerId>, tx: Option<NodeRecordSender> },
2076 SetLookupInterval(Duration),
2077 Updates(OneshotSender<ReceiverStream<DiscoveryUpdate>>),
2078 Terminated,
2079}
2080
2081#[derive(Debug)]
2083pub(crate) enum IngressEvent {
2084 RecvError(io::Error),
2086 BadPacket(SocketAddr, DecodePacketError, Vec<u8>),
2088 Packet(SocketAddr, Packet),
2090}
2091
2092#[derive(Debug)]
2094struct PingRequest {
2095 sent_at: Instant,
2097 node: NodeRecord,
2099 echo_hash: B256,
2101 reason: PingReason,
2103}
2104
2105#[derive(Debug)]
2109struct LookupTargetRotator {
2110 interval: usize,
2111 counter: usize,
2112}
2113
2114impl LookupTargetRotator {
2117 const fn local_only() -> Self {
2119 Self { interval: 1, counter: 0 }
2120 }
2121}
2122
2123impl Default for LookupTargetRotator {
2124 fn default() -> Self {
2125 Self {
2126 interval: 4,
2128 counter: 3,
2129 }
2130 }
2131}
2132
2133impl LookupTargetRotator {
2134 fn next(&mut self, local: &PeerId) -> PeerId {
2136 self.counter += 1;
2137 self.counter %= self.interval;
2138 if self.counter == 0 {
2139 return *local
2140 }
2141 PeerId::random()
2142 }
2143}
2144
2145#[derive(Clone, Debug)]
2150struct LookupContext {
2151 inner: Rc<LookupContextInner>,
2152}
2153
2154impl LookupContext {
2155 fn new(
2157 target: discv5::Key<NodeKey>,
2158 nearest_nodes: impl IntoIterator<Item = (Distance, NodeRecord)>,
2159 listener: Option<NodeRecordSender>,
2160 ) -> Self {
2161 let closest_nodes = nearest_nodes
2162 .into_iter()
2163 .map(|(distance, record)| {
2164 (distance, QueryNode { record, queried: false, responded: false })
2165 })
2166 .collect();
2167
2168 let inner = Rc::new(LookupContextInner {
2169 target,
2170 closest_nodes: RefCell::new(closest_nodes),
2171 listener,
2172 });
2173 Self { inner }
2174 }
2175
2176 fn target(&self) -> PeerId {
2178 self.inner.target.preimage().0
2179 }
2180
2181 fn closest(&self, num: usize) -> Vec<NodeRecord> {
2182 self.inner
2183 .closest_nodes
2184 .borrow()
2185 .iter()
2186 .filter(|(_, node)| !node.queried)
2187 .map(|(_, n)| n.record)
2188 .take(num)
2189 .collect()
2190 }
2191
2192 fn filter_closest<P>(&self, num: usize, filter: P) -> Vec<NodeRecord>
2194 where
2195 P: FnMut(&NodeRecord) -> bool,
2196 {
2197 self.inner
2198 .closest_nodes
2199 .borrow()
2200 .iter()
2201 .filter(|(_, node)| !node.queried)
2202 .map(|(_, n)| n.record)
2203 .filter(filter)
2204 .take(num)
2205 .collect()
2206 }
2207
2208 fn add_node(&self, record: NodeRecord) {
2210 let distance = self.inner.target.distance(&kad_key(record.id));
2211 let mut closest = self.inner.closest_nodes.borrow_mut();
2212 if let btree_map::Entry::Vacant(entry) = closest.entry(distance) {
2213 entry.insert(QueryNode { record, queried: false, responded: false });
2214 }
2215 }
2216
2217 fn set_queried(&self, id: PeerId, val: bool) {
2218 if let Some((_, node)) =
2219 self.inner.closest_nodes.borrow_mut().iter_mut().find(|(_, node)| node.record.id == id)
2220 {
2221 node.queried = val;
2222 }
2223 }
2224
2225 fn mark_queried(&self, id: PeerId) {
2227 self.set_queried(id, true)
2228 }
2229
2230 fn unmark_queried(&self, id: PeerId) {
2232 self.set_queried(id, false)
2233 }
2234
2235 fn mark_responded(&self, id: PeerId) {
2237 if let Some((_, node)) =
2238 self.inner.closest_nodes.borrow_mut().iter_mut().find(|(_, node)| node.record.id == id)
2239 {
2240 node.responded = true;
2241 }
2242 }
2243}
2244
2245unsafe impl Send for LookupContext {}
2252#[derive(Debug)]
2253struct LookupContextInner {
2254 target: discv5::Key<NodeKey>,
2256 closest_nodes: RefCell<BTreeMap<Distance, QueryNode>>,
2258 listener: Option<NodeRecordSender>,
2263}
2264
2265impl Drop for LookupContextInner {
2266 fn drop(&mut self) {
2267 if let Some(tx) = self.listener.take() {
2268 let nodes = self
2271 .closest_nodes
2272 .take()
2273 .into_values()
2274 .filter(|node| node.responded)
2275 .map(|node| node.record)
2276 .collect();
2277 let _ = tx.send(nodes);
2278 }
2279 }
2280}
2281
2282#[derive(Debug, Clone, Copy)]
2284struct QueryNode {
2285 record: NodeRecord,
2286 queried: bool,
2287 responded: bool,
2288}
2289
2290#[derive(Debug)]
2291struct FindNodeRequest {
2292 sent_at: Instant,
2294 response_count: usize,
2296 answered: bool,
2298 lookup_context: LookupContext,
2300}
2301
2302impl FindNodeRequest {
2305 fn new(resp: LookupContext) -> Self {
2306 Self { sent_at: Instant::now(), response_count: 0, answered: false, lookup_context: resp }
2307 }
2308}
2309
2310#[derive(Debug)]
2312struct CachedFindNode {
2313 target: PeerId,
2314 payload: Bytes,
2315 hash: B256,
2316 cached_at: Instant,
2317}
2318
2319impl CachedFindNode {
2320 fn get_or_sign(
2323 cache: &mut Option<Self>,
2324 target: PeerId,
2325 ttl: Duration,
2326 secret_key: &secp256k1::SecretKey,
2327 expire: u64,
2328 ) -> (Bytes, B256) {
2329 if let Some(c) = cache.as_ref() &&
2330 c.target == target &&
2331 c.cached_at.elapsed() < ttl
2332 {
2333 return (c.payload.clone(), c.hash);
2334 }
2335
2336 let msg = Message::FindNode(FindNode { id: target, expire });
2337 let (payload, hash) = msg.encode(secret_key);
2338
2339 *cache = Some(Self { target, payload: payload.clone(), hash, cached_at: Instant::now() });
2340
2341 (payload, hash)
2342 }
2343}
2344
2345#[derive(Debug)]
2346struct EnrRequestState {
2347 sent_at: Instant,
2349 echo_hash: B256,
2351}
2352
2353#[derive(Debug, Clone, Eq, PartialEq)]
2355struct NodeEntry {
2356 record: NodeRecord,
2358 last_seen: Instant,
2360 last_enr_seq: Option<u64>,
2362 fork_id: Option<ForkId>,
2364 find_node_failures: u8,
2366 has_endpoint_proof: bool,
2368}
2369
2370impl NodeEntry {
2373 fn new(record: NodeRecord) -> Self {
2375 Self {
2376 record,
2377 last_seen: Instant::now(),
2378 last_enr_seq: None,
2379 fork_id: None,
2380 find_node_failures: 0,
2381 has_endpoint_proof: false,
2382 }
2383 }
2384
2385 #[cfg(test)]
2386 fn new_proven(record: NodeRecord) -> Self {
2387 let mut node = Self::new(record);
2388 node.has_endpoint_proof = true;
2389 node
2390 }
2391
2392 const fn establish_proof(&mut self) {
2394 self.has_endpoint_proof = true;
2395 self.find_node_failures = 0;
2396 }
2397
2398 const fn exceeds_find_node_failures(&self, max_failures: u8) -> bool {
2400 self.find_node_failures >= max_failures
2401 }
2402
2403 fn update_with_enr(&mut self, last_enr_seq: Option<u64>) -> Option<u64> {
2405 self.update_now(|s| std::mem::replace(&mut s.last_enr_seq, last_enr_seq))
2406 }
2407
2408 const fn inc_failed_request(&mut self) {
2410 self.find_node_failures += 1;
2411 }
2412
2413 fn update_with_fork_id(&mut self, fork_id: Option<ForkId>) -> Option<ForkId> {
2415 self.update_now(|s| std::mem::replace(&mut s.fork_id, fork_id))
2416 }
2417
2418 fn update_now<F, R>(&mut self, f: F) -> R
2420 where
2421 F: FnOnce(&mut Self) -> R,
2422 {
2423 self.last_seen = Instant::now();
2424 f(self)
2425 }
2426}
2427
2428impl NodeEntry {
2431 fn is_expired(&self) -> bool {
2433 self.last_seen.elapsed() > (ENDPOINT_PROOF_EXPIRATION / 2)
2434 }
2435}
2436
2437#[derive(Debug)]
2439enum PingReason {
2440 InitialInsert,
2442 EstablishBond,
2444 RePing,
2446 Lookup(NodeRecord, LookupContext),
2448}
2449
2450#[derive(Debug, Clone)]
2452pub enum DiscoveryUpdate {
2453 Added(NodeRecord),
2455 DiscoveredAtCapacity(NodeRecord),
2457 EnrForkId(NodeRecord, ForkId),
2459 Removed(PeerId),
2461 Batch(Vec<Self>),
2463}
2464
2465#[cfg(test)]
2466mod tests {
2467 use super::*;
2468 use crate::test_utils::{create_discv4, create_discv4_with_config, rng_endpoint, rng_record};
2469 use alloy_primitives::hex;
2470 use alloy_rlp::{Decodable, Encodable};
2471 use rand_08::Rng;
2472 use reth_ethereum_forks::{EnrForkIdEntry, ForkHash};
2473 use reth_network_peers::mainnet_nodes;
2474 use std::future::poll_fn;
2475
2476 #[tokio::test]
2477 async fn test_configured_enr_forkid_entry() {
2478 let fork: ForkId = ForkId { hash: ForkHash([220, 233, 108, 45]), next: 0u64 };
2479 let mut disc_conf = Discv4Config::default();
2480 disc_conf.add_eip868_pair("eth", EnrForkIdEntry::from(fork));
2481 let (_discv4, service) = create_discv4_with_config(disc_conf).await;
2482 let eth = service.local_eip_868_enr.get_raw_rlp(b"eth").unwrap();
2483 let fork_entry_id = EnrForkIdEntry::decode(&mut ð[..]).unwrap();
2484
2485 let raw: [u8; 8] = [0xc7, 0xc6, 0x84, 0xdc, 0xe9, 0x6c, 0x2d, 0x80];
2486 let decoded = EnrForkIdEntry::decode(&mut &raw[..]).unwrap();
2487 let expected = EnrForkIdEntry {
2488 fork_id: ForkId { hash: ForkHash([0xdc, 0xe9, 0x6c, 0x2d]), next: 0 },
2489 };
2490 assert_eq!(expected, fork_entry_id);
2491 assert_eq!(expected, decoded);
2492 }
2493
2494 #[test]
2495 fn test_enr_forkid_entry_decode() {
2496 let raw: [u8; 8] = [0xc7, 0xc6, 0x84, 0xdc, 0xe9, 0x6c, 0x2d, 0x80];
2497 let decoded = EnrForkIdEntry::decode(&mut &raw[..]).unwrap();
2498 let expected = EnrForkIdEntry {
2499 fork_id: ForkId { hash: ForkHash([0xdc, 0xe9, 0x6c, 0x2d]), next: 0 },
2500 };
2501 assert_eq!(expected, decoded);
2502 }
2503
2504 #[test]
2505 fn test_enr_forkid_entry_encode() {
2506 let original = EnrForkIdEntry {
2507 fork_id: ForkId { hash: ForkHash([0xdc, 0xe9, 0x6c, 0x2d]), next: 0 },
2508 };
2509 let expected: [u8; 8] = [0xc7, 0xc6, 0x84, 0xdc, 0xe9, 0x6c, 0x2d, 0x80];
2510 let mut encoded = Vec::with_capacity(expected.len());
2511 original.encode(&mut encoded);
2512 assert_eq!(&expected[..], encoded.as_slice());
2513 }
2514
2515 #[test]
2516 fn test_local_rotator() {
2517 let id = PeerId::random();
2518 let mut rotator = LookupTargetRotator::local_only();
2519 assert_eq!(rotator.next(&id), id);
2520 assert_eq!(rotator.next(&id), id);
2521 }
2522
2523 #[test]
2524 fn test_rotator() {
2525 let id = PeerId::random();
2526 let mut rotator = LookupTargetRotator::default();
2527 assert_eq!(rotator.next(&id), id);
2528 assert_ne!(rotator.next(&id), id);
2529 assert_ne!(rotator.next(&id), id);
2530 assert_ne!(rotator.next(&id), id);
2531 assert_eq!(rotator.next(&id), id);
2532 }
2533
2534 #[tokio::test]
2535 async fn test_pending_ping() {
2536 let (_, mut service) = create_discv4().await;
2537
2538 let local_addr = service.local_addr();
2539
2540 let mut num_inserted = 0;
2541 loop {
2542 let node = NodeRecord::new(local_addr, PeerId::random());
2543 if service.add_node(node) {
2544 num_inserted += 1;
2545 assert!(service.pending_pings.contains_key(&node.id));
2546 assert_eq!(service.pending_pings.len(), num_inserted);
2547 if num_inserted == MAX_NODES_PING {
2548 break
2549 }
2550 }
2551 }
2552
2553 num_inserted = 0;
2555 for _ in 0..MAX_NODES_PING {
2556 let node = NodeRecord::new(local_addr, PeerId::random());
2557 if service.add_node(node) {
2558 num_inserted += 1;
2559 assert!(!service.pending_pings.contains_key(&node.id));
2560 assert_eq!(service.pending_pings.len(), MAX_NODES_PING);
2561 assert_eq!(service.queued_pings.len(), num_inserted);
2562 }
2563 }
2564 }
2565
2566 #[tokio::test(flavor = "multi_thread")]
2568 #[ignore]
2569 async fn test_mainnet_lookup() {
2570 reth_tracing::init_test_tracing();
2571 let fork_id = ForkId { hash: ForkHash(hex!("743f3d89")), next: 16191202 };
2572
2573 let all_nodes = mainnet_nodes();
2574 let config = Discv4Config::builder()
2575 .add_boot_nodes(all_nodes)
2576 .lookup_interval(Duration::from_secs(1))
2577 .add_eip868_pair("eth", fork_id)
2578 .build();
2579 let (_discv4, mut service) = create_discv4_with_config(config).await;
2580
2581 let mut updates = service.update_stream();
2582
2583 let _handle = service.spawn();
2584
2585 let mut table = HashMap::new();
2586 while let Some(update) = updates.next().await {
2587 match update {
2588 DiscoveryUpdate::EnrForkId(record, fork_id) => {
2589 println!("{record:?}, {fork_id:?}");
2590 }
2591 DiscoveryUpdate::Added(record) => {
2592 table.insert(record.id, record);
2593 }
2594 DiscoveryUpdate::Removed(id) => {
2595 table.remove(&id);
2596 }
2597 _ => {}
2598 }
2599 println!("total peers {}", table.len());
2600 }
2601 }
2602
2603 #[tokio::test]
2604 async fn test_mapped_ipv4() {
2605 reth_tracing::init_test_tracing();
2606 let mut rng = rand_08::thread_rng();
2607 let config = Discv4Config::builder().build();
2608 let (_discv4, mut service) = create_discv4_with_config(config).await;
2609
2610 let v4: Ipv4Addr = "0.0.0.0".parse().unwrap();
2611 let v6 = v4.to_ipv6_mapped();
2612 let addr: SocketAddr = (v6, DEFAULT_DISCOVERY_PORT).into();
2613
2614 let ping = Ping {
2615 from: rng_endpoint(&mut rng),
2616 to: rng_endpoint(&mut rng),
2617 expire: service.ping_expiration(),
2618 enr_sq: Some(rng.r#gen()),
2619 };
2620
2621 let id = PeerId::random();
2622 service.on_ping(ping, addr, id, B256::random());
2623
2624 let key = kad_key(id);
2625 match service.kbuckets.entry(&key) {
2626 kbucket::Entry::Present(entry, _) => {
2627 let node_addr = entry.value().record.address;
2628 assert!(node_addr.is_ipv4());
2629 assert_eq!(node_addr, IpAddr::from(v4));
2630 }
2631 _ => unreachable!(),
2632 };
2633 }
2634
2635 #[tokio::test]
2636 async fn test_respect_ping_expiration() {
2637 reth_tracing::init_test_tracing();
2638 let mut rng = rand_08::thread_rng();
2639 let config = Discv4Config::builder().build();
2640 let (_discv4, mut service) = create_discv4_with_config(config).await;
2641
2642 let v4: Ipv4Addr = "0.0.0.0".parse().unwrap();
2643 let v6 = v4.to_ipv6_mapped();
2644 let addr: SocketAddr = (v6, DEFAULT_DISCOVERY_PORT).into();
2645
2646 let ping = Ping {
2647 from: rng_endpoint(&mut rng),
2648 to: rng_endpoint(&mut rng),
2649 expire: SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs() - 1,
2650 enr_sq: Some(rng.r#gen()),
2651 };
2652
2653 let id = PeerId::random();
2654 service.on_ping(ping, addr, id, B256::random());
2655
2656 let key = kad_key(id);
2657 match service.kbuckets.entry(&key) {
2658 kbucket::Entry::Absent(_) => {}
2659 _ => unreachable!(),
2660 };
2661 }
2662
2663 #[tokio::test]
2664 async fn test_single_lookups() {
2665 reth_tracing::init_test_tracing();
2666
2667 let config = Discv4Config::builder().build();
2668 let (_discv4, mut service) = create_discv4_with_config(config.clone()).await;
2669
2670 let id = PeerId::random();
2671 let key = kad_key(id);
2672 let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), id);
2673
2674 let _ = service.kbuckets.insert_or_update(
2675 &key,
2676 NodeEntry::new_proven(record),
2677 NodeStatus {
2678 direction: ConnectionDirection::Incoming,
2679 state: ConnectionState::Connected,
2680 },
2681 );
2682
2683 service.lookup_self();
2684 assert_eq!(service.pending_find_nodes.len(), 1);
2685
2686 poll_fn(|cx| {
2687 let _ = service.poll(cx);
2688 assert_eq!(service.pending_find_nodes.len(), 1);
2689
2690 Poll::Ready(())
2691 })
2692 .await;
2693 }
2694
2695 #[tokio::test]
2696 async fn test_on_neighbours_recursive_lookup() {
2697 reth_tracing::init_test_tracing();
2698
2699 let config = Discv4Config::builder().build();
2700 let (_discv4, mut service) = create_discv4_with_config(config.clone()).await;
2701 let (_discv4, mut service2) = create_discv4_with_config(config).await;
2702
2703 let id = PeerId::random();
2704 let key = kad_key(id);
2705 let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), id);
2706
2707 let _ = service.kbuckets.insert_or_update(
2708 &key,
2709 NodeEntry::new_proven(record),
2710 NodeStatus {
2711 direction: ConnectionDirection::Incoming,
2712 state: ConnectionState::Connected,
2713 },
2714 );
2715 service.lookup_self();
2718 assert_eq!(service.pending_find_nodes.len(), 1);
2719
2720 poll_fn(|cx| {
2721 let _ = service.poll(cx);
2722 assert_eq!(service.pending_find_nodes.len(), 1);
2723
2724 Poll::Ready(())
2725 })
2726 .await;
2727
2728 let expiry = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_secs() +
2729 10000000000000;
2730 let msg = Neighbours { nodes: vec![service2.local_node_record], expire: expiry };
2731 service.on_neighbours(msg, record.tcp_addr(), id);
2732 let event = poll_fn(|cx| service2.poll(cx)).await;
2734 assert_eq!(event, Discv4Event::Ping);
2735 assert_eq!(service.pending_find_nodes.len(), 1);
2738 let event = poll_fn(|cx| service.poll(cx)).await;
2740 assert_eq!(event, Discv4Event::Pong);
2741 let event = poll_fn(|cx| service.poll(cx)).await;
2746 assert_eq!(event, Discv4Event::Ping);
2747 assert_eq!(service.pending_find_nodes.len(), 2);
2750 }
2751
2752 #[tokio::test]
2753 async fn test_no_local_in_closest() {
2754 reth_tracing::init_test_tracing();
2755
2756 let config = Discv4Config::builder().build();
2757 let (_discv4, mut service) = create_discv4_with_config(config).await;
2758
2759 let target_key = kad_key(PeerId::random());
2760
2761 let id = PeerId::random();
2762 let key = kad_key(id);
2763 let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), id);
2764
2765 let _ = service.kbuckets.insert_or_update(
2766 &key,
2767 NodeEntry::new(record),
2768 NodeStatus {
2769 direction: ConnectionDirection::Incoming,
2770 state: ConnectionState::Connected,
2771 },
2772 );
2773
2774 let closest = service
2775 .kbuckets
2776 .closest_values(&target_key)
2777 .map(|n| n.value.record)
2778 .take(MAX_NODES_PER_BUCKET)
2779 .collect::<Vec<_>>();
2780
2781 assert_eq!(closest.len(), 1);
2782 assert!(!closest.iter().any(|r| r.id == *service.local_peer_id()));
2783 }
2784
2785 #[tokio::test]
2786 async fn test_random_lookup() {
2787 reth_tracing::init_test_tracing();
2788
2789 let config = Discv4Config::builder().build();
2790 let (_discv4, mut service) = create_discv4_with_config(config).await;
2791
2792 let target = PeerId::random();
2793
2794 let id = PeerId::random();
2795 let key = kad_key(id);
2796 let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), id);
2797
2798 let _ = service.kbuckets.insert_or_update(
2799 &key,
2800 NodeEntry::new_proven(record),
2801 NodeStatus {
2802 direction: ConnectionDirection::Incoming,
2803 state: ConnectionState::Connected,
2804 },
2805 );
2806
2807 service.lookup(target);
2808 assert_eq!(service.pending_find_nodes.len(), 1);
2809
2810 let ctx = service.pending_find_nodes.values().next().unwrap().lookup_context.clone();
2811
2812 assert_eq!(ctx.target(), target);
2813 assert_eq!(ctx.inner.closest_nodes.borrow().len(), 1);
2814
2815 ctx.add_node(record);
2816 assert_eq!(ctx.inner.closest_nodes.borrow().len(), 1);
2817 }
2818
2819 #[tokio::test]
2820 async fn test_reping_on_find_node_failures() {
2821 reth_tracing::init_test_tracing();
2822
2823 let config = Discv4Config::builder().build();
2824 let (_discv4, mut service) = create_discv4_with_config(config).await;
2825
2826 let target = PeerId::random();
2827
2828 let id = PeerId::random();
2829 let key = kad_key(id);
2830 let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), id);
2831
2832 let mut entry = NodeEntry::new_proven(record);
2833 entry.find_node_failures = u8::MAX;
2834 let _ = service.kbuckets.insert_or_update(
2835 &key,
2836 entry,
2837 NodeStatus {
2838 direction: ConnectionDirection::Incoming,
2839 state: ConnectionState::Connected,
2840 },
2841 );
2842
2843 service.lookup(target);
2844 assert_eq!(service.pending_find_nodes.len(), 0);
2845 assert_eq!(service.pending_pings.len(), 1);
2846
2847 service.update_on_pong(record, None);
2848
2849 service
2850 .on_entry(record.id, |entry| {
2851 assert_eq!(entry.find_node_failures, 0);
2853 assert!(entry.has_endpoint_proof);
2854 })
2855 .unwrap();
2856 }
2857
2858 #[tokio::test]
2859 async fn test_service_commands() {
2860 reth_tracing::init_test_tracing();
2861
2862 let config = Discv4Config::builder().build();
2863 let (discv4, mut service) = create_discv4_with_config(config).await;
2864
2865 service.lookup_self();
2866
2867 let _handle = service.spawn();
2868 discv4.send_lookup_self();
2869 let _ = discv4.lookup_self().await;
2870 }
2871
2872 #[tokio::test]
2873 async fn test_requests_timeout() {
2874 reth_tracing::init_test_tracing();
2875 let fork_id = ForkId { hash: ForkHash(hex!("743f3d89")), next: 16191202 };
2876
2877 let config = Discv4Config::builder()
2878 .request_timeout(Duration::from_millis(200))
2879 .ping_expiration(Duration::from_millis(200))
2880 .lookup_neighbours_expiration(Duration::from_millis(200))
2881 .add_eip868_pair("eth", fork_id)
2882 .build();
2883 let (_disv4, mut service) = create_discv4_with_config(config).await;
2884
2885 let id = PeerId::random();
2886 let key = kad_key(id);
2887 let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), id);
2888
2889 let _ = service.kbuckets.insert_or_update(
2890 &key,
2891 NodeEntry::new_proven(record),
2892 NodeStatus {
2893 direction: ConnectionDirection::Incoming,
2894 state: ConnectionState::Connected,
2895 },
2896 );
2897
2898 service.lookup_self();
2899 assert_eq!(service.pending_find_nodes.len(), 1);
2900
2901 let ctx = service.pending_find_nodes.values().next().unwrap().lookup_context.clone();
2902
2903 service.pending_lookup.insert(record.id, (Instant::now(), ctx));
2904
2905 assert_eq!(service.pending_lookup.len(), 1);
2906
2907 let ping = Ping {
2908 from: service.local_node_record.into(),
2909 to: record.into(),
2910 expire: service.ping_expiration(),
2911 enr_sq: service.enr_seq(),
2912 };
2913 let echo_hash = service.send_packet(Message::Ping(ping), record.udp_addr());
2914 let ping_request = PingRequest {
2915 sent_at: Instant::now(),
2916 node: record,
2917 echo_hash,
2918 reason: PingReason::InitialInsert,
2919 };
2920 service.pending_pings.insert(record.id, ping_request);
2921
2922 assert_eq!(service.pending_pings.len(), 1);
2923
2924 tokio::time::sleep(Duration::from_secs(1)).await;
2925
2926 poll_fn(|cx| {
2927 let _ = service.poll(cx);
2928
2929 assert_eq!(service.pending_find_nodes.len(), 0);
2930 assert_eq!(service.pending_lookup.len(), 0);
2931 assert_eq!(service.pending_pings.len(), 0);
2932
2933 Poll::Ready(())
2934 })
2935 .await;
2936 }
2937
2938 #[tokio::test(flavor = "multi_thread")]
2940 async fn test_check_wrong_to() {
2941 reth_tracing::init_test_tracing();
2942
2943 let config = Discv4Config::builder().external_ip_resolver(None).build();
2944 let (_discv4, mut service_1) = create_discv4_with_config(config.clone()).await;
2945 let (_discv4, mut service_2) = create_discv4_with_config(config).await;
2946
2947 let mut ping = Ping {
2949 from: service_1.local_node_record.into(),
2950 to: service_2.local_node_record.into(),
2951 expire: service_1.ping_expiration(),
2952 enr_sq: service_1.enr_seq(),
2953 };
2954 ping.to.address = "192.0.2.0".parse().unwrap();
2955
2956 let echo_hash = service_1.send_packet(Message::Ping(ping), service_2.local_addr());
2957 let ping_request = PingRequest {
2958 sent_at: Instant::now(),
2959 node: service_2.local_node_record,
2960 echo_hash,
2961 reason: PingReason::InitialInsert,
2962 };
2963 service_1.pending_pings.insert(*service_2.local_peer_id(), ping_request);
2964
2965 let event = poll_fn(|cx| service_2.poll(cx)).await;
2967 assert_eq!(event, Discv4Event::Ping);
2968
2969 let event = poll_fn(|cx| service_1.poll(cx)).await;
2971 assert_eq!(event, Discv4Event::Pong);
2972 let event = poll_fn(|cx| service_1.poll(cx)).await;
2974 assert_eq!(event, Discv4Event::Ping);
2975 }
2976
2977 #[tokio::test(flavor = "multi_thread")]
2978 async fn test_check_ping_pong() {
2979 reth_tracing::init_test_tracing();
2980
2981 let config = Discv4Config::builder().external_ip_resolver(None).build();
2982 let (_discv4, mut service_1) = create_discv4_with_config(config.clone()).await;
2983 let (_discv4, mut service_2) = create_discv4_with_config(config).await;
2984
2985 service_1.add_node(service_2.local_node_record);
2987
2988 let event = poll_fn(|cx| service_2.poll(cx)).await;
2990 assert_eq!(event, Discv4Event::Ping);
2991
2992 let key1 = kad_key(*service_1.local_peer_id());
2994 match service_2.kbuckets.entry(&key1) {
2995 kbucket::Entry::Present(_entry, status) => {
2996 assert!(!status.is_connected());
2997 }
2998 _ => unreachable!(),
2999 }
3000
3001 let event = poll_fn(|cx| service_1.poll(cx)).await;
3003 assert_eq!(event, Discv4Event::Pong);
3004
3005 let key2 = kad_key(*service_2.local_peer_id());
3007 match service_1.kbuckets.entry(&key2) {
3008 kbucket::Entry::Present(_entry, status) => {
3009 assert!(status.is_connected());
3010 }
3011 _ => unreachable!(),
3012 }
3013
3014 let event = poll_fn(|cx| service_1.poll(cx)).await;
3016 assert_eq!(event, Discv4Event::Ping);
3017
3018 tokio::time::timeout(Duration::from_secs(5), async {
3022 loop {
3023 let event = poll_fn(|cx| service_2.poll(cx)).await;
3024 match event {
3025 Discv4Event::Pong => break,
3026 Discv4Event::EnrRequest | Discv4Event::FindNode => {}
3027 ev => unreachable!("{ev:?}"),
3028 }
3029 }
3030 })
3031 .await
3032 .expect("timed out waiting for Pong from service_2");
3033
3034 match service_2.kbuckets.entry(&key1) {
3036 kbucket::Entry::Present(_entry, status) => {
3037 assert!(status.is_connected());
3038 }
3039 ev => unreachable!("{ev:?}"),
3040 }
3041 }
3042
3043 #[test]
3044 fn test_insert() {
3045 let local_node_record = rng_record(&mut rand_08::thread_rng());
3046 let mut kbuckets: KBucketsTable<NodeKey, NodeEntry> = KBucketsTable::new(
3047 NodeKey::from(&local_node_record).into(),
3048 Duration::from_secs(60),
3049 MAX_NODES_PER_BUCKET,
3050 None,
3051 None,
3052 );
3053
3054 let new_record = rng_record(&mut rand_08::thread_rng());
3055 let key = kad_key(new_record.id);
3056 match kbuckets.entry(&key) {
3057 kbucket::Entry::Absent(entry) => {
3058 let node = NodeEntry::new(new_record);
3059 let _ = entry.insert(
3060 node,
3061 NodeStatus {
3062 direction: ConnectionDirection::Outgoing,
3063 state: ConnectionState::Disconnected,
3064 },
3065 );
3066 }
3067 _ => {
3068 unreachable!()
3069 }
3070 };
3071 match kbuckets.entry(&key) {
3072 kbucket::Entry::Present(_, _) => {}
3073 _ => {
3074 unreachable!()
3075 }
3076 }
3077 }
3078
3079 #[tokio::test]
3080 async fn test_bootnode_not_in_update_stream() {
3081 reth_tracing::init_test_tracing();
3082 let (_, service_1) = create_discv4().await;
3083 let peerid_1 = *service_1.local_peer_id();
3084
3085 let config = Discv4Config::builder().add_boot_node(service_1.local_node_record).build();
3086 service_1.spawn();
3087
3088 let (_, mut service_2) = create_discv4_with_config(config).await;
3089
3090 let mut updates = service_2.update_stream();
3091
3092 service_2.spawn();
3093
3094 let mut bootnode_appeared = false;
3096 let timeout = tokio::time::sleep(Duration::from_secs(1));
3097 tokio::pin!(timeout);
3098
3099 loop {
3100 tokio::select! {
3101 Some(update) = updates.next() => {
3102 if let DiscoveryUpdate::Added(record) = update
3103 && record.id == peerid_1 {
3104 bootnode_appeared = true;
3105 break;
3106 }
3107 }
3108 _ = &mut timeout => break,
3109 }
3110 }
3111
3112 assert!(bootnode_appeared, "Bootnode should appear in update stream");
3114 }
3115
3116 fn insert_proven_node(service: &mut Discv4Service, record: NodeRecord) {
3117 let key = kad_key(record.id);
3118 let _ = service.kbuckets.insert_or_update(
3119 &key,
3120 NodeEntry::new_proven(record),
3121 NodeStatus {
3122 direction: ConnectionDirection::Incoming,
3123 state: ConnectionState::Connected,
3124 },
3125 );
3126 }
3127
3128 fn insert_initial_ping(service: &mut Discv4Service, record: NodeRecord) -> B256 {
3129 let echo_hash = B256::random();
3130 service.pending_pings.insert(
3131 record.id,
3132 PingRequest {
3133 sent_at: Instant::now(),
3134 node: record,
3135 echo_hash,
3136 reason: PingReason::InitialInsert,
3137 },
3138 );
3139 echo_hash
3140 }
3141
3142 fn make_pong(service: &Discv4Service, echo_hash: B256) -> Pong {
3143 Pong {
3144 to: rng_endpoint(&mut rand_08::thread_rng()),
3145 echo: echo_hash,
3146 expire: service.ping_expiration(),
3147 enr_sq: None,
3148 }
3149 }
3150
3151 #[tokio::test]
3152 async fn test_lookup_reset_on_first_bootnode_pong() {
3153 let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), PeerId::random());
3154 let config = Discv4Config::builder().add_boot_node(record).build();
3155 let (_discv4, mut service) = create_discv4_with_config(config).await;
3156
3157 assert!(service.pending_lookup_reset);
3159
3160 insert_proven_node(&mut service, record);
3162 let echo_hash = insert_initial_ping(&mut service, record);
3163
3164 service.on_pong(make_pong(&service, echo_hash), record.udp_addr(), record.id);
3166
3167 assert!(!service.pending_lookup_reset, "flag should be consumed");
3169 }
3170
3171 #[tokio::test]
3172 async fn test_lookup_reset_fires_only_once() {
3173 let records: Vec<_> = (0..2)
3174 .map(|_| NodeRecord::new("0.0.0.0:0".parse().unwrap(), PeerId::random()))
3175 .collect();
3176 let config = Discv4Config::builder().add_boot_nodes(records.clone()).build();
3177 let (_discv4, mut service) = create_discv4_with_config(config).await;
3178
3179 for &r in &records {
3181 insert_proven_node(&mut service, r);
3182 }
3183 let hashes: Vec<_> =
3184 records.iter().map(|r| insert_initial_ping(&mut service, *r)).collect();
3185
3186 service.on_pong(make_pong(&service, hashes[0]), records[0].udp_addr(), records[0].id);
3188 assert!(!service.pending_lookup_reset);
3189
3190 service.on_pong(make_pong(&service, hashes[1]), records[1].udp_addr(), records[1].id);
3192 assert!(!service.pending_lookup_reset);
3193 }
3194
3195 #[tokio::test]
3196 async fn test_lookup_reset_not_triggered_by_non_bootnode() {
3197 let bootnode = NodeRecord::new("0.0.0.0:0".parse().unwrap(), PeerId::random());
3198 let config = Discv4Config::builder().add_boot_node(bootnode).build();
3199 let (_discv4, mut service) = create_discv4_with_config(config).await;
3200
3201 assert!(service.pending_lookup_reset);
3202
3203 let stranger = NodeRecord::new("0.0.0.0:0".parse().unwrap(), PeerId::random());
3205 insert_proven_node(&mut service, stranger);
3206 let echo_hash = insert_initial_ping(&mut service, stranger);
3207 service.on_pong(make_pong(&service, echo_hash), stranger.udp_addr(), stranger.id);
3208
3209 assert!(service.pending_lookup_reset, "flag should not be consumed by non-bootnode");
3210 }
3211
3212 #[tokio::test]
3213 async fn test_lookup_reset_disabled_when_lookup_disabled() {
3214 let config = Discv4Config::builder().enable_lookup(false).build();
3215 let (_discv4, service) = create_discv4_with_config(config).await;
3216
3217 assert!(!service.pending_lookup_reset);
3219 }
3220}