1#![doc(
20 html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
21 html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
22 issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/"
23)]
24#![cfg_attr(not(test), warn(unused_crate_dependencies))]
25#![cfg_attr(docsrs, feature(doc_cfg))]
26
27use crate::{
28 error::{DecodePacketError, Discv4Error},
29 proto::{FindNode, Message, Neighbours, Packet, Ping, Pong},
30};
31use alloy_primitives::{bytes::Bytes, hex, B256};
32use discv5::{
33 kbucket,
34 kbucket::{
35 BucketInsertResult, Distance, Entry as BucketEntry, InsertResult, KBucketsTable,
36 NodeStatus, MAX_NODES_PER_BUCKET,
37 },
38 ConnectionDirection, ConnectionState,
39};
40use enr::Enr;
41use itertools::Itertools;
42use parking_lot::Mutex;
43use proto::{EnrRequest, EnrResponse};
44use reth_ethereum_forks::ForkId;
45use reth_network_peers::{pk2id, PeerId};
46use secp256k1::SecretKey;
47use std::{
48 cell::RefCell,
49 collections::{btree_map, hash_map::Entry, BTreeMap, HashMap, VecDeque},
50 fmt, io,
51 net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4},
52 pin::Pin,
53 rc::Rc,
54 sync::Arc,
55 task::{ready, Context, Poll},
56 time::{Duration, Instant, SystemTime, UNIX_EPOCH},
57};
58use tokio::{
59 net::UdpSocket,
60 sync::{mpsc, mpsc::error::TrySendError, oneshot, oneshot::Sender as OneshotSender},
61 task::{JoinHandle, JoinSet},
62 time::Interval,
63};
64use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt};
65use tracing::{debug, trace};
66
67pub mod error;
68pub mod proto;
69
70mod config;
71pub use config::{Discv4Config, Discv4ConfigBuilder};
72
73mod node;
74use node::{kad_key, NodeKey};
75
76mod table;
77
78pub use reth_network_peers::NodeRecord;
80
81#[cfg(any(test, feature = "test-utils"))]
82pub mod test_utils;
83
84use crate::table::PongTable;
85use reth_net_nat::ResolveNatInterval;
86pub use reth_net_nat::{external_ip, NatResolver};
88
89pub const DEFAULT_DISCOVERY_ADDR: IpAddr = IpAddr::V4(Ipv4Addr::UNSPECIFIED);
93
94pub const DEFAULT_DISCOVERY_PORT: u16 = 30303;
98
99pub const DEFAULT_DISCOVERY_ADDRESS: SocketAddr =
103 SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, DEFAULT_DISCOVERY_PORT));
104
105const MAX_PACKET_SIZE: usize = 1280;
107
108const MIN_PACKET_SIZE: usize = 32 + 65 + 1;
110
111const ALPHA: usize = 3;
113
114const MAX_NODES_PING: usize = 2 * MAX_NODES_PER_BUCKET;
119
120const MAX_QUEUED_PINGS: usize = 2 * MAX_NODES_PER_BUCKET;
128
129const SAFE_MAX_DATAGRAM_NEIGHBOUR_RECORDS: usize = (MAX_PACKET_SIZE - 109) / 91;
135
136const ENDPOINT_PROOF_EXPIRATION: Duration = Duration::from_secs(24 * 60 * 60);
140
141const EXPIRE_DURATION: Duration = Duration::from_secs(60 * 60);
143
144const UDP_MESSAGE_POLL_LOOP_BUDGET: i32 = 4;
149
150type EgressSender = mpsc::Sender<(Bytes, SocketAddr)>;
151type EgressReceiver = mpsc::Receiver<(Bytes, SocketAddr)>;
152
153pub(crate) type IngressSender = mpsc::Sender<IngressEvent>;
154pub(crate) type IngressReceiver = mpsc::Receiver<IngressEvent>;
155
156type NodeRecordSender = OneshotSender<Vec<NodeRecord>>;
157
158#[derive(Debug, Clone)]
165pub struct Discv4 {
166 local_addr: SocketAddr,
168 to_service: mpsc::UnboundedSender<Discv4Command>,
170 node_record: Arc<Mutex<NodeRecord>>,
174}
175
176impl Discv4 {
177 pub async fn spawn(
181 local_address: SocketAddr,
182 local_enr: NodeRecord,
183 secret_key: SecretKey,
184 config: Discv4Config,
185 ) -> io::Result<Self> {
186 let (discv4, service) = Self::bind(local_address, local_enr, secret_key, config).await?;
187
188 service.spawn();
189
190 Ok(discv4)
191 }
192
193 #[cfg(feature = "test-utils")]
197 pub fn noop() -> Self {
198 let (to_service, _rx) = mpsc::unbounded_channel();
199 let local_addr =
200 (IpAddr::from(std::net::Ipv4Addr::UNSPECIFIED), DEFAULT_DISCOVERY_PORT).into();
201 Self {
202 local_addr,
203 to_service,
204 node_record: Arc::new(Mutex::new(NodeRecord::new(
205 "127.0.0.1:3030".parse().unwrap(),
206 PeerId::random(),
207 ))),
208 }
209 }
210
211 pub async fn bind(
243 local_address: SocketAddr,
244 local_node_record: NodeRecord,
245 secret_key: SecretKey,
246 config: Discv4Config,
247 ) -> io::Result<(Self, Discv4Service)> {
248 let socket = Arc::new(UdpSocket::bind(local_address).await?);
249 trace!(target: "discv4", local_addr=?socket.local_addr(), "opened UDP socket");
250 let (tx, rx) = mpsc::channel(config.udp_ingress_message_buffer);
251
252 Self::bind_with_socket(socket, Some(tx), rx, local_node_record, secret_key, config)
253 }
254
255 pub fn bind_shared(
259 socket: Arc<UdpSocket>,
260 local_node_record: NodeRecord,
261 secret_key: SecretKey,
262 config: Discv4Config,
263 ) -> io::Result<(Self, Discv4Service, IngressHandler)> {
264 let (tx, rx) = mpsc::channel(config.udp_ingress_message_buffer);
265 let local_id = local_node_record.id;
266 let (discv4, service) =
267 Self::bind_with_socket(socket, None, rx, local_node_record, secret_key, config)?;
268
269 let handler = IngressHandler::new(tx, local_id);
270
271 Ok((discv4, service, handler))
272 }
273
274 fn bind_with_socket(
275 socket: Arc<UdpSocket>,
276 ingress_tx: Option<IngressSender>,
277 ingress_rx: IngressReceiver,
278 mut local_node_record: NodeRecord,
279 secret_key: SecretKey,
280 config: Discv4Config,
281 ) -> io::Result<(Self, Discv4Service)> {
282 let local_addr = socket.local_addr()?;
283 local_node_record.udp_port = local_addr.port();
284
285 let mut service = Discv4Service::new(
286 socket,
287 ingress_tx,
288 ingress_rx,
289 local_addr,
290 local_node_record,
291 secret_key,
292 config,
293 );
294
295 service.resolve_external_ip();
297
298 let discv4 = service.handle();
299 Ok((discv4, service))
300 }
301
302 pub const fn local_addr(&self) -> SocketAddr {
304 self.local_addr
305 }
306
307 pub fn node_record(&self) -> NodeRecord {
311 *self.node_record.lock()
312 }
313
314 pub fn external_ip(&self) -> IpAddr {
316 self.node_record.lock().address
317 }
318
319 pub fn set_lookup_interval(&self, duration: Duration) {
321 self.send_to_service(Discv4Command::SetLookupInterval(duration))
322 }
323
324 pub async fn lookup_self(&self) -> Result<Vec<NodeRecord>, Discv4Error> {
339 self.lookup_node(None).await
340 }
341
342 pub async fn lookup(&self, node_id: PeerId) -> Result<Vec<NodeRecord>, Discv4Error> {
346 self.lookup_node(Some(node_id)).await
347 }
348
349 pub async fn lookup_random(&self) -> Result<Vec<NodeRecord>, Discv4Error> {
351 let target = PeerId::random();
352 self.lookup_node(Some(target)).await
353 }
354
355 pub fn send_lookup(&self, node_id: PeerId) {
357 let cmd = Discv4Command::Lookup { node_id: Some(node_id), tx: None };
358 self.send_to_service(cmd);
359 }
360
361 async fn lookup_node(&self, node_id: Option<PeerId>) -> Result<Vec<NodeRecord>, Discv4Error> {
362 let (tx, rx) = oneshot::channel();
363 let cmd = Discv4Command::Lookup { node_id, tx: Some(tx) };
364 self.to_service.send(cmd)?;
365 Ok(rx.await?)
366 }
367
368 pub fn send_lookup_self(&self) {
370 let cmd = Discv4Command::Lookup { node_id: None, tx: None };
371 self.send_to_service(cmd);
372 }
373
374 pub fn remove_peer(&self, node_id: PeerId) {
376 let cmd = Discv4Command::Remove(node_id);
377 self.send_to_service(cmd);
378 }
379
380 pub fn add_node(&self, node_record: NodeRecord) {
382 let cmd = Discv4Command::Add(node_record);
383 self.send_to_service(cmd);
384 }
385
386 pub fn add_boot_node(&self, node_record: NodeRecord) {
391 let cmd = Discv4Command::AddBootNode(node_record);
392 self.send_to_service(cmd);
393 }
394
395 pub fn ban(&self, node_id: PeerId, ip: IpAddr) {
399 let cmd = Discv4Command::Ban(node_id, ip);
400 self.send_to_service(cmd);
401 }
402
403 pub fn ban_ip(&self, ip: IpAddr) {
407 let cmd = Discv4Command::BanIp(ip);
408 self.send_to_service(cmd);
409 }
410
411 pub fn ban_node(&self, node_id: PeerId) {
415 let cmd = Discv4Command::BanPeer(node_id);
416 self.send_to_service(cmd);
417 }
418
419 pub fn set_tcp_port(&self, port: u16) {
423 let cmd = Discv4Command::SetTcpPort(port);
424 self.send_to_service(cmd);
425 }
426
427 pub fn set_eip868_rlp_pair(&self, key: Vec<u8>, rlp: Bytes) {
433 let cmd = Discv4Command::SetEIP868RLPPair { key, rlp };
434 self.send_to_service(cmd);
435 }
436
437 pub fn set_eip868_rlp(&self, key: Vec<u8>, value: impl alloy_rlp::Encodable) {
441 self.set_eip868_rlp_pair(key, Bytes::from(alloy_rlp::encode(&value)))
442 }
443
444 #[inline]
445 fn send_to_service(&self, cmd: Discv4Command) {
446 let _ = self.to_service.send(cmd).map_err(|err| {
447 debug!(
448 target: "discv4",
449 %err,
450 "channel capacity reached, dropping command",
451 )
452 });
453 }
454
455 pub async fn update_stream(&self) -> Result<ReceiverStream<DiscoveryUpdate>, Discv4Error> {
457 let (tx, rx) = oneshot::channel();
458 let cmd = Discv4Command::Updates(tx);
459 self.to_service.send(cmd)?;
460 Ok(rx.await?)
461 }
462
463 pub fn terminate(&self) {
465 self.send_to_service(Discv4Command::Terminated);
466 }
467}
468
469#[must_use = "Stream does nothing unless polled"]
483pub struct Discv4Service {
484 local_address: SocketAddr,
486 local_eip_868_enr: Enr<SecretKey>,
488 local_node_record: NodeRecord,
490 shared_node_record: Arc<Mutex<NodeRecord>>,
492 secret_key: SecretKey,
494 _socket: Arc<UdpSocket>,
496 _tasks: JoinSet<()>,
500 kbuckets: KBucketsTable<NodeKey, NodeEntry>,
502 ingress: IngressReceiver,
506 egress: EgressSender,
510 queued_pings: VecDeque<(NodeRecord, PingReason)>,
517 pending_pings: HashMap<PeerId, PingRequest>,
519 pending_lookup: HashMap<PeerId, (Instant, LookupContext)>,
524 pending_find_nodes: HashMap<PeerId, FindNodeRequest>,
526 pending_enr_requests: HashMap<PeerId, EnrRequestState>,
528 to_service: mpsc::UnboundedSender<Discv4Command>,
530 commands_rx: mpsc::UnboundedReceiver<Discv4Command>,
532 update_listeners: Vec<mpsc::Sender<DiscoveryUpdate>>,
534 lookup_interval: Interval,
536 lookup_rotator: LookupTargetRotator,
538 pending_lookup_reset: bool,
540 evict_expired_requests_interval: Interval,
542 ping_interval: Interval,
544 resolve_external_ip_interval: Option<ResolveNatInterval>,
546 config: Discv4Config,
548 queued_events: VecDeque<Discv4Event>,
550 received_pongs: PongTable,
552 expire_interval: Interval,
554 cached_find_node: Option<CachedFindNode>,
556}
557
558impl Discv4Service {
559 pub(crate) fn new(
564 socket: Arc<UdpSocket>,
565 ingress_tx: Option<IngressSender>,
566 ingress_rx: IngressReceiver,
567 local_address: SocketAddr,
568 local_node_record: NodeRecord,
569 secret_key: SecretKey,
570 config: Discv4Config,
571 ) -> Self {
572 let (egress_tx, egress_rx) = mpsc::channel(config.udp_egress_message_buffer);
573 let mut tasks = JoinSet::<()>::new();
574
575 if let Some(ingress_tx) = ingress_tx {
576 let udp = Arc::clone(&socket);
577 tasks.spawn(receive_loop(udp, ingress_tx, local_node_record.id));
578 }
579
580 let udp = Arc::clone(&socket);
581 tasks.spawn(send_loop(udp, egress_rx));
582
583 let kbuckets = KBucketsTable::new(
584 NodeKey::from(&local_node_record).into(),
585 Duration::from_secs(60),
586 MAX_NODES_PER_BUCKET,
587 None,
588 None,
589 );
590
591 let self_lookup_interval = tokio::time::interval(config.lookup_interval);
592
593 let ping_interval = tokio::time::interval_at(
596 tokio::time::Instant::now() + config.ping_interval,
597 config.ping_interval,
598 );
599
600 let evict_expired_requests_interval = tokio::time::interval_at(
601 tokio::time::Instant::now() + config.request_timeout,
602 config.request_timeout,
603 );
604
605 let lookup_rotator = if config.enable_dht_random_walk {
606 LookupTargetRotator::default()
607 } else {
608 LookupTargetRotator::local_only()
609 };
610
611 let local_eip_868_enr = {
613 let mut builder = Enr::builder();
614 builder.ip(local_node_record.address);
615 if local_node_record.address.is_ipv4() {
616 builder.udp4(local_node_record.udp_port);
617 builder.tcp4(local_node_record.tcp_port);
618 } else {
619 builder.udp6(local_node_record.udp_port);
620 builder.tcp6(local_node_record.tcp_port);
621 }
622
623 for (key, val) in &config.additional_eip868_rlp_pairs {
624 builder.add_value_rlp(key, val.clone());
625 }
626 builder.build(&secret_key).expect("v4 is set")
627 };
628
629 let (to_service, commands_rx) = mpsc::unbounded_channel();
630
631 let shared_node_record = Arc::new(Mutex::new(local_node_record));
632
633 Self {
634 local_address,
635 local_eip_868_enr,
636 local_node_record,
637 shared_node_record,
638 _socket: socket,
639 kbuckets,
640 secret_key,
641 _tasks: tasks,
642 ingress: ingress_rx,
643 egress: egress_tx,
644 queued_pings: VecDeque::with_capacity(MAX_QUEUED_PINGS),
645 pending_pings: Default::default(),
646 pending_lookup: Default::default(),
647 pending_find_nodes: Default::default(),
648 pending_enr_requests: Default::default(),
649 commands_rx,
650 to_service,
651 update_listeners: Vec::with_capacity(1),
652 lookup_interval: self_lookup_interval,
653 ping_interval,
654 evict_expired_requests_interval,
655 lookup_rotator,
656 pending_lookup_reset: config.enable_lookup,
657 resolve_external_ip_interval: config.resolve_external_ip_interval(),
658 config,
659 queued_events: Default::default(),
660 received_pongs: Default::default(),
661 expire_interval: tokio::time::interval(EXPIRE_DURATION),
662 cached_find_node: None,
663 }
664 }
665
666 pub fn handle(&self) -> Discv4 {
668 Discv4 {
669 local_addr: self.local_address,
670 to_service: self.to_service.clone(),
671 node_record: self.shared_node_record.clone(),
672 }
673 }
674
675 fn enr_seq(&self) -> Option<u64> {
677 self.config.enable_eip868.then(|| self.local_eip_868_enr.seq())
678 }
679
680 pub fn set_lookup_interval(&mut self, duration: Duration) {
682 self.lookup_interval = tokio::time::interval(duration);
683 }
684
685 fn resolve_external_ip(&mut self) {
689 if let Some(r) = &self.resolve_external_ip_interval &&
690 let Some(external_ip) =
691 r.resolver().clone().as_external_ip(self.local_node_record.udp_port)
692 {
693 self.set_external_ip_addr(external_ip);
694 }
695 }
696
697 pub fn set_external_ip_addr(&mut self, external_ip: IpAddr) {
700 if self.local_node_record.address != external_ip {
701 debug!(target: "discv4", ?external_ip, "Updating external ip");
702 self.local_node_record.address = external_ip;
703 let _ = self.local_eip_868_enr.set_ip(external_ip, &self.secret_key);
704 let mut lock = self.shared_node_record.lock();
705 *lock = self.local_node_record;
706 debug!(target: "discv4", enr=?self.local_eip_868_enr, "Updated local ENR");
707 }
708 }
709
710 pub const fn local_peer_id(&self) -> &PeerId {
712 &self.local_node_record.id
713 }
714
715 pub const fn local_addr(&self) -> SocketAddr {
717 self.local_address
718 }
719
720 pub const fn local_enr(&self) -> NodeRecord {
724 self.local_node_record
725 }
726
727 #[cfg(test)]
729 pub const fn local_enr_mut(&mut self) -> &mut NodeRecord {
730 &mut self.local_node_record
731 }
732
733 pub fn contains_node(&self, id: PeerId) -> bool {
735 let key = kad_key(id);
736 self.kbuckets.get_index(&key).is_some()
737 }
738
739 pub fn bootstrap(&mut self) {
751 for record in self.config.bootstrap_nodes.clone() {
752 debug!(target: "discv4", ?record, "pinging boot node");
753 let key = kad_key(record.id);
754 let entry = NodeEntry::new(record);
755
756 match self.kbuckets.insert_or_update(
758 &key,
759 entry,
760 NodeStatus {
761 state: ConnectionState::Disconnected,
762 direction: ConnectionDirection::Outgoing,
763 },
764 ) {
765 InsertResult::Failed(_) => {}
766 _ => {
767 self.try_ping(record, PingReason::InitialInsert);
768 }
769 }
770 }
771 }
772
773 pub fn add_boot_node(&mut self, record: NodeRecord) -> bool {
778 self.config.bootstrap_nodes.insert(record);
779 self.add_node(record)
780 }
781
782 pub fn spawn(mut self) -> JoinHandle<()> {
786 tokio::task::spawn(async move {
787 self.bootstrap();
788
789 while let Some(event) = self.next().await {
790 trace!(target: "discv4", ?event, "processed");
791 }
792 trace!(target: "discv4", "service terminated");
793 })
794 }
795
796 pub fn update_stream(&mut self) -> ReceiverStream<DiscoveryUpdate> {
798 let (tx, rx) = mpsc::channel(512);
799 self.update_listeners.push(tx);
800 ReceiverStream::new(rx)
801 }
802
803 pub fn lookup_self(&mut self) {
805 self.lookup(self.local_node_record.id)
806 }
807
808 pub fn lookup(&mut self, target: PeerId) {
818 self.lookup_with(target, None)
819 }
820
821 fn lookup_with(&mut self, target: PeerId, tx: Option<NodeRecordSender>) {
831 trace!(target: "discv4", ?target, "Starting lookup");
832 let target_key = kad_key(target);
833
834 let ctx = LookupContext::new(
837 target_key.clone(),
838 self.kbuckets
839 .closest_values(&target_key)
840 .filter(|node| {
841 node.value.has_endpoint_proof &&
842 !self.pending_find_nodes.contains_key(&node.key.preimage().0)
843 })
844 .take(MAX_NODES_PER_BUCKET)
845 .map(|n| (target_key.distance(&n.key), n.value.record)),
846 tx,
847 );
848
849 let closest = ctx.closest(ALPHA);
851
852 if closest.is_empty() && self.pending_find_nodes.is_empty() {
853 self.bootstrap();
858 return
859 }
860
861 trace!(target: "discv4", ?target, num = closest.len(), "Start lookup closest nodes");
862
863 for node in closest {
864 self.find_node_checked(&node, ctx.clone());
868 }
869 }
870
871 fn find_node(&mut self, node: &NodeRecord, ctx: LookupContext) {
875 trace!(target: "discv4", ?node, lookup=?ctx.target(), "Sending FindNode");
876 ctx.mark_queried(node.id);
877 let (payload, hash) = self.find_node_packet(ctx.target());
878 let to = node.udp_addr();
879 trace!(target: "discv4", ?to, ?hash, "sending FindNode packet");
880 let _ = self.egress.try_send((payload, to)).map_err(|err| {
881 debug!(target: "discv4", %err, "dropped outgoing packet");
882 });
883 self.pending_find_nodes.insert(node.id, FindNodeRequest::new(ctx));
884 }
885
886 fn find_node_checked(&mut self, node: &NodeRecord, ctx: LookupContext) {
891 let max_failures = self.config.max_find_node_failures;
892 let needs_ping = self
893 .on_entry(node.id, |entry| entry.exceeds_find_node_failures(max_failures))
894 .unwrap_or(true);
895 if needs_ping {
896 self.try_ping(*node, PingReason::Lookup(*node, ctx))
897 } else {
898 self.find_node(node, ctx)
899 }
900 }
901
902 fn notify(&mut self, update: DiscoveryUpdate) {
906 self.update_listeners.retain_mut(|listener| match listener.try_send(update.clone()) {
907 Ok(()) => true,
908 Err(err) => match err {
909 TrySendError::Full(_) => true,
910 TrySendError::Closed(_) => false,
911 },
912 });
913 }
914
915 pub fn ban_ip(&mut self, ip: IpAddr) {
917 self.config.ban_list.ban_ip(ip);
918 }
919
920 pub fn ban_node(&mut self, node_id: PeerId) {
922 self.remove_node(node_id);
923 self.config.ban_list.ban_peer(node_id);
924 }
925
926 pub fn ban_ip_until(&mut self, ip: IpAddr, until: Instant) {
928 self.config.ban_list.ban_ip_until(ip, until);
929 }
930
931 pub fn ban_node_until(&mut self, node_id: PeerId, until: Instant) {
933 self.remove_node(node_id);
934 self.config.ban_list.ban_peer_until(node_id, until);
935 }
936
937 pub fn remove_node(&mut self, node_id: PeerId) -> bool {
942 let key = kad_key(node_id);
943 self.remove_key(node_id, key)
944 }
945
946 pub fn soft_remove_node(&mut self, node_id: PeerId) -> bool {
951 let key = kad_key(node_id);
952 let Some(bucket) = self.kbuckets.get_bucket(&key) else { return false };
953 if bucket.num_entries() < MAX_NODES_PER_BUCKET / 2 {
954 return false
956 }
957 self.remove_key(node_id, key)
958 }
959
960 fn remove_key(&mut self, node_id: PeerId, key: discv5::Key<NodeKey>) -> bool {
961 let removed = self.kbuckets.remove(&key);
962 if removed {
963 trace!(target: "discv4", ?node_id, "removed node");
964 self.notify(DiscoveryUpdate::Removed(node_id));
965 }
966 removed
967 }
968
969 pub fn num_connected(&self) -> usize {
971 self.kbuckets.buckets_iter().fold(0, |count, bucket| count + bucket.num_connected())
972 }
973
974 fn has_bond(&self, remote_id: PeerId, remote_ip: IpAddr) -> bool {
976 if let Some(timestamp) = self.received_pongs.last_pong(remote_id, remote_ip) &&
977 timestamp.elapsed() < self.config.bond_expiration
978 {
979 return true
980 }
981 false
982 }
983
984 fn on_entry<F, R>(&mut self, peer_id: PeerId, f: F) -> Option<R>
986 where
987 F: FnOnce(&NodeEntry) -> R,
988 {
989 let key = kad_key(peer_id);
990 match self.kbuckets.entry(&key) {
991 BucketEntry::Present(entry, _) => Some(f(entry.value())),
992 BucketEntry::Pending(entry, _) => Some(f(entry.value())),
993 _ => None,
994 }
995 }
996
997 fn update_on_reping(&mut self, record: NodeRecord, mut last_enr_seq: Option<u64>) {
1004 if record.id == self.local_node_record.id {
1005 return
1006 }
1007
1008 if !self.config.enable_eip868 {
1010 last_enr_seq = None;
1011 }
1012
1013 let key = kad_key(record.id);
1014 let old_enr = match self.kbuckets.entry(&key) {
1015 kbucket::Entry::Present(mut entry, _) => {
1016 entry.value_mut().update_with_enr(last_enr_seq)
1017 }
1018 kbucket::Entry::Pending(mut entry, _) => {
1019 entry.value_mut().update_with_enr(last_enr_seq)
1020 }
1021 _ => return,
1022 };
1023
1024 match (last_enr_seq, old_enr) {
1026 (Some(new), Some(old)) if new > old => {
1027 self.send_enr_request(record);
1028 }
1029 (Some(_), None) => {
1030 self.send_enr_request(record);
1032 }
1033 _ => {}
1034 };
1035 }
1036
1037 fn update_on_pong(&mut self, record: NodeRecord, mut last_enr_seq: Option<u64>) {
1039 if record.id == *self.local_peer_id() {
1040 return
1041 }
1042
1043 if !self.config.enable_eip868 {
1045 last_enr_seq = None;
1046 }
1047
1048 let has_enr_seq = last_enr_seq.is_some();
1051
1052 let key = kad_key(record.id);
1053 match self.kbuckets.entry(&key) {
1054 kbucket::Entry::Present(mut entry, old_status) => {
1055 entry.value_mut().establish_proof();
1057 entry.value_mut().update_with_enr(last_enr_seq);
1058
1059 if !old_status.is_connected() {
1060 let _ = entry.update(ConnectionState::Connected, Some(old_status.direction));
1061 trace!(target: "discv4", ?record, "added after successful endpoint proof");
1062 self.notify(DiscoveryUpdate::Added(record));
1063
1064 if has_enr_seq {
1065 self.send_enr_request(record);
1067 }
1068 }
1069 }
1070 kbucket::Entry::Pending(mut entry, mut status) => {
1071 entry.value_mut().establish_proof();
1073 entry.value_mut().update_with_enr(last_enr_seq);
1074
1075 if !status.is_connected() {
1076 status.state = ConnectionState::Connected;
1077 let _ = entry.update(status);
1078 trace!(target: "discv4", ?record, "added after successful endpoint proof");
1079 self.notify(DiscoveryUpdate::Added(record));
1080
1081 if has_enr_seq {
1082 self.send_enr_request(record);
1084 }
1085 }
1086 }
1087 _ => {}
1088 };
1089 }
1090
1091 pub fn add_all_nodes(&mut self, records: impl IntoIterator<Item = NodeRecord>) {
1095 for record in records {
1096 self.add_node(record);
1097 }
1098 }
1099
1100 pub fn add_node(&mut self, record: NodeRecord) -> bool {
1106 let key = kad_key(record.id);
1107 match self.kbuckets.entry(&key) {
1108 kbucket::Entry::Absent(entry) => {
1109 let node = NodeEntry::new(record);
1110 match entry.insert(
1111 node,
1112 NodeStatus {
1113 direction: ConnectionDirection::Outgoing,
1114 state: ConnectionState::Disconnected,
1115 },
1116 ) {
1117 BucketInsertResult::Inserted | BucketInsertResult::Pending { .. } => {
1118 trace!(target: "discv4", ?record, "inserted new record");
1119 }
1120 _ => return false,
1121 }
1122 }
1123 _ => return false,
1124 }
1125
1126 self.try_ping(record, PingReason::InitialInsert);
1128 true
1129 }
1130
1131 pub(crate) fn send_packet(&self, msg: Message, to: SocketAddr) -> B256 {
1133 let (payload, hash) = msg.encode(&self.secret_key);
1134 trace!(target: "discv4", r#type=?msg.msg_type(), ?to, ?hash, "sending packet");
1135 let _ = self.egress.try_send((payload, to)).map_err(|err| {
1136 debug!(
1137 target: "discv4",
1138 %err,
1139 "dropped outgoing packet",
1140 );
1141 });
1142 hash
1143 }
1144
1145 fn find_node_packet(&mut self, target: PeerId) -> (Bytes, B256) {
1147 let expire = self.find_node_expiration();
1148 let cache_ttl = self.config.request_timeout / 4;
1149 CachedFindNode::get_or_sign(
1150 &mut self.cached_find_node,
1151 target,
1152 cache_ttl,
1153 &self.secret_key,
1154 expire,
1155 )
1156 }
1157
1158 fn on_ping(&mut self, ping: Ping, remote_addr: SocketAddr, remote_id: PeerId, hash: B256) {
1160 if self.is_expired(ping.expire) {
1161 return
1163 }
1164
1165 let record = NodeRecord {
1167 address: remote_addr.ip(),
1168 udp_port: remote_addr.port(),
1169 tcp_port: ping.from.tcp_port,
1170 id: remote_id,
1171 }
1172 .into_ipv4_mapped();
1173
1174 let key = kad_key(record.id);
1175
1176 let mut is_new_insert = false;
1183 let mut needs_bond = false;
1184 let mut is_proven = false;
1185
1186 let old_enr = match self.kbuckets.entry(&key) {
1187 kbucket::Entry::Present(mut entry, _) => {
1188 if entry.value().is_expired() {
1189 needs_bond = true;
1192 } else {
1193 is_proven = entry.value().has_endpoint_proof;
1194 }
1195 entry.value_mut().update_with_enr(ping.enr_sq)
1196 }
1197 kbucket::Entry::Pending(mut entry, _) => {
1198 if entry.value().is_expired() {
1199 needs_bond = true;
1202 } else {
1203 is_proven = entry.value().has_endpoint_proof;
1204 }
1205 entry.value_mut().update_with_enr(ping.enr_sq)
1206 }
1207 kbucket::Entry::Absent(entry) => {
1208 let mut node = NodeEntry::new(record);
1209 node.last_enr_seq = ping.enr_sq;
1210
1211 match entry.insert(
1212 node,
1213 NodeStatus {
1214 direction: ConnectionDirection::Incoming,
1215 state: ConnectionState::Disconnected,
1217 },
1218 ) {
1219 BucketInsertResult::Inserted | BucketInsertResult::Pending { .. } => {
1220 is_new_insert = true;
1222 }
1223 BucketInsertResult::Full => {
1224 trace!(target: "discv4", ?record, "discovered new record but bucket is full");
1228 self.notify(DiscoveryUpdate::DiscoveredAtCapacity(record));
1229 needs_bond = true;
1230 }
1231 BucketInsertResult::TooManyIncoming | BucketInsertResult::NodeExists => {
1232 needs_bond = true;
1233 }
1235 BucketInsertResult::FailedFilter => return,
1236 }
1237
1238 None
1239 }
1240 kbucket::Entry::SelfEntry => return,
1241 };
1242
1243 let pong = Message::Pong(Pong {
1246 to: record.into(),
1248 echo: hash,
1249 expire: ping.expire,
1250 enr_sq: self.enr_seq(),
1251 });
1252 self.send_packet(pong, remote_addr);
1253
1254 if is_new_insert {
1256 self.try_ping(record, PingReason::InitialInsert);
1257 } else if needs_bond {
1258 self.try_ping(record, PingReason::EstablishBond);
1259 } else if is_proven {
1260 if let Some((_, ctx)) = self.pending_lookup.remove(&record.id) {
1264 if self.pending_find_nodes.contains_key(&record.id) {
1265 ctx.unmark_queried(record.id);
1268 } else {
1269 self.find_node(&record, ctx);
1272 }
1273 }
1274 } else {
1275 match (ping.enr_sq, old_enr) {
1277 (Some(new), Some(old)) if new > old => {
1278 self.send_enr_request(record);
1279 }
1280 (Some(_), None) => {
1281 self.send_enr_request(record);
1282 }
1283 _ => {}
1284 };
1285 }
1286 }
1287
1288 fn try_ping(&mut self, node: NodeRecord, reason: PingReason) {
1290 if node.id == *self.local_peer_id() {
1291 return
1293 }
1294
1295 if self.pending_pings.contains_key(&node.id) ||
1296 self.pending_find_nodes.contains_key(&node.id)
1297 {
1298 return
1299 }
1300
1301 if self.queued_pings.iter().any(|(n, _)| n.id == node.id) {
1302 return
1303 }
1304
1305 if self.pending_pings.len() < MAX_NODES_PING {
1306 self.send_ping(node, reason);
1307 } else if self.queued_pings.len() < MAX_QUEUED_PINGS {
1308 self.queued_pings.push_back((node, reason));
1309 }
1310 }
1311
1312 pub(crate) fn send_ping(&mut self, node: NodeRecord, reason: PingReason) -> B256 {
1316 let remote_addr = node.udp_addr();
1317 let id = node.id;
1318 let ping = Ping {
1319 from: self.local_node_record.into(),
1320 to: node.into(),
1321 expire: self.ping_expiration(),
1322 enr_sq: self.enr_seq(),
1323 };
1324 trace!(target: "discv4", ?ping, "sending ping");
1325 let echo_hash = self.send_packet(Message::Ping(ping), remote_addr);
1326
1327 self.pending_pings
1328 .insert(id, PingRequest { sent_at: Instant::now(), node, echo_hash, reason });
1329 echo_hash
1330 }
1331
1332 pub(crate) fn send_enr_request(&mut self, node: NodeRecord) {
1336 if !self.config.enable_eip868 {
1337 return
1338 }
1339 let remote_addr = node.udp_addr();
1340 let enr_request = EnrRequest { expire: self.enr_request_expiration() };
1341
1342 trace!(target: "discv4", ?enr_request, "sending enr request");
1343 let echo_hash = self.send_packet(Message::EnrRequest(enr_request), remote_addr);
1344
1345 self.pending_enr_requests
1346 .insert(node.id, EnrRequestState { sent_at: Instant::now(), echo_hash });
1347 }
1348
1349 fn on_pong(&mut self, pong: Pong, remote_addr: SocketAddr, remote_id: PeerId) {
1351 if self.is_expired(pong.expire) {
1352 return
1353 }
1354
1355 let PingRequest { node, reason, .. } = match self.pending_pings.entry(remote_id) {
1356 Entry::Occupied(entry) => {
1357 {
1358 let request = entry.get();
1359 if request.echo_hash != pong.echo {
1360 trace!(target: "discv4", from=?remote_addr, expected=?request.echo_hash, echo_hash=?pong.echo,"Got unexpected Pong");
1361 return
1362 }
1363 }
1364 entry.remove()
1365 }
1366 Entry::Vacant(_) => return,
1367 };
1368
1369 self.received_pongs.on_pong(remote_id, remote_addr.ip());
1371
1372 match reason {
1373 PingReason::InitialInsert => {
1374 self.update_on_pong(node, pong.enr_sq);
1375 if self.pending_lookup_reset && self.config.bootstrap_nodes.contains(&node) {
1378 self.pending_lookup_reset = false;
1379 self.lookup_interval.reset();
1380 }
1381 }
1382 PingReason::EstablishBond => {
1383 self.update_on_pong(node, pong.enr_sq);
1385 }
1386 PingReason::RePing => {
1387 self.update_on_reping(node, pong.enr_sq);
1388 }
1389 PingReason::Lookup(node, ctx) => {
1390 self.update_on_pong(node, pong.enr_sq);
1391 self.pending_lookup.insert(node.id, (Instant::now(), ctx));
1396 }
1397 }
1398 }
1399
1400 fn on_find_node(&mut self, msg: FindNode, remote_addr: SocketAddr, node_id: PeerId) {
1402 if self.is_expired(msg.expire) {
1403 return
1405 }
1406 if node_id == *self.local_peer_id() {
1407 return
1409 }
1410
1411 if self.has_bond(node_id, remote_addr.ip()) {
1412 self.respond_closest(msg.id, remote_addr)
1413 }
1414 }
1415
1416 fn on_enr_response(&mut self, msg: EnrResponse, remote_addr: SocketAddr, id: PeerId) {
1418 trace!(target: "discv4", ?remote_addr, ?msg, "received ENR response");
1419 if let Some(resp) = self.pending_enr_requests.remove(&id) {
1420 let enr_id = pk2id(&msg.enr.public_key());
1422 if id != enr_id {
1423 return
1424 }
1425
1426 if resp.echo_hash == msg.request_hash {
1427 let key = kad_key(id);
1428 let fork_id = msg.eth_fork_id();
1429 let (record, old_fork_id) = match self.kbuckets.entry(&key) {
1430 kbucket::Entry::Present(mut entry, _) => {
1431 let id = entry.value_mut().update_with_fork_id(fork_id);
1432 (entry.value().record, id)
1433 }
1434 kbucket::Entry::Pending(mut entry, _) => {
1435 let id = entry.value_mut().update_with_fork_id(fork_id);
1436 (entry.value().record, id)
1437 }
1438 _ => return,
1439 };
1440 match (fork_id, old_fork_id) {
1441 (Some(new), Some(old)) if new != old => {
1442 self.notify(DiscoveryUpdate::EnrForkId(record, new))
1443 }
1444 (Some(new), None) => self.notify(DiscoveryUpdate::EnrForkId(record, new)),
1445 _ => {}
1446 }
1447 }
1448 }
1449 }
1450
1451 fn on_enr_request(
1453 &self,
1454 msg: EnrRequest,
1455 remote_addr: SocketAddr,
1456 id: PeerId,
1457 request_hash: B256,
1458 ) {
1459 if !self.config.enable_eip868 || self.is_expired(msg.expire) {
1460 return
1461 }
1462
1463 if self.has_bond(id, remote_addr.ip()) {
1464 self.send_packet(
1465 Message::EnrResponse(EnrResponse {
1466 request_hash,
1467 enr: self.local_eip_868_enr.clone(),
1468 }),
1469 remote_addr,
1470 );
1471 }
1472 }
1473
1474 fn on_neighbours(&mut self, msg: Neighbours, remote_addr: SocketAddr, node_id: PeerId) {
1477 if self.is_expired(msg.expire) {
1478 return
1480 }
1481 let ctx = match self.pending_find_nodes.entry(node_id) {
1483 Entry::Occupied(mut entry) => {
1484 {
1485 let request = entry.get_mut();
1486 request.answered = true;
1488 let total = request.response_count + msg.nodes.len();
1489
1490 if total <= MAX_NODES_PER_BUCKET {
1492 request.response_count = total;
1493 } else {
1494 trace!(target: "discv4", total, from=?remote_addr, "Received neighbors packet entries exceeds max nodes per bucket");
1495 return
1496 }
1497 };
1498
1499 if entry.get().response_count == MAX_NODES_PER_BUCKET {
1500 let ctx = entry.remove().lookup_context;
1502 ctx.mark_responded(node_id);
1503 ctx
1504 } else {
1505 entry.get().lookup_context.clone()
1506 }
1507 }
1508 Entry::Vacant(_) => {
1509 trace!(target: "discv4", from=?remote_addr, "Received unsolicited Neighbours");
1511 return
1512 }
1513 };
1514
1515 trace!(target: "discv4",
1517 target=format!("{:#?}", node_id),
1518 peers_count=msg.nodes.len(),
1519 peers=format!("[{:#}]", msg.nodes.iter()
1520 .map(|node_rec| node_rec.id
1521 ).format(", ")),
1522 "Received peers from Neighbours packet"
1523 );
1524
1525 for node in msg.nodes.into_iter().map(NodeRecord::into_ipv4_mapped) {
1528 if self.config.ban_list.is_banned(&node.id, &node.address) {
1530 trace!(target: "discv4", peer_id=?node.id, ip=?node.address, "ignoring banned record");
1531 continue
1532 }
1533
1534 ctx.add_node(node);
1535 }
1536
1537 let closest =
1539 ctx.filter_closest(ALPHA, |node| !self.pending_find_nodes.contains_key(&node.id));
1540
1541 for closest in closest {
1542 let key = kad_key(closest.id);
1543 match self.kbuckets.entry(&key) {
1544 BucketEntry::Absent(entry) => {
1545 ctx.mark_queried(closest.id);
1551 let node = NodeEntry::new(closest);
1552 match entry.insert(
1553 node,
1554 NodeStatus {
1555 direction: ConnectionDirection::Outgoing,
1556 state: ConnectionState::Disconnected,
1557 },
1558 ) {
1559 BucketInsertResult::Inserted | BucketInsertResult::Pending { .. } => {
1560 self.try_ping(closest, PingReason::Lookup(closest, ctx.clone()))
1562 }
1563 BucketInsertResult::Full => {
1564 self.notify(DiscoveryUpdate::DiscoveredAtCapacity(closest))
1566 }
1567 _ => {}
1568 }
1569 }
1570 BucketEntry::SelfEntry => {
1571 }
1573 BucketEntry::Present(entry, _) => {
1574 if entry.value().has_endpoint_proof {
1575 if entry
1576 .value()
1577 .exceeds_find_node_failures(self.config.max_find_node_failures)
1578 {
1579 self.try_ping(closest, PingReason::Lookup(closest, ctx.clone()))
1580 } else {
1581 self.find_node(&closest, ctx.clone());
1582 }
1583 }
1584 }
1585 BucketEntry::Pending(entry, _) => {
1586 if entry.value().has_endpoint_proof {
1587 if entry
1588 .value()
1589 .exceeds_find_node_failures(self.config.max_find_node_failures)
1590 {
1591 self.try_ping(closest, PingReason::Lookup(closest, ctx.clone()))
1592 } else {
1593 self.find_node(&closest, ctx.clone());
1594 }
1595 }
1596 }
1597 }
1598 }
1599 }
1600
1601 fn respond_closest(&mut self, target: PeerId, to: SocketAddr) {
1603 let key = kad_key(target);
1604 let expire = self.send_neighbours_expiration();
1605
1606 let enforce_eip868 = self.config.enable_eip868 && self.config.enforce_eip868_neighbours;
1607
1608 let closest_nodes = self
1611 .kbuckets
1612 .closest_values(&key)
1613 .filter(|entry| !enforce_eip868 || entry.value.fork_id.is_some())
1614 .take(MAX_NODES_PER_BUCKET)
1615 .collect::<Vec<_>>();
1616
1617 if closest_nodes.is_empty() {
1618 let msg = Message::Neighbours(Neighbours { nodes: Vec::new(), expire });
1620 self.send_packet(msg, to);
1621 return;
1622 }
1623
1624 for nodes in closest_nodes.chunks(SAFE_MAX_DATAGRAM_NEIGHBOUR_RECORDS) {
1625 let nodes = nodes.iter().map(|node| node.value.record).collect::<Vec<NodeRecord>>();
1626 trace!(target: "discv4", len = nodes.len(), to=?to,"Sent neighbours packet");
1627 let msg = Message::Neighbours(Neighbours { nodes, expire });
1628 self.send_packet(msg, to);
1629 }
1630 }
1631
1632 fn evict_expired_requests(&mut self, now: Instant) {
1633 self.pending_enr_requests.retain(|_node_id, enr_request| {
1634 now.duration_since(enr_request.sent_at) < self.config.enr_expiration
1635 });
1636
1637 let mut failed_pings = Vec::new();
1638 self.pending_pings.retain(|node_id, ping_request| {
1639 if now.duration_since(ping_request.sent_at) > self.config.ping_expiration {
1640 failed_pings.push(*node_id);
1641 return false
1642 }
1643 true
1644 });
1645
1646 if !failed_pings.is_empty() {
1647 trace!(target: "discv4", num=%failed_pings.len(), "evicting nodes due to failed pong");
1649 for node_id in failed_pings {
1650 self.remove_node(node_id);
1651 }
1652 }
1653
1654 let mut failed_lookups = Vec::new();
1655 self.pending_lookup.retain(|node_id, (lookup_sent_at, _)| {
1656 if now.duration_since(*lookup_sent_at) > self.config.request_timeout {
1657 failed_lookups.push(*node_id);
1658 return false
1659 }
1660 true
1661 });
1662
1663 if !failed_lookups.is_empty() {
1664 trace!(target: "discv4", num=%failed_lookups.len(), "evicting nodes due to failed lookup");
1666 for node_id in failed_lookups {
1667 self.remove_node(node_id);
1668 }
1669 }
1670
1671 self.evict_failed_find_nodes(now);
1672 }
1673
1674 fn evict_failed_find_nodes(&mut self, now: Instant) {
1676 let mut failed_find_nodes = Vec::new();
1677 self.pending_find_nodes.retain(|node_id, find_node_request| {
1678 if now.duration_since(find_node_request.sent_at) > self.config.neighbours_expiration {
1679 if !find_node_request.answered {
1680 failed_find_nodes.push(*node_id);
1683 }
1684 return false
1685 }
1686 true
1687 });
1688
1689 if failed_find_nodes.is_empty() {
1690 return
1691 }
1692
1693 trace!(target: "discv4", num=%failed_find_nodes.len(), "processing failed find nodes");
1694
1695 for node_id in failed_find_nodes {
1696 let key = kad_key(node_id);
1697 let failures = match self.kbuckets.entry(&key) {
1698 kbucket::Entry::Present(mut entry, _) => {
1699 entry.value_mut().inc_failed_request();
1700 entry.value().find_node_failures
1701 }
1702 kbucket::Entry::Pending(mut entry, _) => {
1703 entry.value_mut().inc_failed_request();
1704 entry.value().find_node_failures
1705 }
1706 _ => continue,
1707 };
1708
1709 if failures > self.config.max_find_node_failures {
1713 self.soft_remove_node(node_id);
1714 }
1715 }
1716 }
1717
1718 fn re_ping_oldest(&mut self) {
1723 let mut nodes = self
1724 .kbuckets
1725 .iter_ref()
1726 .filter(|entry| entry.node.value.is_expired())
1727 .map(|n| n.node.value)
1728 .collect::<Vec<_>>();
1729 nodes.sort_unstable_by_key(|a| a.last_seen);
1730 let to_ping = nodes.into_iter().map(|n| n.record).take(MAX_NODES_PING).collect::<Vec<_>>();
1731 for node in to_ping {
1732 self.try_ping(node, PingReason::RePing)
1733 }
1734 }
1735
1736 fn is_expired(&self, expiration: u64) -> bool {
1738 self.ensure_not_expired(expiration).is_err()
1739 }
1740
1741 fn ensure_not_expired(&self, timestamp: u64) -> Result<(), ()> {
1751 let _ = i64::try_from(timestamp).map_err(drop)?;
1753
1754 let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_secs();
1755 if self.config.enforce_expiration_timestamps && timestamp < now {
1756 trace!(target: "discv4", "Expired packet");
1757 return Err(())
1758 }
1759 Ok(())
1760 }
1761
1762 fn ping_buffered(&mut self) {
1764 while self.pending_pings.len() < MAX_NODES_PING {
1765 match self.queued_pings.pop_front() {
1766 Some((next, reason)) => self.try_ping(next, reason),
1767 None => break,
1768 }
1769 }
1770 }
1771
1772 fn ping_expiration(&self) -> u64 {
1773 (SystemTime::now().duration_since(UNIX_EPOCH).unwrap() + self.config.ping_expiration)
1774 .as_secs()
1775 }
1776
1777 fn find_node_expiration(&self) -> u64 {
1778 (SystemTime::now().duration_since(UNIX_EPOCH).unwrap() + self.config.request_timeout)
1779 .as_secs()
1780 }
1781
1782 fn enr_request_expiration(&self) -> u64 {
1783 (SystemTime::now().duration_since(UNIX_EPOCH).unwrap() + self.config.enr_expiration)
1784 .as_secs()
1785 }
1786
1787 fn send_neighbours_expiration(&self) -> u64 {
1788 (SystemTime::now().duration_since(UNIX_EPOCH).unwrap() + self.config.neighbours_expiration)
1789 .as_secs()
1790 }
1791
1792 pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<Discv4Event> {
1798 loop {
1799 if let Some(event) = self.queued_events.pop_front() {
1801 return Poll::Ready(event)
1802 }
1803
1804 if self.config.enable_lookup {
1806 while self.lookup_interval.poll_tick(cx).is_ready() {
1807 let target = self.lookup_rotator.next(&self.local_node_record.id);
1808 self.lookup_with(target, None);
1809 }
1810 }
1811
1812 while self.ping_interval.poll_tick(cx).is_ready() {
1814 self.re_ping_oldest();
1815 }
1816
1817 if let Some(Poll::Ready(Some(ip))) =
1818 self.resolve_external_ip_interval.as_mut().map(|r| r.poll_tick(cx))
1819 {
1820 self.set_external_ip_addr(ip);
1821 }
1822
1823 while let Poll::Ready(Some(cmd)) = self.commands_rx.poll_recv(cx) {
1825 match cmd {
1826 Discv4Command::Add(enr) => {
1827 self.add_node(enr);
1828 }
1829 Discv4Command::AddBootNode(record) => {
1830 self.add_boot_node(record);
1831 }
1832 Discv4Command::Lookup { node_id, tx } => {
1833 let node_id = node_id.unwrap_or(self.local_node_record.id);
1834 self.lookup_with(node_id, tx);
1835 }
1836 Discv4Command::SetLookupInterval(duration) => {
1837 self.set_lookup_interval(duration);
1838 }
1839 Discv4Command::Updates(tx) => {
1840 let rx = self.update_stream();
1841 let _ = tx.send(rx);
1842 }
1843 Discv4Command::BanPeer(node_id) => self.ban_node(node_id),
1844 Discv4Command::Remove(node_id) => {
1845 self.remove_node(node_id);
1846 }
1847 Discv4Command::Ban(node_id, ip) => {
1848 self.ban_node(node_id);
1849 self.ban_ip(ip);
1850 }
1851 Discv4Command::BanIp(ip) => {
1852 self.ban_ip(ip);
1853 }
1854 Discv4Command::SetEIP868RLPPair { key, rlp } => {
1855 debug!(target: "discv4", key=%String::from_utf8_lossy(&key), "Update EIP-868 extension pair");
1856
1857 let _ = self.local_eip_868_enr.insert_raw_rlp(key, rlp, &self.secret_key);
1858 }
1859 Discv4Command::SetTcpPort(port) => {
1860 debug!(target: "discv4", %port, "Update tcp port");
1861 self.local_node_record.tcp_port = port;
1862 if self.local_node_record.address.is_ipv4() {
1863 let _ = self.local_eip_868_enr.set_tcp4(port, &self.secret_key);
1864 } else {
1865 let _ = self.local_eip_868_enr.set_tcp6(port, &self.secret_key);
1866 }
1867 }
1868
1869 Discv4Command::Terminated => {
1870 self.queued_events.push_back(Discv4Event::Terminated);
1872 }
1873 }
1874 }
1875
1876 let mut udp_message_budget = UDP_MESSAGE_POLL_LOOP_BUDGET;
1878
1879 while let Poll::Ready(Some(event)) = self.ingress.poll_recv(cx) {
1881 match event {
1882 IngressEvent::RecvError(err) => {
1883 debug!(target: "discv4", %err, "failed to read datagram");
1884 }
1885 IngressEvent::BadPacket(from, err, data) => {
1886 trace!(target: "discv4", ?from, %err, packet=?hex::encode(&data), "bad packet");
1887 }
1888 IngressEvent::Packet(remote_addr, Packet { msg, node_id, hash }) => {
1889 trace!(target: "discv4", r#type=?msg.msg_type(), from=?remote_addr,"received packet");
1890 let event = match msg {
1891 Message::Ping(ping) => {
1892 self.on_ping(ping, remote_addr, node_id, hash);
1893 Discv4Event::Ping
1894 }
1895 Message::Pong(pong) => {
1896 self.on_pong(pong, remote_addr, node_id);
1897 Discv4Event::Pong
1898 }
1899 Message::FindNode(msg) => {
1900 self.on_find_node(msg, remote_addr, node_id);
1901 Discv4Event::FindNode
1902 }
1903 Message::Neighbours(msg) => {
1904 self.on_neighbours(msg, remote_addr, node_id);
1905 Discv4Event::Neighbours
1906 }
1907 Message::EnrRequest(msg) => {
1908 self.on_enr_request(msg, remote_addr, node_id, hash);
1909 Discv4Event::EnrRequest
1910 }
1911 Message::EnrResponse(msg) => {
1912 self.on_enr_response(msg, remote_addr, node_id);
1913 Discv4Event::EnrResponse
1914 }
1915 };
1916
1917 self.queued_events.push_back(event);
1918 }
1919 }
1920
1921 udp_message_budget -= 1;
1922 if udp_message_budget < 0 {
1923 trace!(target: "discv4", budget=UDP_MESSAGE_POLL_LOOP_BUDGET, "exhausted message poll budget");
1924 if self.queued_events.is_empty() {
1925 cx.waker().wake_by_ref();
1928 }
1929 break
1930 }
1931 }
1932
1933 self.ping_buffered();
1935
1936 while self.evict_expired_requests_interval.poll_tick(cx).is_ready() {
1938 self.evict_expired_requests(Instant::now());
1939 }
1940
1941 while self.expire_interval.poll_tick(cx).is_ready() {
1943 self.received_pongs.evict_expired(Instant::now(), EXPIRE_DURATION);
1944 }
1945
1946 if self.queued_events.is_empty() {
1947 return Poll::Pending
1948 }
1949 }
1950 }
1951}
1952
1953impl Stream for Discv4Service {
1955 type Item = Discv4Event;
1956
1957 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1958 match ready!(self.get_mut().poll(cx)) {
1960 Discv4Event::Terminated => Poll::Ready(None),
1962 ev => Poll::Ready(Some(ev)),
1964 }
1965 }
1966}
1967
1968impl fmt::Debug for Discv4Service {
1969 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1970 f.debug_struct("Discv4Service")
1971 .field("local_address", &self.local_address)
1972 .field("local_peer_id", &self.local_peer_id())
1973 .field("local_node_record", &self.local_node_record)
1974 .field("queued_pings", &self.queued_pings)
1975 .field("pending_lookup", &self.pending_lookup)
1976 .field("pending_find_nodes", &self.pending_find_nodes)
1977 .field("lookup_interval", &self.lookup_interval)
1978 .finish_non_exhaustive()
1979 }
1980}
1981
1982#[derive(Debug, Eq, PartialEq)]
1986pub enum Discv4Event {
1987 Ping,
1989 Pong,
1991 FindNode,
1993 Neighbours,
1995 EnrRequest,
1997 EnrResponse,
1999 Terminated,
2001}
2002
2003pub(crate) async fn send_loop(udp: Arc<UdpSocket>, rx: EgressReceiver) {
2005 let mut stream = ReceiverStream::new(rx);
2006 while let Some((payload, to)) = stream.next().await {
2007 match udp.send_to(&payload, to).await {
2008 Ok(size) => {
2009 trace!(target: "discv4", ?to, ?size,"sent payload");
2010 }
2011 Err(err) => {
2012 debug!(target: "discv4", ?to, %err,"Failed to send datagram.");
2013 }
2014 }
2015 }
2016}
2017
2018const MAX_INCOMING_PACKETS_PER_MINUTE_BY_IP: usize = 60usize;
2020
2021pub(crate) async fn receive_loop(udp: Arc<UdpSocket>, tx: IngressSender, local_id: PeerId) {
2026 let mut handler = IngressHandler::new(tx, local_id);
2027 let mut buf = [0; MAX_PACKET_SIZE];
2028 loop {
2029 let res = udp.recv_from(&mut buf).await;
2030 match res {
2031 Err(err) => {
2032 debug!(target: "discv4", %err, "Failed to read datagram.");
2033 handler.send(IngressEvent::RecvError(err)).await;
2034 }
2035 Ok((read, remote_addr)) => {
2036 handler.handle_packet(&buf[..read], remote_addr).await;
2037 }
2038 }
2039 }
2040}
2041
2042#[derive(Debug)]
2047pub struct IngressHandler {
2048 tx: IngressSender,
2049 local_id: PeerId,
2050 tick: usize,
2051 tick_interval: Duration,
2052 cache: ReceiveCache,
2053 last_tick: Instant,
2054}
2055
2056impl IngressHandler {
2057 fn new(tx: IngressSender, local_id: PeerId) -> Self {
2058 let tick = MAX_INCOMING_PACKETS_PER_MINUTE_BY_IP / 2;
2059 Self {
2060 tx,
2061 local_id,
2062 tick,
2063 tick_interval: Duration::from_secs(tick as u64),
2064 cache: ReceiveCache::default(),
2065 last_tick: Instant::now(),
2066 }
2067 }
2068
2069 async fn send(&self, event: IngressEvent) {
2070 let _ = self.tx.send(event).await.map_err(|err| {
2071 debug!(target: "discv4", %err, "failed send incoming packet");
2072 });
2073 }
2074
2075 pub async fn handle_packet(&mut self, data: &[u8], src: SocketAddr) {
2078 if self.last_tick.elapsed() >= self.tick_interval {
2079 self.cache.tick_ips(self.tick);
2080 self.last_tick = Instant::now();
2081 }
2082
2083 if self.cache.inc_ip(src.ip()) > MAX_INCOMING_PACKETS_PER_MINUTE_BY_IP {
2085 trace!(target: "discv4", ?src, "Too many incoming packets from IP.");
2086 return
2087 }
2088
2089 let event = match Message::decode(data) {
2090 Ok(packet) => {
2091 if packet.node_id == self.local_id {
2092 debug!(target: "discv4", ?src, "Received own packet.");
2093 return
2094 }
2095
2096 if self.cache.contains_packet(packet.hash) {
2097 debug!(target: "discv4", ?src, "Received duplicate packet.");
2098 return
2099 }
2100
2101 IngressEvent::Packet(src, packet)
2102 }
2103 Err(err) => {
2104 trace!(target: "discv4", %err, "Failed to decode packet");
2105 IngressEvent::BadPacket(src, err, data.to_vec())
2106 }
2107 };
2108
2109 self.send(event).await;
2110 }
2111}
2112
2113#[derive(Debug)]
2117struct ReceiveCache {
2118 ip_messages: HashMap<IpAddr, usize>,
2124 unique_packets: schnellru::LruMap<B256, ()>,
2126}
2127
2128impl ReceiveCache {
2129 fn tick_ips(&mut self, tick: usize) {
2133 self.ip_messages.retain(|_, count| {
2134 if let Some(reset) = count.checked_sub(tick) {
2135 *count = reset;
2136 true
2137 } else {
2138 false
2139 }
2140 });
2141 }
2142
2143 fn inc_ip(&mut self, ip: IpAddr) -> usize {
2145 let ctn = self.ip_messages.entry(ip).or_default();
2146 *ctn = ctn.saturating_add(1);
2147 *ctn
2148 }
2149
2150 fn contains_packet(&mut self, hash: B256) -> bool {
2152 !self.unique_packets.insert(hash, ())
2153 }
2154}
2155
2156impl Default for ReceiveCache {
2157 fn default() -> Self {
2158 Self {
2159 ip_messages: Default::default(),
2160 unique_packets: schnellru::LruMap::new(schnellru::ByLength::new(32)),
2161 }
2162 }
2163}
2164
2165enum Discv4Command {
2167 Add(NodeRecord),
2168 AddBootNode(NodeRecord),
2169 SetTcpPort(u16),
2170 SetEIP868RLPPair { key: Vec<u8>, rlp: Bytes },
2171 Ban(PeerId, IpAddr),
2172 BanPeer(PeerId),
2173 BanIp(IpAddr),
2174 Remove(PeerId),
2175 Lookup { node_id: Option<PeerId>, tx: Option<NodeRecordSender> },
2176 SetLookupInterval(Duration),
2177 Updates(OneshotSender<ReceiverStream<DiscoveryUpdate>>),
2178 Terminated,
2179}
2180
2181#[derive(Debug)]
2183pub(crate) enum IngressEvent {
2184 RecvError(io::Error),
2186 BadPacket(SocketAddr, DecodePacketError, Vec<u8>),
2188 Packet(SocketAddr, Packet),
2190}
2191
2192#[derive(Debug)]
2194struct PingRequest {
2195 sent_at: Instant,
2197 node: NodeRecord,
2199 echo_hash: B256,
2201 reason: PingReason,
2203}
2204
2205#[derive(Debug)]
2209struct LookupTargetRotator {
2210 interval: usize,
2211 counter: usize,
2212}
2213
2214impl LookupTargetRotator {
2217 const fn local_only() -> Self {
2219 Self { interval: 1, counter: 0 }
2220 }
2221}
2222
2223impl Default for LookupTargetRotator {
2224 fn default() -> Self {
2225 Self {
2226 interval: 4,
2228 counter: 3,
2229 }
2230 }
2231}
2232
2233impl LookupTargetRotator {
2234 fn next(&mut self, local: &PeerId) -> PeerId {
2236 self.counter += 1;
2237 self.counter %= self.interval;
2238 if self.counter == 0 {
2239 return *local
2240 }
2241 PeerId::random()
2242 }
2243}
2244
2245#[derive(Clone, Debug)]
2250struct LookupContext {
2251 inner: Rc<LookupContextInner>,
2252}
2253
2254impl LookupContext {
2255 fn new(
2257 target: discv5::Key<NodeKey>,
2258 nearest_nodes: impl IntoIterator<Item = (Distance, NodeRecord)>,
2259 listener: Option<NodeRecordSender>,
2260 ) -> Self {
2261 let closest_nodes = nearest_nodes
2262 .into_iter()
2263 .map(|(distance, record)| {
2264 (distance, QueryNode { record, queried: false, responded: false })
2265 })
2266 .collect();
2267
2268 let inner = Rc::new(LookupContextInner {
2269 target,
2270 closest_nodes: RefCell::new(closest_nodes),
2271 listener,
2272 });
2273 Self { inner }
2274 }
2275
2276 fn target(&self) -> PeerId {
2278 self.inner.target.preimage().0
2279 }
2280
2281 fn closest(&self, num: usize) -> Vec<NodeRecord> {
2282 self.inner
2283 .closest_nodes
2284 .borrow()
2285 .iter()
2286 .filter(|(_, node)| !node.queried)
2287 .map(|(_, n)| n.record)
2288 .take(num)
2289 .collect()
2290 }
2291
2292 fn filter_closest<P>(&self, num: usize, filter: P) -> Vec<NodeRecord>
2294 where
2295 P: FnMut(&NodeRecord) -> bool,
2296 {
2297 self.inner
2298 .closest_nodes
2299 .borrow()
2300 .iter()
2301 .filter(|(_, node)| !node.queried)
2302 .map(|(_, n)| n.record)
2303 .filter(filter)
2304 .take(num)
2305 .collect()
2306 }
2307
2308 fn add_node(&self, record: NodeRecord) {
2310 let distance = self.inner.target.distance(&kad_key(record.id));
2311 let mut closest = self.inner.closest_nodes.borrow_mut();
2312 if let btree_map::Entry::Vacant(entry) = closest.entry(distance) {
2313 entry.insert(QueryNode { record, queried: false, responded: false });
2314 }
2315 }
2316
2317 fn set_queried(&self, id: PeerId, val: bool) {
2318 if let Some((_, node)) =
2319 self.inner.closest_nodes.borrow_mut().iter_mut().find(|(_, node)| node.record.id == id)
2320 {
2321 node.queried = val;
2322 }
2323 }
2324
2325 fn mark_queried(&self, id: PeerId) {
2327 self.set_queried(id, true)
2328 }
2329
2330 fn unmark_queried(&self, id: PeerId) {
2332 self.set_queried(id, false)
2333 }
2334
2335 fn mark_responded(&self, id: PeerId) {
2337 if let Some((_, node)) =
2338 self.inner.closest_nodes.borrow_mut().iter_mut().find(|(_, node)| node.record.id == id)
2339 {
2340 node.responded = true;
2341 }
2342 }
2343}
2344
2345unsafe impl Send for LookupContext {}
2352#[derive(Debug)]
2353struct LookupContextInner {
2354 target: discv5::Key<NodeKey>,
2356 closest_nodes: RefCell<BTreeMap<Distance, QueryNode>>,
2358 listener: Option<NodeRecordSender>,
2363}
2364
2365impl Drop for LookupContextInner {
2366 fn drop(&mut self) {
2367 if let Some(tx) = self.listener.take() {
2368 let nodes = self
2371 .closest_nodes
2372 .take()
2373 .into_values()
2374 .filter(|node| node.responded)
2375 .map(|node| node.record)
2376 .collect();
2377 let _ = tx.send(nodes);
2378 }
2379 }
2380}
2381
2382#[derive(Debug, Clone, Copy)]
2384struct QueryNode {
2385 record: NodeRecord,
2386 queried: bool,
2387 responded: bool,
2388}
2389
2390#[derive(Debug)]
2391struct FindNodeRequest {
2392 sent_at: Instant,
2394 response_count: usize,
2396 answered: bool,
2398 lookup_context: LookupContext,
2400}
2401
2402impl FindNodeRequest {
2405 fn new(resp: LookupContext) -> Self {
2406 Self { sent_at: Instant::now(), response_count: 0, answered: false, lookup_context: resp }
2407 }
2408}
2409
2410#[derive(Debug)]
2412struct CachedFindNode {
2413 target: PeerId,
2414 payload: Bytes,
2415 hash: B256,
2416 cached_at: Instant,
2417}
2418
2419impl CachedFindNode {
2420 fn get_or_sign(
2423 cache: &mut Option<Self>,
2424 target: PeerId,
2425 ttl: Duration,
2426 secret_key: &secp256k1::SecretKey,
2427 expire: u64,
2428 ) -> (Bytes, B256) {
2429 if let Some(c) = cache.as_ref() &&
2430 c.target == target &&
2431 c.cached_at.elapsed() < ttl
2432 {
2433 return (c.payload.clone(), c.hash);
2434 }
2435
2436 let msg = Message::FindNode(FindNode { id: target, expire });
2437 let (payload, hash) = msg.encode(secret_key);
2438
2439 *cache = Some(Self { target, payload: payload.clone(), hash, cached_at: Instant::now() });
2440
2441 (payload, hash)
2442 }
2443}
2444
2445#[derive(Debug)]
2446struct EnrRequestState {
2447 sent_at: Instant,
2449 echo_hash: B256,
2451}
2452
2453#[derive(Debug, Clone, Eq, PartialEq)]
2455struct NodeEntry {
2456 record: NodeRecord,
2458 last_seen: Instant,
2460 last_enr_seq: Option<u64>,
2462 fork_id: Option<ForkId>,
2464 find_node_failures: u8,
2466 has_endpoint_proof: bool,
2468}
2469
2470impl NodeEntry {
2473 fn new(record: NodeRecord) -> Self {
2475 Self {
2476 record,
2477 last_seen: Instant::now(),
2478 last_enr_seq: None,
2479 fork_id: None,
2480 find_node_failures: 0,
2481 has_endpoint_proof: false,
2482 }
2483 }
2484
2485 #[cfg(test)]
2486 fn new_proven(record: NodeRecord) -> Self {
2487 let mut node = Self::new(record);
2488 node.has_endpoint_proof = true;
2489 node
2490 }
2491
2492 const fn establish_proof(&mut self) {
2494 self.has_endpoint_proof = true;
2495 self.find_node_failures = 0;
2496 }
2497
2498 const fn exceeds_find_node_failures(&self, max_failures: u8) -> bool {
2500 self.find_node_failures >= max_failures
2501 }
2502
2503 fn update_with_enr(&mut self, last_enr_seq: Option<u64>) -> Option<u64> {
2505 self.update_now(|s| std::mem::replace(&mut s.last_enr_seq, last_enr_seq))
2506 }
2507
2508 const fn inc_failed_request(&mut self) {
2510 self.find_node_failures += 1;
2511 }
2512
2513 fn update_with_fork_id(&mut self, fork_id: Option<ForkId>) -> Option<ForkId> {
2515 self.update_now(|s| std::mem::replace(&mut s.fork_id, fork_id))
2516 }
2517
2518 fn update_now<F, R>(&mut self, f: F) -> R
2520 where
2521 F: FnOnce(&mut Self) -> R,
2522 {
2523 self.last_seen = Instant::now();
2524 f(self)
2525 }
2526}
2527
2528impl NodeEntry {
2531 fn is_expired(&self) -> bool {
2533 self.last_seen.elapsed() > (ENDPOINT_PROOF_EXPIRATION / 2)
2534 }
2535}
2536
2537#[derive(Debug)]
2539enum PingReason {
2540 InitialInsert,
2542 EstablishBond,
2544 RePing,
2546 Lookup(NodeRecord, LookupContext),
2548}
2549
2550#[derive(Debug, Clone)]
2552pub enum DiscoveryUpdate {
2553 Added(NodeRecord),
2555 DiscoveredAtCapacity(NodeRecord),
2557 EnrForkId(NodeRecord, ForkId),
2559 Removed(PeerId),
2561 Batch(Vec<Self>),
2563}
2564
2565#[cfg(test)]
2566mod tests {
2567 use super::*;
2568 use crate::test_utils::{create_discv4, create_discv4_with_config, rng_endpoint, rng_record};
2569 use alloy_primitives::hex;
2570 use alloy_rlp::{Decodable, Encodable};
2571 use rand_08::Rng;
2572 use reth_ethereum_forks::{EnrForkIdEntry, ForkHash};
2573 use reth_network_peers::mainnet_nodes;
2574 use std::future::poll_fn;
2575
2576 #[tokio::test]
2577 async fn test_configured_enr_forkid_entry() {
2578 let fork: ForkId = ForkId { hash: ForkHash([220, 233, 108, 45]), next: 0u64 };
2579 let mut disc_conf = Discv4Config::default();
2580 disc_conf.add_eip868_pair("eth", EnrForkIdEntry::from(fork));
2581 let (_discv4, service) = create_discv4_with_config(disc_conf).await;
2582 let eth = service.local_eip_868_enr.get_raw_rlp(b"eth").unwrap();
2583 let fork_entry_id = EnrForkIdEntry::decode(&mut ð[..]).unwrap();
2584
2585 let raw: [u8; 8] = [0xc7, 0xc6, 0x84, 0xdc, 0xe9, 0x6c, 0x2d, 0x80];
2586 let decoded = EnrForkIdEntry::decode(&mut &raw[..]).unwrap();
2587 let expected = EnrForkIdEntry {
2588 fork_id: ForkId { hash: ForkHash([0xdc, 0xe9, 0x6c, 0x2d]), next: 0 },
2589 };
2590 assert_eq!(expected, fork_entry_id);
2591 assert_eq!(expected, decoded);
2592 }
2593
2594 #[test]
2595 fn test_enr_forkid_entry_decode() {
2596 let raw: [u8; 8] = [0xc7, 0xc6, 0x84, 0xdc, 0xe9, 0x6c, 0x2d, 0x80];
2597 let decoded = EnrForkIdEntry::decode(&mut &raw[..]).unwrap();
2598 let expected = EnrForkIdEntry {
2599 fork_id: ForkId { hash: ForkHash([0xdc, 0xe9, 0x6c, 0x2d]), next: 0 },
2600 };
2601 assert_eq!(expected, decoded);
2602 }
2603
2604 #[test]
2605 fn test_enr_forkid_entry_encode() {
2606 let original = EnrForkIdEntry {
2607 fork_id: ForkId { hash: ForkHash([0xdc, 0xe9, 0x6c, 0x2d]), next: 0 },
2608 };
2609 let expected: [u8; 8] = [0xc7, 0xc6, 0x84, 0xdc, 0xe9, 0x6c, 0x2d, 0x80];
2610 let mut encoded = Vec::with_capacity(expected.len());
2611 original.encode(&mut encoded);
2612 assert_eq!(&expected[..], encoded.as_slice());
2613 }
2614
2615 #[test]
2616 fn test_local_rotator() {
2617 let id = PeerId::random();
2618 let mut rotator = LookupTargetRotator::local_only();
2619 assert_eq!(rotator.next(&id), id);
2620 assert_eq!(rotator.next(&id), id);
2621 }
2622
2623 #[test]
2624 fn test_rotator() {
2625 let id = PeerId::random();
2626 let mut rotator = LookupTargetRotator::default();
2627 assert_eq!(rotator.next(&id), id);
2628 assert_ne!(rotator.next(&id), id);
2629 assert_ne!(rotator.next(&id), id);
2630 assert_ne!(rotator.next(&id), id);
2631 assert_eq!(rotator.next(&id), id);
2632 }
2633
2634 #[tokio::test]
2635 async fn test_pending_ping() {
2636 let (_, mut service) = create_discv4().await;
2637
2638 let local_addr = service.local_addr();
2639
2640 let mut num_inserted = 0;
2641 loop {
2642 let node = NodeRecord::new(local_addr, PeerId::random());
2643 if service.add_node(node) {
2644 num_inserted += 1;
2645 assert!(service.pending_pings.contains_key(&node.id));
2646 assert_eq!(service.pending_pings.len(), num_inserted);
2647 if num_inserted == MAX_NODES_PING {
2648 break
2649 }
2650 }
2651 }
2652
2653 num_inserted = 0;
2655 for _ in 0..MAX_NODES_PING {
2656 let node = NodeRecord::new(local_addr, PeerId::random());
2657 if service.add_node(node) {
2658 num_inserted += 1;
2659 assert!(!service.pending_pings.contains_key(&node.id));
2660 assert_eq!(service.pending_pings.len(), MAX_NODES_PING);
2661 assert_eq!(service.queued_pings.len(), num_inserted);
2662 }
2663 }
2664 }
2665
2666 #[tokio::test(flavor = "multi_thread")]
2668 #[ignore]
2669 async fn test_mainnet_lookup() {
2670 reth_tracing::init_test_tracing();
2671 let fork_id = ForkId { hash: ForkHash(hex!("743f3d89")), next: 16191202 };
2672
2673 let all_nodes = mainnet_nodes();
2674 let config = Discv4Config::builder()
2675 .add_boot_nodes(all_nodes)
2676 .lookup_interval(Duration::from_secs(1))
2677 .add_eip868_pair("eth", fork_id)
2678 .build();
2679 let (_discv4, mut service) = create_discv4_with_config(config).await;
2680
2681 let mut updates = service.update_stream();
2682
2683 let _handle = service.spawn();
2684
2685 let mut table = HashMap::new();
2686 while let Some(update) = updates.next().await {
2687 match update {
2688 DiscoveryUpdate::EnrForkId(record, fork_id) => {
2689 println!("{record:?}, {fork_id:?}");
2690 }
2691 DiscoveryUpdate::Added(record) => {
2692 table.insert(record.id, record);
2693 }
2694 DiscoveryUpdate::Removed(id) => {
2695 table.remove(&id);
2696 }
2697 _ => {}
2698 }
2699 println!("total peers {}", table.len());
2700 }
2701 }
2702
2703 #[tokio::test]
2704 async fn test_mapped_ipv4() {
2705 reth_tracing::init_test_tracing();
2706 let mut rng = rand_08::thread_rng();
2707 let config = Discv4Config::builder().build();
2708 let (_discv4, mut service) = create_discv4_with_config(config).await;
2709
2710 let v4: Ipv4Addr = "0.0.0.0".parse().unwrap();
2711 let v6 = v4.to_ipv6_mapped();
2712 let addr: SocketAddr = (v6, DEFAULT_DISCOVERY_PORT).into();
2713
2714 let ping = Ping {
2715 from: rng_endpoint(&mut rng),
2716 to: rng_endpoint(&mut rng),
2717 expire: service.ping_expiration(),
2718 enr_sq: Some(rng.r#gen()),
2719 };
2720
2721 let id = PeerId::random();
2722 service.on_ping(ping, addr, id, B256::random());
2723
2724 let key = kad_key(id);
2725 match service.kbuckets.entry(&key) {
2726 kbucket::Entry::Present(entry, _) => {
2727 let node_addr = entry.value().record.address;
2728 assert!(node_addr.is_ipv4());
2729 assert_eq!(node_addr, IpAddr::from(v4));
2730 }
2731 _ => unreachable!(),
2732 };
2733 }
2734
2735 #[tokio::test]
2736 async fn test_respect_ping_expiration() {
2737 reth_tracing::init_test_tracing();
2738 let mut rng = rand_08::thread_rng();
2739 let config = Discv4Config::builder().build();
2740 let (_discv4, mut service) = create_discv4_with_config(config).await;
2741
2742 let v4: Ipv4Addr = "0.0.0.0".parse().unwrap();
2743 let v6 = v4.to_ipv6_mapped();
2744 let addr: SocketAddr = (v6, DEFAULT_DISCOVERY_PORT).into();
2745
2746 let ping = Ping {
2747 from: rng_endpoint(&mut rng),
2748 to: rng_endpoint(&mut rng),
2749 expire: SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs() - 1,
2750 enr_sq: Some(rng.r#gen()),
2751 };
2752
2753 let id = PeerId::random();
2754 service.on_ping(ping, addr, id, B256::random());
2755
2756 let key = kad_key(id);
2757 match service.kbuckets.entry(&key) {
2758 kbucket::Entry::Absent(_) => {}
2759 _ => unreachable!(),
2760 };
2761 }
2762
2763 #[tokio::test]
2764 async fn test_single_lookups() {
2765 reth_tracing::init_test_tracing();
2766
2767 let config = Discv4Config::builder().build();
2768 let (_discv4, mut service) = create_discv4_with_config(config.clone()).await;
2769
2770 let id = PeerId::random();
2771 let key = kad_key(id);
2772 let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), id);
2773
2774 let _ = service.kbuckets.insert_or_update(
2775 &key,
2776 NodeEntry::new_proven(record),
2777 NodeStatus {
2778 direction: ConnectionDirection::Incoming,
2779 state: ConnectionState::Connected,
2780 },
2781 );
2782
2783 service.lookup_self();
2784 assert_eq!(service.pending_find_nodes.len(), 1);
2785
2786 poll_fn(|cx| {
2787 let _ = service.poll(cx);
2788 assert_eq!(service.pending_find_nodes.len(), 1);
2789
2790 Poll::Ready(())
2791 })
2792 .await;
2793 }
2794
2795 #[tokio::test]
2796 async fn test_on_neighbours_recursive_lookup() {
2797 reth_tracing::init_test_tracing();
2798
2799 let config = Discv4Config::builder().build();
2800 let (_discv4, mut service) = create_discv4_with_config(config.clone()).await;
2801 let (_discv4, mut service2) = create_discv4_with_config(config).await;
2802
2803 let id = PeerId::random();
2804 let key = kad_key(id);
2805 let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), id);
2806
2807 let _ = service.kbuckets.insert_or_update(
2808 &key,
2809 NodeEntry::new_proven(record),
2810 NodeStatus {
2811 direction: ConnectionDirection::Incoming,
2812 state: ConnectionState::Connected,
2813 },
2814 );
2815 service.lookup_self();
2818 assert_eq!(service.pending_find_nodes.len(), 1);
2819
2820 poll_fn(|cx| {
2821 let _ = service.poll(cx);
2822 assert_eq!(service.pending_find_nodes.len(), 1);
2823
2824 Poll::Ready(())
2825 })
2826 .await;
2827
2828 let expiry = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_secs() +
2829 10000000000000;
2830 let msg = Neighbours { nodes: vec![service2.local_node_record], expire: expiry };
2831 service.on_neighbours(msg, record.tcp_addr(), id);
2832 let event = poll_fn(|cx| service2.poll(cx)).await;
2834 assert_eq!(event, Discv4Event::Ping);
2835 assert_eq!(service.pending_find_nodes.len(), 1);
2838 let event = poll_fn(|cx| service.poll(cx)).await;
2840 assert_eq!(event, Discv4Event::Pong);
2841 let event = poll_fn(|cx| service.poll(cx)).await;
2846 assert_eq!(event, Discv4Event::Ping);
2847 assert_eq!(service.pending_find_nodes.len(), 2);
2850 }
2851
2852 #[tokio::test]
2853 async fn test_no_local_in_closest() {
2854 reth_tracing::init_test_tracing();
2855
2856 let config = Discv4Config::builder().build();
2857 let (_discv4, mut service) = create_discv4_with_config(config).await;
2858
2859 let target_key = kad_key(PeerId::random());
2860
2861 let id = PeerId::random();
2862 let key = kad_key(id);
2863 let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), id);
2864
2865 let _ = service.kbuckets.insert_or_update(
2866 &key,
2867 NodeEntry::new(record),
2868 NodeStatus {
2869 direction: ConnectionDirection::Incoming,
2870 state: ConnectionState::Connected,
2871 },
2872 );
2873
2874 let closest = service
2875 .kbuckets
2876 .closest_values(&target_key)
2877 .map(|n| n.value.record)
2878 .take(MAX_NODES_PER_BUCKET)
2879 .collect::<Vec<_>>();
2880
2881 assert_eq!(closest.len(), 1);
2882 assert!(!closest.iter().any(|r| r.id == *service.local_peer_id()));
2883 }
2884
2885 #[tokio::test]
2886 async fn test_random_lookup() {
2887 reth_tracing::init_test_tracing();
2888
2889 let config = Discv4Config::builder().build();
2890 let (_discv4, mut service) = create_discv4_with_config(config).await;
2891
2892 let target = PeerId::random();
2893
2894 let id = PeerId::random();
2895 let key = kad_key(id);
2896 let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), id);
2897
2898 let _ = service.kbuckets.insert_or_update(
2899 &key,
2900 NodeEntry::new_proven(record),
2901 NodeStatus {
2902 direction: ConnectionDirection::Incoming,
2903 state: ConnectionState::Connected,
2904 },
2905 );
2906
2907 service.lookup(target);
2908 assert_eq!(service.pending_find_nodes.len(), 1);
2909
2910 let ctx = service.pending_find_nodes.values().next().unwrap().lookup_context.clone();
2911
2912 assert_eq!(ctx.target(), target);
2913 assert_eq!(ctx.inner.closest_nodes.borrow().len(), 1);
2914
2915 ctx.add_node(record);
2916 assert_eq!(ctx.inner.closest_nodes.borrow().len(), 1);
2917 }
2918
2919 #[tokio::test]
2920 async fn test_reping_on_find_node_failures() {
2921 reth_tracing::init_test_tracing();
2922
2923 let config = Discv4Config::builder().build();
2924 let (_discv4, mut service) = create_discv4_with_config(config).await;
2925
2926 let target = PeerId::random();
2927
2928 let id = PeerId::random();
2929 let key = kad_key(id);
2930 let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), id);
2931
2932 let mut entry = NodeEntry::new_proven(record);
2933 entry.find_node_failures = u8::MAX;
2934 let _ = service.kbuckets.insert_or_update(
2935 &key,
2936 entry,
2937 NodeStatus {
2938 direction: ConnectionDirection::Incoming,
2939 state: ConnectionState::Connected,
2940 },
2941 );
2942
2943 service.lookup(target);
2944 assert_eq!(service.pending_find_nodes.len(), 0);
2945 assert_eq!(service.pending_pings.len(), 1);
2946
2947 service.update_on_pong(record, None);
2948
2949 service
2950 .on_entry(record.id, |entry| {
2951 assert_eq!(entry.find_node_failures, 0);
2953 assert!(entry.has_endpoint_proof);
2954 })
2955 .unwrap();
2956 }
2957
2958 #[tokio::test]
2959 async fn test_service_commands() {
2960 reth_tracing::init_test_tracing();
2961
2962 let config = Discv4Config::builder().build();
2963 let (discv4, mut service) = create_discv4_with_config(config).await;
2964
2965 service.lookup_self();
2966
2967 let _handle = service.spawn();
2968 discv4.send_lookup_self();
2969 let _ = discv4.lookup_self().await;
2970 }
2971
2972 #[tokio::test]
2973 async fn test_requests_timeout() {
2974 reth_tracing::init_test_tracing();
2975 let fork_id = ForkId { hash: ForkHash(hex!("743f3d89")), next: 16191202 };
2976
2977 let config = Discv4Config::builder()
2978 .request_timeout(Duration::from_millis(200))
2979 .ping_expiration(Duration::from_millis(200))
2980 .lookup_neighbours_expiration(Duration::from_millis(200))
2981 .add_eip868_pair("eth", fork_id)
2982 .build();
2983 let (_disv4, mut service) = create_discv4_with_config(config).await;
2984
2985 let id = PeerId::random();
2986 let key = kad_key(id);
2987 let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), id);
2988
2989 let _ = service.kbuckets.insert_or_update(
2990 &key,
2991 NodeEntry::new_proven(record),
2992 NodeStatus {
2993 direction: ConnectionDirection::Incoming,
2994 state: ConnectionState::Connected,
2995 },
2996 );
2997
2998 service.lookup_self();
2999 assert_eq!(service.pending_find_nodes.len(), 1);
3000
3001 let ctx = service.pending_find_nodes.values().next().unwrap().lookup_context.clone();
3002
3003 service.pending_lookup.insert(record.id, (Instant::now(), ctx));
3004
3005 assert_eq!(service.pending_lookup.len(), 1);
3006
3007 let ping = Ping {
3008 from: service.local_node_record.into(),
3009 to: record.into(),
3010 expire: service.ping_expiration(),
3011 enr_sq: service.enr_seq(),
3012 };
3013 let echo_hash = service.send_packet(Message::Ping(ping), record.udp_addr());
3014 let ping_request = PingRequest {
3015 sent_at: Instant::now(),
3016 node: record,
3017 echo_hash,
3018 reason: PingReason::InitialInsert,
3019 };
3020 service.pending_pings.insert(record.id, ping_request);
3021
3022 assert_eq!(service.pending_pings.len(), 1);
3023
3024 tokio::time::sleep(Duration::from_secs(1)).await;
3025
3026 poll_fn(|cx| {
3027 let _ = service.poll(cx);
3028
3029 assert_eq!(service.pending_find_nodes.len(), 0);
3030 assert_eq!(service.pending_lookup.len(), 0);
3031 assert_eq!(service.pending_pings.len(), 0);
3032
3033 Poll::Ready(())
3034 })
3035 .await;
3036 }
3037
3038 #[tokio::test(flavor = "multi_thread")]
3040 async fn test_check_wrong_to() {
3041 reth_tracing::init_test_tracing();
3042
3043 let config = Discv4Config::builder().external_ip_resolver(None).build();
3044 let (_discv4, mut service_1) = create_discv4_with_config(config.clone()).await;
3045 let (_discv4, mut service_2) = create_discv4_with_config(config).await;
3046
3047 let mut ping = Ping {
3049 from: service_1.local_node_record.into(),
3050 to: service_2.local_node_record.into(),
3051 expire: service_1.ping_expiration(),
3052 enr_sq: service_1.enr_seq(),
3053 };
3054 ping.to.address = "192.0.2.0".parse().unwrap();
3055
3056 let echo_hash = service_1.send_packet(Message::Ping(ping), service_2.local_addr());
3057 let ping_request = PingRequest {
3058 sent_at: Instant::now(),
3059 node: service_2.local_node_record,
3060 echo_hash,
3061 reason: PingReason::InitialInsert,
3062 };
3063 service_1.pending_pings.insert(*service_2.local_peer_id(), ping_request);
3064
3065 let event = poll_fn(|cx| service_2.poll(cx)).await;
3067 assert_eq!(event, Discv4Event::Ping);
3068
3069 let event = poll_fn(|cx| service_1.poll(cx)).await;
3071 assert_eq!(event, Discv4Event::Pong);
3072 let event = poll_fn(|cx| service_1.poll(cx)).await;
3074 assert_eq!(event, Discv4Event::Ping);
3075 }
3076
3077 #[tokio::test(flavor = "multi_thread")]
3078 async fn test_check_ping_pong() {
3079 reth_tracing::init_test_tracing();
3080
3081 let config = Discv4Config::builder().external_ip_resolver(None).build();
3082 let (_discv4, mut service_1) = create_discv4_with_config(config.clone()).await;
3083 let (_discv4, mut service_2) = create_discv4_with_config(config).await;
3084
3085 service_1.add_node(service_2.local_node_record);
3087
3088 let event = poll_fn(|cx| service_2.poll(cx)).await;
3090 assert_eq!(event, Discv4Event::Ping);
3091
3092 let key1 = kad_key(*service_1.local_peer_id());
3094 match service_2.kbuckets.entry(&key1) {
3095 kbucket::Entry::Present(_entry, status) => {
3096 assert!(!status.is_connected());
3097 }
3098 _ => unreachable!(),
3099 }
3100
3101 let event = poll_fn(|cx| service_1.poll(cx)).await;
3103 assert_eq!(event, Discv4Event::Pong);
3104
3105 let key2 = kad_key(*service_2.local_peer_id());
3107 match service_1.kbuckets.entry(&key2) {
3108 kbucket::Entry::Present(_entry, status) => {
3109 assert!(status.is_connected());
3110 }
3111 _ => unreachable!(),
3112 }
3113
3114 let event = poll_fn(|cx| service_1.poll(cx)).await;
3116 assert_eq!(event, Discv4Event::Ping);
3117
3118 tokio::time::timeout(Duration::from_secs(5), async {
3122 loop {
3123 let event = poll_fn(|cx| service_2.poll(cx)).await;
3124 match event {
3125 Discv4Event::Pong => break,
3126 Discv4Event::EnrRequest | Discv4Event::FindNode => {}
3127 ev => unreachable!("{ev:?}"),
3128 }
3129 }
3130 })
3131 .await
3132 .expect("timed out waiting for Pong from service_2");
3133
3134 match service_2.kbuckets.entry(&key1) {
3136 kbucket::Entry::Present(_entry, status) => {
3137 assert!(status.is_connected());
3138 }
3139 ev => unreachable!("{ev:?}"),
3140 }
3141 }
3142
3143 #[test]
3144 fn test_insert() {
3145 let local_node_record = rng_record(&mut rand_08::thread_rng());
3146 let mut kbuckets: KBucketsTable<NodeKey, NodeEntry> = KBucketsTable::new(
3147 NodeKey::from(&local_node_record).into(),
3148 Duration::from_secs(60),
3149 MAX_NODES_PER_BUCKET,
3150 None,
3151 None,
3152 );
3153
3154 let new_record = rng_record(&mut rand_08::thread_rng());
3155 let key = kad_key(new_record.id);
3156 match kbuckets.entry(&key) {
3157 kbucket::Entry::Absent(entry) => {
3158 let node = NodeEntry::new(new_record);
3159 let _ = entry.insert(
3160 node,
3161 NodeStatus {
3162 direction: ConnectionDirection::Outgoing,
3163 state: ConnectionState::Disconnected,
3164 },
3165 );
3166 }
3167 _ => {
3168 unreachable!()
3169 }
3170 };
3171 match kbuckets.entry(&key) {
3172 kbucket::Entry::Present(_, _) => {}
3173 _ => {
3174 unreachable!()
3175 }
3176 }
3177 }
3178
3179 #[tokio::test]
3180 async fn test_bootnode_not_in_update_stream() {
3181 reth_tracing::init_test_tracing();
3182 let (_, service_1) = create_discv4().await;
3183 let peerid_1 = *service_1.local_peer_id();
3184
3185 let config = Discv4Config::builder().add_boot_node(service_1.local_node_record).build();
3186 service_1.spawn();
3187
3188 let (_, mut service_2) = create_discv4_with_config(config).await;
3189
3190 let mut updates = service_2.update_stream();
3191
3192 service_2.spawn();
3193
3194 let mut bootnode_appeared = false;
3196 let timeout = tokio::time::sleep(Duration::from_secs(1));
3197 tokio::pin!(timeout);
3198
3199 loop {
3200 tokio::select! {
3201 Some(update) = updates.next() => {
3202 if let DiscoveryUpdate::Added(record) = update
3203 && record.id == peerid_1 {
3204 bootnode_appeared = true;
3205 break;
3206 }
3207 }
3208 _ = &mut timeout => break,
3209 }
3210 }
3211
3212 assert!(bootnode_appeared, "Bootnode should appear in update stream");
3214 }
3215
3216 fn insert_proven_node(service: &mut Discv4Service, record: NodeRecord) {
3217 let key = kad_key(record.id);
3218 let _ = service.kbuckets.insert_or_update(
3219 &key,
3220 NodeEntry::new_proven(record),
3221 NodeStatus {
3222 direction: ConnectionDirection::Incoming,
3223 state: ConnectionState::Connected,
3224 },
3225 );
3226 }
3227
3228 fn insert_initial_ping(service: &mut Discv4Service, record: NodeRecord) -> B256 {
3229 let echo_hash = B256::random();
3230 service.pending_pings.insert(
3231 record.id,
3232 PingRequest {
3233 sent_at: Instant::now(),
3234 node: record,
3235 echo_hash,
3236 reason: PingReason::InitialInsert,
3237 },
3238 );
3239 echo_hash
3240 }
3241
3242 fn make_pong(service: &Discv4Service, echo_hash: B256) -> Pong {
3243 Pong {
3244 to: rng_endpoint(&mut rand_08::thread_rng()),
3245 echo: echo_hash,
3246 expire: service.ping_expiration(),
3247 enr_sq: None,
3248 }
3249 }
3250
3251 #[tokio::test]
3252 async fn test_lookup_reset_on_first_bootnode_pong() {
3253 let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), PeerId::random());
3254 let config = Discv4Config::builder().add_boot_node(record).build();
3255 let (_discv4, mut service) = create_discv4_with_config(config).await;
3256
3257 assert!(service.pending_lookup_reset);
3259
3260 insert_proven_node(&mut service, record);
3262 let echo_hash = insert_initial_ping(&mut service, record);
3263
3264 service.on_pong(make_pong(&service, echo_hash), record.udp_addr(), record.id);
3266
3267 assert!(!service.pending_lookup_reset, "flag should be consumed");
3269 }
3270
3271 #[tokio::test]
3272 async fn test_lookup_reset_fires_only_once() {
3273 let records: Vec<_> = (0..2)
3274 .map(|_| NodeRecord::new("0.0.0.0:0".parse().unwrap(), PeerId::random()))
3275 .collect();
3276 let config = Discv4Config::builder().add_boot_nodes(records.clone()).build();
3277 let (_discv4, mut service) = create_discv4_with_config(config).await;
3278
3279 for &r in &records {
3281 insert_proven_node(&mut service, r);
3282 }
3283 let hashes: Vec<_> =
3284 records.iter().map(|r| insert_initial_ping(&mut service, *r)).collect();
3285
3286 service.on_pong(make_pong(&service, hashes[0]), records[0].udp_addr(), records[0].id);
3288 assert!(!service.pending_lookup_reset);
3289
3290 service.on_pong(make_pong(&service, hashes[1]), records[1].udp_addr(), records[1].id);
3292 assert!(!service.pending_lookup_reset);
3293 }
3294
3295 #[tokio::test]
3296 async fn test_lookup_reset_not_triggered_by_non_bootnode() {
3297 let bootnode = NodeRecord::new("0.0.0.0:0".parse().unwrap(), PeerId::random());
3298 let config = Discv4Config::builder().add_boot_node(bootnode).build();
3299 let (_discv4, mut service) = create_discv4_with_config(config).await;
3300
3301 assert!(service.pending_lookup_reset);
3302
3303 let stranger = NodeRecord::new("0.0.0.0:0".parse().unwrap(), PeerId::random());
3305 insert_proven_node(&mut service, stranger);
3306 let echo_hash = insert_initial_ping(&mut service, stranger);
3307 service.on_pong(make_pong(&service, echo_hash), stranger.udp_addr(), stranger.id);
3308
3309 assert!(service.pending_lookup_reset, "flag should not be consumed by non-bootnode");
3310 }
3311
3312 #[tokio::test]
3313 async fn test_lookup_reset_disabled_when_lookup_disabled() {
3314 let config = Discv4Config::builder().enable_lookup(false).build();
3315 let (_discv4, service) = create_discv4_with_config(config).await;
3316
3317 assert!(!service.pending_lookup_reset);
3319 }
3320}