1#![doc(
20 html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
21 html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
22 issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/"
23)]
24#![cfg_attr(not(test), warn(unused_crate_dependencies))]
25#![cfg_attr(docsrs, feature(doc_cfg))]
26
27use crate::{
28 error::{DecodePacketError, Discv4Error},
29 proto::{FindNode, Message, Neighbours, Packet, Ping, Pong},
30};
31use alloy_primitives::{bytes::Bytes, hex, B256};
32use discv5::{
33 kbucket,
34 kbucket::{
35 BucketInsertResult, Distance, Entry as BucketEntry, InsertResult, KBucketsTable,
36 NodeStatus, MAX_NODES_PER_BUCKET,
37 },
38 ConnectionDirection, ConnectionState,
39};
40use enr::Enr;
41use itertools::Itertools;
42use parking_lot::Mutex;
43use proto::{EnrRequest, EnrResponse};
44use reth_ethereum_forks::ForkId;
45use reth_network_peers::{pk2id, PeerId};
46use secp256k1::SecretKey;
47use std::{
48 cell::RefCell,
49 collections::{btree_map, hash_map::Entry, BTreeMap, HashMap, VecDeque},
50 fmt,
51 future::poll_fn,
52 io,
53 net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4},
54 pin::Pin,
55 rc::Rc,
56 sync::Arc,
57 task::{ready, Context, Poll},
58 time::{Duration, Instant, SystemTime, UNIX_EPOCH},
59};
60use tokio::{
61 net::UdpSocket,
62 sync::{mpsc, mpsc::error::TrySendError, oneshot, oneshot::Sender as OneshotSender},
63 task::{JoinHandle, JoinSet},
64 time::Interval,
65};
66use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt};
67use tracing::{debug, trace};
68
69pub mod error;
70pub mod proto;
71
72mod config;
73pub use config::{Discv4Config, Discv4ConfigBuilder};
74
75mod node;
76use node::{kad_key, NodeKey};
77
78mod table;
79
80pub use reth_network_peers::NodeRecord;
82
83#[cfg(any(test, feature = "test-utils"))]
84pub mod test_utils;
85
86use crate::table::PongTable;
87use reth_net_nat::ResolveNatInterval;
88pub use reth_net_nat::{external_ip, NatResolver};
90
/// The default IP address discv4 binds to: the unspecified IPv4 address (`0.0.0.0`).
pub const DEFAULT_DISCOVERY_ADDR: IpAddr = IpAddr::V4(Ipv4Addr::UNSPECIFIED);

/// The default UDP port for discv4.
pub const DEFAULT_DISCOVERY_PORT: u16 = 30303;

/// The default socket address for discv4: the unspecified IPv4 address combined with
/// [`DEFAULT_DISCOVERY_PORT`].
pub const DEFAULT_DISCOVERY_ADDRESS: SocketAddr =
    SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, DEFAULT_DISCOVERY_PORT));

/// The maximum size of any packet, in bytes.
const MAX_PACKET_SIZE: usize = 1280;

/// The minimum size of any packet, in bytes.
///
/// NOTE(review): presumably 32-byte hash + 65-byte signature + 1-byte packet type; confirm
/// against the packet layout in `proto`.
const MIN_PACKET_SIZE: usize = 32 + 65 + 1;

/// Number of closest nodes picked per lookup round (see `lookup_with` and `on_neighbours`).
const ALPHA: usize = 3;

/// Upper bound on the number of in-flight pings (`pending_pings`); once reached, `try_ping`
/// queues new pings instead of sending them.
const MAX_NODES_PING: usize = 2 * MAX_NODES_PER_BUCKET;

/// Upper bound on the number of pings waiting in `queued_pings`; `try_ping` drops any ping
/// beyond this limit.
const MAX_QUEUED_PINGS: usize = 2 * MAX_NODES_PER_BUCKET;

/// Maximum number of node records sent in a single `Neighbours` datagram.
///
/// Evaluates to `(1280 - 109) / 91 = 12`.
///
/// NOTE(review): 109 and 91 are presumably the fixed packet overhead and the maximum encoded
/// size of one record; confirm against the encoding in `proto`.
const SAFE_MAX_DATAGRAM_NEIGHBOUR_RECORDS: usize = (MAX_PACKET_SIZE - 109) / 91;

/// Lifetime of an endpoint proof: 24 hours.
///
/// NOTE(review): not referenced in the visible part of this file; confirm where it is applied.
const ENDPOINT_PROOF_EXPIRATION: Duration = Duration::from_secs(24 * 60 * 60);

/// Period of the service's `expire_interval`: one hour.
const EXPIRE_DURATION: Duration = Duration::from_secs(60 * 60);

/// Budget for a UDP message poll loop.
///
/// NOTE(review): not referenced in the visible part of this file; presumably caps how many
/// messages are drained per poll. Confirm at the use site.
const UDP_MESSAGE_POLL_LOOP_BUDGET: i32 = 4;
151
/// Sender half for outgoing datagrams: the encoded payload and its destination address.
type EgressSender = mpsc::Sender<(Bytes, SocketAddr)>;
/// Receiver half for outgoing datagrams, handed to the send loop task.
type EgressReceiver = mpsc::Receiver<(Bytes, SocketAddr)>;

/// Sender half for events produced by the receive loop task.
pub(crate) type IngressSender = mpsc::Sender<IngressEvent>;
/// Receiver half for events produced by the receive loop task, polled by the service.
pub(crate) type IngressReceiver = mpsc::Receiver<IngressEvent>;

/// One-shot sender used to deliver the node records found by a lookup to its caller.
type NodeRecordSender = OneshotSender<Vec<NodeRecord>>;
159
/// Cloneable frontend handle to a [`Discv4Service`].
///
/// Every operation is forwarded to the service as a `Discv4Command` over an unbounded channel;
/// the handle itself holds no discovery state apart from a shared view of the local node record.
#[derive(Debug, Clone)]
pub struct Discv4 {
    /// The socket address of the service this handle was created from.
    local_addr: SocketAddr,
    /// Channel used to send commands to the service.
    to_service: mpsc::UnboundedSender<Discv4Command>,
    /// Local node record, shared with the service which overwrites it when the external IP
    /// changes.
    node_record: Arc<Mutex<NodeRecord>>,
}
177
178impl Discv4 {
179 pub async fn spawn(
183 local_address: SocketAddr,
184 local_enr: NodeRecord,
185 secret_key: SecretKey,
186 config: Discv4Config,
187 ) -> io::Result<Self> {
188 let (discv4, service) = Self::bind(local_address, local_enr, secret_key, config).await?;
189
190 service.spawn();
191
192 Ok(discv4)
193 }
194
195 #[cfg(feature = "test-utils")]
199 pub fn noop() -> Self {
200 let (to_service, _rx) = mpsc::unbounded_channel();
201 let local_addr =
202 (IpAddr::from(std::net::Ipv4Addr::UNSPECIFIED), DEFAULT_DISCOVERY_PORT).into();
203 Self {
204 local_addr,
205 to_service,
206 node_record: Arc::new(Mutex::new(NodeRecord::new(
207 "127.0.0.1:3030".parse().unwrap(),
208 PeerId::random(),
209 ))),
210 }
211 }
212
213 pub async fn bind(
245 local_address: SocketAddr,
246 mut local_node_record: NodeRecord,
247 secret_key: SecretKey,
248 config: Discv4Config,
249 ) -> io::Result<(Self, Discv4Service)> {
250 let socket = UdpSocket::bind(local_address).await?;
251 let local_addr = socket.local_addr()?;
252 local_node_record.udp_port = local_addr.port();
253 trace!(target: "discv4", ?local_addr,"opened UDP socket");
254
255 let mut service =
256 Discv4Service::new(socket, local_addr, local_node_record, secret_key, config);
257
258 service.resolve_external_ip();
260
261 let discv4 = service.handle();
262 Ok((discv4, service))
263 }
264
265 pub const fn local_addr(&self) -> SocketAddr {
267 self.local_addr
268 }
269
270 pub fn node_record(&self) -> NodeRecord {
274 *self.node_record.lock()
275 }
276
277 pub fn external_ip(&self) -> IpAddr {
279 self.node_record.lock().address
280 }
281
282 pub fn set_lookup_interval(&self, duration: Duration) {
284 self.send_to_service(Discv4Command::SetLookupInterval(duration))
285 }
286
287 pub async fn lookup_self(&self) -> Result<Vec<NodeRecord>, Discv4Error> {
302 self.lookup_node(None).await
303 }
304
305 pub async fn lookup(&self, node_id: PeerId) -> Result<Vec<NodeRecord>, Discv4Error> {
309 self.lookup_node(Some(node_id)).await
310 }
311
312 pub async fn lookup_random(&self) -> Result<Vec<NodeRecord>, Discv4Error> {
314 let target = PeerId::random();
315 self.lookup_node(Some(target)).await
316 }
317
318 pub fn send_lookup(&self, node_id: PeerId) {
320 let cmd = Discv4Command::Lookup { node_id: Some(node_id), tx: None };
321 self.send_to_service(cmd);
322 }
323
324 async fn lookup_node(&self, node_id: Option<PeerId>) -> Result<Vec<NodeRecord>, Discv4Error> {
325 let (tx, rx) = oneshot::channel();
326 let cmd = Discv4Command::Lookup { node_id, tx: Some(tx) };
327 self.to_service.send(cmd)?;
328 Ok(rx.await?)
329 }
330
331 pub fn send_lookup_self(&self) {
333 let cmd = Discv4Command::Lookup { node_id: None, tx: None };
334 self.send_to_service(cmd);
335 }
336
337 pub fn remove_peer(&self, node_id: PeerId) {
339 let cmd = Discv4Command::Remove(node_id);
340 self.send_to_service(cmd);
341 }
342
343 pub fn add_node(&self, node_record: NodeRecord) {
345 let cmd = Discv4Command::Add(node_record);
346 self.send_to_service(cmd);
347 }
348
349 pub fn add_boot_node(&self, node_record: NodeRecord) {
354 let cmd = Discv4Command::AddBootNode(node_record);
355 self.send_to_service(cmd);
356 }
357
358 pub fn ban(&self, node_id: PeerId, ip: IpAddr) {
362 let cmd = Discv4Command::Ban(node_id, ip);
363 self.send_to_service(cmd);
364 }
365
366 pub fn ban_ip(&self, ip: IpAddr) {
370 let cmd = Discv4Command::BanIp(ip);
371 self.send_to_service(cmd);
372 }
373
374 pub fn ban_node(&self, node_id: PeerId) {
378 let cmd = Discv4Command::BanPeer(node_id);
379 self.send_to_service(cmd);
380 }
381
382 pub fn set_tcp_port(&self, port: u16) {
386 let cmd = Discv4Command::SetTcpPort(port);
387 self.send_to_service(cmd);
388 }
389
390 pub fn set_eip868_rlp_pair(&self, key: Vec<u8>, rlp: Bytes) {
396 let cmd = Discv4Command::SetEIP868RLPPair { key, rlp };
397 self.send_to_service(cmd);
398 }
399
400 pub fn set_eip868_rlp(&self, key: Vec<u8>, value: impl alloy_rlp::Encodable) {
404 self.set_eip868_rlp_pair(key, Bytes::from(alloy_rlp::encode(&value)))
405 }
406
407 #[inline]
408 fn send_to_service(&self, cmd: Discv4Command) {
409 let _ = self.to_service.send(cmd).map_err(|err| {
410 debug!(
411 target: "discv4",
412 %err,
413 "channel capacity reached, dropping command",
414 )
415 });
416 }
417
418 pub async fn update_stream(&self) -> Result<ReceiverStream<DiscoveryUpdate>, Discv4Error> {
420 let (tx, rx) = oneshot::channel();
421 let cmd = Discv4Command::Updates(tx);
422 self.to_service.send(cmd)?;
423 Ok(rx.await?)
424 }
425
426 pub fn terminate(&self) {
428 self.send_to_service(Discv4Command::Terminated);
429 }
430}
431
/// Manages discv4 peer discovery over UDP.
///
/// Owns the routing table, the bookkeeping for in-flight requests and the channel ends that
/// connect it to the socket IO tasks and to [`Discv4`] handles.
#[must_use = "Stream does nothing unless polled"]
pub struct Discv4Service {
    /// Local address of the UDP socket.
    local_address: SocketAddr,
    /// The local ENR used for EIP-868, built and signed in `new`.
    local_eip_868_enr: Enr<SecretKey>,
    /// The node record of the local node.
    local_node_record: NodeRecord,
    /// Copy of the local node record that is shared with all [`Discv4`] handles.
    shared_node_record: Arc<Mutex<NodeRecord>>,
    /// Secret key used to encode outgoing packets and to sign the local ENR.
    secret_key: SecretKey,
    /// The UDP socket; clones of this `Arc` are held by the receive and send loop tasks.
    _socket: Arc<UdpSocket>,
    /// The spawned receive and send loop tasks.
    _tasks: JoinSet<()>,
    /// The routing table.
    kbuckets: KBucketsTable<NodeKey, NodeEntry>,
    /// Receiver for events produced by the receive loop task.
    ingress: IngressReceiver,
    /// Sender for outgoing datagrams consumed by the send loop task.
    egress: EgressSender,
    /// Pings that could not be sent yet because `MAX_NODES_PING` pings were already in flight.
    /// Bounded by `MAX_QUEUED_PINGS`.
    queued_pings: VecDeque<(NodeRecord, PingReason)>,
    /// Pings that were sent and for which no matching pong has been processed yet.
    pending_pings: HashMap<PeerId, PingRequest>,
    /// Lookups waiting for the remote peer to ping us: filled when a pong for a
    /// `PingReason::Lookup` ping arrives, drained when that peer's ping is handled.
    pending_lookup: HashMap<PeerId, (Instant, LookupContext)>,
    /// `FindNode` requests that were sent and are awaiting `Neighbours` responses.
    pending_find_nodes: HashMap<PeerId, FindNodeRequest>,
    /// ENR requests that were sent and are awaiting a response.
    pending_enr_requests: HashMap<PeerId, EnrRequestState>,
    /// Sender half of the command channel, cloned into every [`Discv4`] handle.
    to_service: mpsc::UnboundedSender<Discv4Command>,
    /// Receiver half of the command channel.
    commands_rx: mpsc::UnboundedReceiver<Discv4Command>,
    /// All registered subscribers for [`DiscoveryUpdate`]s.
    update_listeners: Vec<mpsc::Sender<DiscoveryUpdate>>,
    /// Interval created from `config.lookup_interval`; replaced by `set_lookup_interval`.
    lookup_interval: Interval,
    /// Chooses lookup targets; limited to the local id when `enable_dht_random_walk` is off.
    /// NOTE(review): the rotation logic is not visible here.
    lookup_rotator: LookupTargetRotator,
    /// Whether the lookup interval should be reset on the first pong from a boot node.
    /// Initialised from `config.enable_lookup`.
    pending_lookup_reset: bool,
    /// Interval ticking every `config.request_timeout`, first tick delayed by that period.
    /// NOTE(review): presumably drives eviction of timed-out requests; consumer not visible here.
    evict_expired_requests_interval: Interval,
    /// Interval ticking every `config.ping_interval`, first tick delayed by that period.
    ping_interval: Interval,
    /// Optional interval and resolver for re-resolving the external IP, taken from the config.
    resolve_external_ip_interval: Option<ResolveNatInterval>,
    /// The service configuration (ban list, boot nodes, timeouts, EIP-868 settings, ...).
    config: Discv4Config,
    /// Buffered events.
    /// NOTE(review): producers and consumers are not visible here.
    queued_events: VecDeque<Discv4Event>,
    /// Records when a valid pong was last received from a peer id / IP; backs `has_bond`.
    received_pongs: PongTable,
    /// Interval ticking every `EXPIRE_DURATION`.
    /// NOTE(review): consumer not visible here.
    expire_interval: Interval,
    /// Cache slot passed to `CachedFindNode::get_or_sign` when building `FindNode` packets.
    cached_find_node: Option<CachedFindNode>,
}
520
521impl Discv4Service {
522 pub(crate) fn new(
524 socket: UdpSocket,
525 local_address: SocketAddr,
526 local_node_record: NodeRecord,
527 secret_key: SecretKey,
528 config: Discv4Config,
529 ) -> Self {
530 let socket = Arc::new(socket);
531 let (ingress_tx, ingress_rx) = mpsc::channel(config.udp_ingress_message_buffer);
532 let (egress_tx, egress_rx) = mpsc::channel(config.udp_egress_message_buffer);
533 let mut tasks = JoinSet::<()>::new();
534
535 let udp = Arc::clone(&socket);
536 tasks.spawn(receive_loop(udp, ingress_tx, local_node_record.id));
537
538 let udp = Arc::clone(&socket);
539 tasks.spawn(send_loop(udp, egress_rx));
540
541 let kbuckets = KBucketsTable::new(
542 NodeKey::from(&local_node_record).into(),
543 Duration::from_secs(60),
544 MAX_NODES_PER_BUCKET,
545 None,
546 None,
547 );
548
549 let self_lookup_interval = tokio::time::interval(config.lookup_interval);
550
551 let ping_interval = tokio::time::interval_at(
554 tokio::time::Instant::now() + config.ping_interval,
555 config.ping_interval,
556 );
557
558 let evict_expired_requests_interval = tokio::time::interval_at(
559 tokio::time::Instant::now() + config.request_timeout,
560 config.request_timeout,
561 );
562
563 let lookup_rotator = if config.enable_dht_random_walk {
564 LookupTargetRotator::default()
565 } else {
566 LookupTargetRotator::local_only()
567 };
568
569 let local_eip_868_enr = {
571 let mut builder = Enr::builder();
572 builder.ip(local_node_record.address);
573 if local_node_record.address.is_ipv4() {
574 builder.udp4(local_node_record.udp_port);
575 builder.tcp4(local_node_record.tcp_port);
576 } else {
577 builder.udp6(local_node_record.udp_port);
578 builder.tcp6(local_node_record.tcp_port);
579 }
580
581 for (key, val) in &config.additional_eip868_rlp_pairs {
582 builder.add_value_rlp(key, val.clone());
583 }
584 builder.build(&secret_key).expect("v4 is set")
585 };
586
587 let (to_service, commands_rx) = mpsc::unbounded_channel();
588
589 let shared_node_record = Arc::new(Mutex::new(local_node_record));
590
591 Self {
592 local_address,
593 local_eip_868_enr,
594 local_node_record,
595 shared_node_record,
596 _socket: socket,
597 kbuckets,
598 secret_key,
599 _tasks: tasks,
600 ingress: ingress_rx,
601 egress: egress_tx,
602 queued_pings: VecDeque::with_capacity(MAX_QUEUED_PINGS),
603 pending_pings: Default::default(),
604 pending_lookup: Default::default(),
605 pending_find_nodes: Default::default(),
606 pending_enr_requests: Default::default(),
607 commands_rx,
608 to_service,
609 update_listeners: Vec::with_capacity(1),
610 lookup_interval: self_lookup_interval,
611 ping_interval,
612 evict_expired_requests_interval,
613 lookup_rotator,
614 pending_lookup_reset: config.enable_lookup,
615 resolve_external_ip_interval: config.resolve_external_ip_interval(),
616 config,
617 queued_events: Default::default(),
618 received_pongs: Default::default(),
619 expire_interval: tokio::time::interval(EXPIRE_DURATION),
620 cached_find_node: None,
621 }
622 }
623
624 pub fn handle(&self) -> Discv4 {
626 Discv4 {
627 local_addr: self.local_address,
628 to_service: self.to_service.clone(),
629 node_record: self.shared_node_record.clone(),
630 }
631 }
632
633 fn enr_seq(&self) -> Option<u64> {
635 self.config.enable_eip868.then(|| self.local_eip_868_enr.seq())
636 }
637
638 pub fn set_lookup_interval(&mut self, duration: Duration) {
640 self.lookup_interval = tokio::time::interval(duration);
641 }
642
643 fn resolve_external_ip(&mut self) {
647 if let Some(r) = &self.resolve_external_ip_interval &&
648 let Some(external_ip) =
649 r.resolver().clone().as_external_ip(self.local_node_record.udp_port)
650 {
651 self.set_external_ip_addr(external_ip);
652 }
653 }
654
655 pub fn set_external_ip_addr(&mut self, external_ip: IpAddr) {
658 if self.local_node_record.address != external_ip {
659 debug!(target: "discv4", ?external_ip, "Updating external ip");
660 self.local_node_record.address = external_ip;
661 let _ = self.local_eip_868_enr.set_ip(external_ip, &self.secret_key);
662 let mut lock = self.shared_node_record.lock();
663 *lock = self.local_node_record;
664 debug!(target: "discv4", enr=?self.local_eip_868_enr, "Updated local ENR");
665 }
666 }
667
668 pub const fn local_peer_id(&self) -> &PeerId {
670 &self.local_node_record.id
671 }
672
673 pub const fn local_addr(&self) -> SocketAddr {
675 self.local_address
676 }
677
678 pub const fn local_enr(&self) -> NodeRecord {
682 self.local_node_record
683 }
684
685 #[cfg(test)]
687 pub const fn local_enr_mut(&mut self) -> &mut NodeRecord {
688 &mut self.local_node_record
689 }
690
691 pub fn contains_node(&self, id: PeerId) -> bool {
693 let key = kad_key(id);
694 self.kbuckets.get_index(&key).is_some()
695 }
696
697 pub fn bootstrap(&mut self) {
709 for record in self.config.bootstrap_nodes.clone() {
710 debug!(target: "discv4", ?record, "pinging boot node");
711 let key = kad_key(record.id);
712 let entry = NodeEntry::new(record);
713
714 match self.kbuckets.insert_or_update(
716 &key,
717 entry,
718 NodeStatus {
719 state: ConnectionState::Disconnected,
720 direction: ConnectionDirection::Outgoing,
721 },
722 ) {
723 InsertResult::Failed(_) => {}
724 _ => {
725 self.try_ping(record, PingReason::InitialInsert);
726 }
727 }
728 }
729 }
730
731 pub fn add_boot_node(&mut self, record: NodeRecord) -> bool {
736 self.config.bootstrap_nodes.insert(record);
737 self.add_node(record)
738 }
739
740 pub fn spawn(mut self) -> JoinHandle<()> {
744 tokio::task::spawn(async move {
745 self.bootstrap();
746
747 while let Some(event) = self.next().await {
748 trace!(target: "discv4", ?event, "processed");
749 }
750 trace!(target: "discv4", "service terminated");
751 })
752 }
753
754 pub fn update_stream(&mut self) -> ReceiverStream<DiscoveryUpdate> {
756 let (tx, rx) = mpsc::channel(512);
757 self.update_listeners.push(tx);
758 ReceiverStream::new(rx)
759 }
760
761 pub fn lookup_self(&mut self) {
763 self.lookup(self.local_node_record.id)
764 }
765
766 pub fn lookup(&mut self, target: PeerId) {
776 self.lookup_with(target, None)
777 }
778
779 fn lookup_with(&mut self, target: PeerId, tx: Option<NodeRecordSender>) {
789 trace!(target: "discv4", ?target, "Starting lookup");
790 let target_key = kad_key(target);
791
792 let ctx = LookupContext::new(
795 target_key.clone(),
796 self.kbuckets
797 .closest_values(&target_key)
798 .filter(|node| {
799 node.value.has_endpoint_proof &&
800 !self.pending_find_nodes.contains_key(&node.key.preimage().0)
801 })
802 .take(MAX_NODES_PER_BUCKET)
803 .map(|n| (target_key.distance(&n.key), n.value.record)),
804 tx,
805 );
806
807 let closest = ctx.closest(ALPHA);
809
810 if closest.is_empty() && self.pending_find_nodes.is_empty() {
811 self.bootstrap();
816 return
817 }
818
819 trace!(target: "discv4", ?target, num = closest.len(), "Start lookup closest nodes");
820
821 for node in closest {
822 self.find_node_checked(&node, ctx.clone());
826 }
827 }
828
829 fn find_node(&mut self, node: &NodeRecord, ctx: LookupContext) {
833 trace!(target: "discv4", ?node, lookup=?ctx.target(), "Sending FindNode");
834 ctx.mark_queried(node.id);
835 let (payload, hash) = self.find_node_packet(ctx.target());
836 let to = node.udp_addr();
837 trace!(target: "discv4", ?to, ?hash, "sending FindNode packet");
838 let _ = self.egress.try_send((payload, to)).map_err(|err| {
839 debug!(target: "discv4", %err, "dropped outgoing packet");
840 });
841 self.pending_find_nodes.insert(node.id, FindNodeRequest::new(ctx));
842 }
843
844 fn find_node_checked(&mut self, node: &NodeRecord, ctx: LookupContext) {
849 let max_failures = self.config.max_find_node_failures;
850 let needs_ping = self
851 .on_entry(node.id, |entry| entry.exceeds_find_node_failures(max_failures))
852 .unwrap_or(true);
853 if needs_ping {
854 self.try_ping(*node, PingReason::Lookup(*node, ctx))
855 } else {
856 self.find_node(node, ctx)
857 }
858 }
859
860 fn notify(&mut self, update: DiscoveryUpdate) {
864 self.update_listeners.retain_mut(|listener| match listener.try_send(update.clone()) {
865 Ok(()) => true,
866 Err(err) => match err {
867 TrySendError::Full(_) => true,
868 TrySendError::Closed(_) => false,
869 },
870 });
871 }
872
873 pub fn ban_ip(&mut self, ip: IpAddr) {
875 self.config.ban_list.ban_ip(ip);
876 }
877
878 pub fn ban_node(&mut self, node_id: PeerId) {
880 self.remove_node(node_id);
881 self.config.ban_list.ban_peer(node_id);
882 }
883
884 pub fn ban_ip_until(&mut self, ip: IpAddr, until: Instant) {
886 self.config.ban_list.ban_ip_until(ip, until);
887 }
888
889 pub fn ban_node_until(&mut self, node_id: PeerId, until: Instant) {
891 self.remove_node(node_id);
892 self.config.ban_list.ban_peer_until(node_id, until);
893 }
894
895 pub fn remove_node(&mut self, node_id: PeerId) -> bool {
900 let key = kad_key(node_id);
901 self.remove_key(node_id, key)
902 }
903
904 pub fn soft_remove_node(&mut self, node_id: PeerId) -> bool {
909 let key = kad_key(node_id);
910 let Some(bucket) = self.kbuckets.get_bucket(&key) else { return false };
911 if bucket.num_entries() < MAX_NODES_PER_BUCKET / 2 {
912 return false
914 }
915 self.remove_key(node_id, key)
916 }
917
918 fn remove_key(&mut self, node_id: PeerId, key: discv5::Key<NodeKey>) -> bool {
919 let removed = self.kbuckets.remove(&key);
920 if removed {
921 trace!(target: "discv4", ?node_id, "removed node");
922 self.notify(DiscoveryUpdate::Removed(node_id));
923 }
924 removed
925 }
926
927 pub fn num_connected(&self) -> usize {
929 self.kbuckets.buckets_iter().fold(0, |count, bucket| count + bucket.num_connected())
930 }
931
932 fn has_bond(&self, remote_id: PeerId, remote_ip: IpAddr) -> bool {
934 if let Some(timestamp) = self.received_pongs.last_pong(remote_id, remote_ip) &&
935 timestamp.elapsed() < self.config.bond_expiration
936 {
937 return true
938 }
939 false
940 }
941
942 fn on_entry<F, R>(&mut self, peer_id: PeerId, f: F) -> Option<R>
944 where
945 F: FnOnce(&NodeEntry) -> R,
946 {
947 let key = kad_key(peer_id);
948 match self.kbuckets.entry(&key) {
949 BucketEntry::Present(entry, _) => Some(f(entry.value())),
950 BucketEntry::Pending(mut entry, _) => Some(f(entry.value())),
951 _ => None,
952 }
953 }
954
955 fn update_on_reping(&mut self, record: NodeRecord, mut last_enr_seq: Option<u64>) {
962 if record.id == self.local_node_record.id {
963 return
964 }
965
966 if !self.config.enable_eip868 {
968 last_enr_seq = None;
969 }
970
971 let key = kad_key(record.id);
972 let old_enr = match self.kbuckets.entry(&key) {
973 kbucket::Entry::Present(mut entry, _) => {
974 entry.value_mut().update_with_enr(last_enr_seq)
975 }
976 kbucket::Entry::Pending(mut entry, _) => entry.value().update_with_enr(last_enr_seq),
977 _ => return,
978 };
979
980 match (last_enr_seq, old_enr) {
982 (Some(new), Some(old)) if new > old => {
983 self.send_enr_request(record);
984 }
985 (Some(_), None) => {
986 self.send_enr_request(record);
988 }
989 _ => {}
990 };
991 }
992
993 fn update_on_pong(&mut self, record: NodeRecord, mut last_enr_seq: Option<u64>) {
995 if record.id == *self.local_peer_id() {
996 return
997 }
998
999 if !self.config.enable_eip868 {
1001 last_enr_seq = None;
1002 }
1003
1004 let has_enr_seq = last_enr_seq.is_some();
1007
1008 let key = kad_key(record.id);
1009 match self.kbuckets.entry(&key) {
1010 kbucket::Entry::Present(mut entry, old_status) => {
1011 entry.value_mut().establish_proof();
1013 entry.value_mut().update_with_enr(last_enr_seq);
1014
1015 if !old_status.is_connected() {
1016 let _ = entry.update(ConnectionState::Connected, Some(old_status.direction));
1017 trace!(target: "discv4", ?record, "added after successful endpoint proof");
1018 self.notify(DiscoveryUpdate::Added(record));
1019
1020 if has_enr_seq {
1021 self.send_enr_request(record);
1023 }
1024 }
1025 }
1026 kbucket::Entry::Pending(mut entry, mut status) => {
1027 entry.value().establish_proof();
1029 entry.value().update_with_enr(last_enr_seq);
1030
1031 if !status.is_connected() {
1032 status.state = ConnectionState::Connected;
1033 let _ = entry.update(status);
1034 trace!(target: "discv4", ?record, "added after successful endpoint proof");
1035 self.notify(DiscoveryUpdate::Added(record));
1036
1037 if has_enr_seq {
1038 self.send_enr_request(record);
1040 }
1041 }
1042 }
1043 _ => {}
1044 };
1045 }
1046
1047 pub fn add_all_nodes(&mut self, records: impl IntoIterator<Item = NodeRecord>) {
1051 for record in records {
1052 self.add_node(record);
1053 }
1054 }
1055
1056 pub fn add_node(&mut self, record: NodeRecord) -> bool {
1062 let key = kad_key(record.id);
1063 match self.kbuckets.entry(&key) {
1064 kbucket::Entry::Absent(entry) => {
1065 let node = NodeEntry::new(record);
1066 match entry.insert(
1067 node,
1068 NodeStatus {
1069 direction: ConnectionDirection::Outgoing,
1070 state: ConnectionState::Disconnected,
1071 },
1072 ) {
1073 BucketInsertResult::Inserted | BucketInsertResult::Pending { .. } => {
1074 trace!(target: "discv4", ?record, "inserted new record");
1075 }
1076 _ => return false,
1077 }
1078 }
1079 _ => return false,
1080 }
1081
1082 self.try_ping(record, PingReason::InitialInsert);
1084 true
1085 }
1086
1087 pub(crate) fn send_packet(&self, msg: Message, to: SocketAddr) -> B256 {
1089 let (payload, hash) = msg.encode(&self.secret_key);
1090 trace!(target: "discv4", r#type=?msg.msg_type(), ?to, ?hash, "sending packet");
1091 let _ = self.egress.try_send((payload, to)).map_err(|err| {
1092 debug!(
1093 target: "discv4",
1094 %err,
1095 "dropped outgoing packet",
1096 );
1097 });
1098 hash
1099 }
1100
1101 fn find_node_packet(&mut self, target: PeerId) -> (Bytes, B256) {
1103 let expire = self.find_node_expiration();
1104 let cache_ttl = self.config.request_timeout / 4;
1105 CachedFindNode::get_or_sign(
1106 &mut self.cached_find_node,
1107 target,
1108 cache_ttl,
1109 &self.secret_key,
1110 expire,
1111 )
1112 }
1113
    /// Handles an incoming `Ping`.
    ///
    /// Expired pings are ignored. Otherwise the sender is looked up in, or inserted into, the
    /// routing table, a `Pong` echoing `hash` is sent back, and exactly one follow-up is chosen:
    ///
    /// - newly inserted node: ping it with `PingReason::InitialInsert`
    /// - expired entry, or insertion rejected (bucket full, too many incoming, node exists):
    ///   ping it with `PingReason::EstablishBond`
    /// - entry with an endpoint proof: resume a lookup that was waiting for this ping, if any
    /// - otherwise: request the sender's ENR if the ping announced a newer sequence number
    ///
    /// Nothing is sent when the sender maps to the local entry or the table's filter rejects it.
    fn on_ping(&mut self, ping: Ping, remote_addr: SocketAddr, remote_id: PeerId, hash: B256) {
        if self.is_expired(ping.expire) {
            // the ping's expiration timestamp is in the past
            return
        }

        // Build the record from the address the packet actually came from; only the TCP port is
        // taken from the ping's payload.
        let record = NodeRecord {
            address: remote_addr.ip(),
            udp_port: remote_addr.port(),
            tcp_port: ping.from.tcp_port,
            id: remote_id,
        }
        .into_ipv4_mapped();

        let key = kad_key(record.id);

        // Follow-up flags, at most one of which ends up driving the action after the pong:
        // the node was inserted by this ping
        let mut is_new_insert = false;
        // we need to ping the node to (re-)establish a bond
        let mut needs_bond = false;
        // the node has a non-expired entry with an endpoint proof
        let mut is_proven = false;

        // Update or create the table entry. `old_enr` is the value returned by
        // `update_with_enr`, compared against the ping's sequence number further below.
        let old_enr = match self.kbuckets.entry(&key) {
            kbucket::Entry::Present(mut entry, _) => {
                if entry.value().is_expired() {
                    // the entry is stale, renew the bond with a ping of our own
                    needs_bond = true;
                } else {
                    is_proven = entry.value().has_endpoint_proof;
                }
                entry.value_mut().update_with_enr(ping.enr_sq)
            }
            kbucket::Entry::Pending(mut entry, _) => {
                if entry.value().is_expired() {
                    // the entry is stale, renew the bond with a ping of our own
                    needs_bond = true;
                } else {
                    is_proven = entry.value().has_endpoint_proof;
                }
                entry.value().update_with_enr(ping.enr_sq)
            }
            kbucket::Entry::Absent(entry) => {
                let mut node = NodeEntry::new(record);
                node.last_enr_seq = ping.enr_sq;

                match entry.insert(
                    node,
                    NodeStatus {
                        direction: ConnectionDirection::Incoming,
                        // stays disconnected until a pong proves the endpoint
                        state: ConnectionState::Disconnected,
                    },
                ) {
                    BucketInsertResult::Inserted | BucketInsertResult::Pending { .. } => {
                        // the table accepted the node
                        is_new_insert = true;
                    }
                    BucketInsertResult::Full => {
                        // the bucket has no room for the node, but listeners are still told
                        // that it was discovered
                        trace!(target: "discv4", ?record, "discovered new record but bucket is full");
                        self.notify(DiscoveryUpdate::DiscoveredAtCapacity(record));
                        needs_bond = true;
                    }
                    BucketInsertResult::TooManyIncoming | BucketInsertResult::NodeExists => {
                        // not inserted, but the pong is still sent below
                        needs_bond = true;
                    }
                    // rejected by the table's filter: do not respond at all
                    BucketInsertResult::FailedFilter => return,
                }

                None
            }
            // the key maps to the local node: ignore
            kbucket::Entry::SelfEntry => return,
        };

        // Answer with a pong before sending any follow-up packet.
        let pong = Message::Pong(Pong {
            // the address we observed for the peer
            to: record.into(),
            // hash of the ping packet this pong answers
            echo: hash,
            expire: ping.expire,
            enr_sq: self.enr_seq(),
        });
        self.send_packet(pong, remote_addr);

        if is_new_insert {
            // ping the new node so that we obtain an endpoint proof from our side
            self.try_ping(record, PingReason::InitialInsert);
        } else if needs_bond {
            self.try_ping(record, PingReason::EstablishBond);
        } else if is_proven {
            // The node is proven and we have just answered its ping, so a lookup that was
            // parked in `pending_lookup` for this node can continue.
            if let Some((_, ctx)) = self.pending_lookup.remove(&record.id) {
                if self.pending_find_nodes.contains_key(&record.id) {
                    // a request to this node is already in flight: unmark it so that a later
                    // round may select it again
                    ctx.unmark_queried(record.id);
                } else {
                    // send the find node request right away
                    self.find_node(&record, ctx);
                }
            }
        } else {
            // Request the ENR if the ping announced a sequence number that is newer than the
            // previous one, or if there was no previous one.
            match (ping.enr_sq, old_enr) {
                (Some(new), Some(old)) if new > old => {
                    self.send_enr_request(record);
                }
                (Some(_), None) => {
                    self.send_enr_request(record);
                }
                _ => {}
            };
        }
    }
1243
1244 fn try_ping(&mut self, node: NodeRecord, reason: PingReason) {
1246 if node.id == *self.local_peer_id() {
1247 return
1249 }
1250
1251 if self.pending_pings.contains_key(&node.id) ||
1252 self.pending_find_nodes.contains_key(&node.id)
1253 {
1254 return
1255 }
1256
1257 if self.queued_pings.iter().any(|(n, _)| n.id == node.id) {
1258 return
1259 }
1260
1261 if self.pending_pings.len() < MAX_NODES_PING {
1262 self.send_ping(node, reason);
1263 } else if self.queued_pings.len() < MAX_QUEUED_PINGS {
1264 self.queued_pings.push_back((node, reason));
1265 }
1266 }
1267
1268 pub(crate) fn send_ping(&mut self, node: NodeRecord, reason: PingReason) -> B256 {
1272 let remote_addr = node.udp_addr();
1273 let id = node.id;
1274 let ping = Ping {
1275 from: self.local_node_record.into(),
1276 to: node.into(),
1277 expire: self.ping_expiration(),
1278 enr_sq: self.enr_seq(),
1279 };
1280 trace!(target: "discv4", ?ping, "sending ping");
1281 let echo_hash = self.send_packet(Message::Ping(ping), remote_addr);
1282
1283 self.pending_pings
1284 .insert(id, PingRequest { sent_at: Instant::now(), node, echo_hash, reason });
1285 echo_hash
1286 }
1287
1288 pub(crate) fn send_enr_request(&mut self, node: NodeRecord) {
1292 if !self.config.enable_eip868 {
1293 return
1294 }
1295 let remote_addr = node.udp_addr();
1296 let enr_request = EnrRequest { expire: self.enr_request_expiration() };
1297
1298 trace!(target: "discv4", ?enr_request, "sending enr request");
1299 let echo_hash = self.send_packet(Message::EnrRequest(enr_request), remote_addr);
1300
1301 self.pending_enr_requests
1302 .insert(node.id, EnrRequestState { sent_at: Instant::now(), echo_hash });
1303 }
1304
1305 fn on_pong(&mut self, pong: Pong, remote_addr: SocketAddr, remote_id: PeerId) {
1307 if self.is_expired(pong.expire) {
1308 return
1309 }
1310
1311 let PingRequest { node, reason, .. } = match self.pending_pings.entry(remote_id) {
1312 Entry::Occupied(entry) => {
1313 {
1314 let request = entry.get();
1315 if request.echo_hash != pong.echo {
1316 trace!(target: "discv4", from=?remote_addr, expected=?request.echo_hash, echo_hash=?pong.echo,"Got unexpected Pong");
1317 return
1318 }
1319 }
1320 entry.remove()
1321 }
1322 Entry::Vacant(_) => return,
1323 };
1324
1325 self.received_pongs.on_pong(remote_id, remote_addr.ip());
1327
1328 match reason {
1329 PingReason::InitialInsert => {
1330 self.update_on_pong(node, pong.enr_sq);
1331 if self.pending_lookup_reset && self.config.bootstrap_nodes.contains(&node) {
1334 self.pending_lookup_reset = false;
1335 self.lookup_interval.reset();
1336 }
1337 }
1338 PingReason::EstablishBond => {
1339 self.update_on_pong(node, pong.enr_sq);
1341 }
1342 PingReason::RePing => {
1343 self.update_on_reping(node, pong.enr_sq);
1344 }
1345 PingReason::Lookup(node, ctx) => {
1346 self.update_on_pong(node, pong.enr_sq);
1347 self.pending_lookup.insert(node.id, (Instant::now(), ctx));
1352 }
1353 }
1354 }
1355
1356 fn on_find_node(&mut self, msg: FindNode, remote_addr: SocketAddr, node_id: PeerId) {
1358 if self.is_expired(msg.expire) {
1359 return
1361 }
1362 if node_id == *self.local_peer_id() {
1363 return
1365 }
1366
1367 if self.has_bond(node_id, remote_addr.ip()) {
1368 self.respond_closest(msg.id, remote_addr)
1369 }
1370 }
1371
1372 fn on_enr_response(&mut self, msg: EnrResponse, remote_addr: SocketAddr, id: PeerId) {
1374 trace!(target: "discv4", ?remote_addr, ?msg, "received ENR response");
1375 if let Some(resp) = self.pending_enr_requests.remove(&id) {
1376 let enr_id = pk2id(&msg.enr.public_key());
1378 if id != enr_id {
1379 return
1380 }
1381
1382 if resp.echo_hash == msg.request_hash {
1383 let key = kad_key(id);
1384 let fork_id = msg.eth_fork_id();
1385 let (record, old_fork_id) = match self.kbuckets.entry(&key) {
1386 kbucket::Entry::Present(mut entry, _) => {
1387 let id = entry.value_mut().update_with_fork_id(fork_id);
1388 (entry.value().record, id)
1389 }
1390 kbucket::Entry::Pending(mut entry, _) => {
1391 let id = entry.value().update_with_fork_id(fork_id);
1392 (entry.value().record, id)
1393 }
1394 _ => return,
1395 };
1396 match (fork_id, old_fork_id) {
1397 (Some(new), Some(old)) if new != old => {
1398 self.notify(DiscoveryUpdate::EnrForkId(record, new))
1399 }
1400 (Some(new), None) => self.notify(DiscoveryUpdate::EnrForkId(record, new)),
1401 _ => {}
1402 }
1403 }
1404 }
1405 }
1406
1407 fn on_enr_request(
1409 &self,
1410 msg: EnrRequest,
1411 remote_addr: SocketAddr,
1412 id: PeerId,
1413 request_hash: B256,
1414 ) {
1415 if !self.config.enable_eip868 || self.is_expired(msg.expire) {
1416 return
1417 }
1418
1419 if self.has_bond(id, remote_addr.ip()) {
1420 self.send_packet(
1421 Message::EnrResponse(EnrResponse {
1422 request_hash,
1423 enr: self.local_eip_868_enr.clone(),
1424 }),
1425 remote_addr,
1426 );
1427 }
1428 }
1429
    /// Handles an incoming `Neighbours` response.
    ///
    /// The response is accepted only if it has not expired and a `FindNode` request to `node_id`
    /// is pending. A response may arrive in several packets: the number of records is
    /// accumulated per request, and a packet that would push the total above
    /// `MAX_NODES_PER_BUCKET` is ignored. Once exactly `MAX_NODES_PER_BUCKET` records have
    /// arrived, the request is removed and the node is marked as having responded.
    ///
    /// The non-banned records are added to the lookup context. Then up to `ALPHA` of the closest
    /// nodes without a pending `FindNode` request are contacted: unknown nodes are inserted into
    /// the table and pinged first; known nodes with an endpoint proof are queried directly, or
    /// pinged when they exceeded the configured `FindNode` failure limit.
    fn on_neighbours(&mut self, msg: Neighbours, remote_addr: SocketAddr, node_id: PeerId) {
        if self.is_expired(msg.expire) {
            // the response's expiration timestamp is in the past
            return
        }
        // Only accept the response if we actually asked this node.
        let ctx = match self.pending_find_nodes.entry(node_id) {
            Entry::Occupied(mut entry) => {
                {
                    let request = entry.get_mut();
                    // the node has answered, even if the response is incomplete or rejected
                    request.answered = true;
                    let total = request.response_count + msg.nodes.len();

                    // a complete response holds at most one bucket worth of records
                    if total <= MAX_NODES_PER_BUCKET {
                        request.response_count = total;
                    } else {
                        trace!(target: "discv4", total, from=?remote_addr, "Received neighbors packet entries exceeds max nodes per bucket");
                        return
                    }
                };

                if entry.get().response_count == MAX_NODES_PER_BUCKET {
                    // the response is complete: the request is no longer pending
                    let ctx = entry.remove().lookup_context;
                    ctx.mark_responded(node_id);
                    ctx
                } else {
                    // more packets may follow, keep the request pending
                    entry.get().lookup_context.clone()
                }
            }
            Entry::Vacant(_) => {
                // we never sent a request to this node
                trace!(target: "discv4", from=?remote_addr, "Received unsolicited Neighbours");
                return
            }
        };

        // log the peers that were received
        trace!(target: "discv4",
            target=format!("{:#?}", node_id),
            peers_count=msg.nodes.len(),
            peers=format!("[{:#}]", msg.nodes.iter()
                .map(|node_rec| node_rec.id
            ).format(", ")),
            "Received peers from Neighbours packet"
        );

        // Feed the received records into the lookup context.
        for node in msg.nodes.into_iter().map(NodeRecord::into_ipv4_mapped) {
            // banned peers never enter the lookup
            if self.config.ban_list.is_banned(&node.id, &node.address) {
                trace!(target: "discv4", peer_id=?node.id, ip=?node.address, "ignoring banned record");
                continue
            }

            ctx.add_node(node);
        }

        // Pick the next nodes to contact, skipping those with a request already in flight.
        let closest =
            ctx.filter_closest(ALPHA, |node| !self.pending_find_nodes.contains_key(&node.id));

        for closest in closest {
            let key = kad_key(closest.id);
            match self.kbuckets.entry(&key) {
                BucketEntry::Absent(entry) => {
                    // The node is unknown, so it has no endpoint proof yet and must be pinged
                    // before it can be queried. It is marked as queried right away so that it is
                    // not selected again while that ping is outstanding.
                    ctx.mark_queried(closest.id);
                    let node = NodeEntry::new(closest);
                    match entry.insert(
                        node,
                        NodeStatus {
                            direction: ConnectionDirection::Outgoing,
                            state: ConnectionState::Disconnected,
                        },
                    ) {
                        BucketInsertResult::Inserted | BucketInsertResult::Pending { .. } => {
                            // only ping nodes that made it into the table
                            self.try_ping(closest, PingReason::Lookup(closest, ctx.clone()))
                        }
                        BucketInsertResult::Full => {
                            // no room in the bucket, but listeners are still told about the node
                            self.notify(DiscoveryUpdate::DiscoveredAtCapacity(closest))
                        }
                        _ => {}
                    }
                }
                BucketEntry::SelfEntry => {
                    // the record maps to the local node: nothing to do
                }
                BucketEntry::Present(entry, _) => {
                    // nodes without an endpoint proof are skipped
                    if entry.value().has_endpoint_proof {
                        if entry
                            .value()
                            .exceeds_find_node_failures(self.config.max_find_node_failures)
                        {
                            // too many failed requests: ping first instead of querying
                            self.try_ping(closest, PingReason::Lookup(closest, ctx.clone()))
                        } else {
                            self.find_node(&closest, ctx.clone());
                        }
                    }
                }
                BucketEntry::Pending(mut entry, _) => {
                    // nodes without an endpoint proof are skipped
                    if entry.value().has_endpoint_proof {
                        if entry
                            .value()
                            .exceeds_find_node_failures(self.config.max_find_node_failures)
                        {
                            // too many failed requests: ping first instead of querying
                            self.try_ping(closest, PingReason::Lookup(closest, ctx.clone()))
                        } else {
                            self.find_node(&closest, ctx.clone());
                        }
                    }
                }
            }
        }
    }
1556
1557 fn respond_closest(&mut self, target: PeerId, to: SocketAddr) {
1559 let key = kad_key(target);
1560 let expire = self.send_neighbours_expiration();
1561
1562 let closest_nodes =
1564 self.kbuckets.closest_values(&key).take(MAX_NODES_PER_BUCKET).collect::<Vec<_>>();
1565
1566 for nodes in closest_nodes.chunks(SAFE_MAX_DATAGRAM_NEIGHBOUR_RECORDS) {
1567 let nodes = nodes.iter().map(|node| node.value.record).collect::<Vec<NodeRecord>>();
1568 trace!(target: "discv4", len = nodes.len(), to=?to,"Sent neighbours packet");
1569 let msg = Message::Neighbours(Neighbours { nodes, expire });
1570 self.send_packet(msg, to);
1571 }
1572 }
1573
1574 fn evict_expired_requests(&mut self, now: Instant) {
1575 self.pending_enr_requests.retain(|_node_id, enr_request| {
1576 now.duration_since(enr_request.sent_at) < self.config.enr_expiration
1577 });
1578
1579 let mut failed_pings = Vec::new();
1580 self.pending_pings.retain(|node_id, ping_request| {
1581 if now.duration_since(ping_request.sent_at) > self.config.ping_expiration {
1582 failed_pings.push(*node_id);
1583 return false
1584 }
1585 true
1586 });
1587
1588 if !failed_pings.is_empty() {
1589 trace!(target: "discv4", num=%failed_pings.len(), "evicting nodes due to failed pong");
1591 for node_id in failed_pings {
1592 self.remove_node(node_id);
1593 }
1594 }
1595
1596 let mut failed_lookups = Vec::new();
1597 self.pending_lookup.retain(|node_id, (lookup_sent_at, _)| {
1598 if now.duration_since(*lookup_sent_at) > self.config.request_timeout {
1599 failed_lookups.push(*node_id);
1600 return false
1601 }
1602 true
1603 });
1604
1605 if !failed_lookups.is_empty() {
1606 trace!(target: "discv4", num=%failed_lookups.len(), "evicting nodes due to failed lookup");
1608 for node_id in failed_lookups {
1609 self.remove_node(node_id);
1610 }
1611 }
1612
1613 self.evict_failed_find_nodes(now);
1614 }
1615
    /// Drops `FindNode` requests older than `config.neighbours_expiration` and records a failure
    /// for every node that did not answer at all.
    ///
    /// Nodes whose failure count exceeds `config.max_find_node_failures` are passed to
    /// `soft_remove_node`.
    fn evict_failed_find_nodes(&mut self, now: Instant) {
        let mut failed_find_nodes = Vec::new();
        self.pending_find_nodes.retain(|node_id, find_node_request| {
            if now.duration_since(find_node_request.sent_at) > self.config.neighbours_expiration {
                if !find_node_request.answered {
                    // only requests that were never answered count as failures; an overdue
                    // request that received a (partial) response is dropped without penalty
                    failed_find_nodes.push(*node_id);
                }
                return false
            }
            true
        });

        if failed_find_nodes.is_empty() {
            return
        }

        trace!(target: "discv4", num=%failed_find_nodes.len(), "processing failed find nodes");

        for node_id in failed_find_nodes {
            let key = kad_key(node_id);
            // bump the failure counter of the node's table entry; nodes that are no longer in
            // the table are skipped
            let failures = match self.kbuckets.entry(&key) {
                kbucket::Entry::Present(mut entry, _) => {
                    entry.value_mut().inc_failed_request();
                    entry.value().find_node_failures
                }
                kbucket::Entry::Pending(mut entry, _) => {
                    // NOTE(review): `value()` of a pending entry is used for mutation here,
                    // presumably it hands out a mutable reference - confirm against discv5
                    entry.value().inc_failed_request();
                    entry.value().find_node_failures
                }
                _ => continue,
            };

            // too many unanswered requests
            if failures > self.config.max_find_node_failures {
                self.soft_remove_node(node_id);
            }
        }
    }
1659
1660 fn re_ping_oldest(&mut self) {
1665 let mut nodes = self
1666 .kbuckets
1667 .iter_ref()
1668 .filter(|entry| entry.node.value.is_expired())
1669 .map(|n| n.node.value)
1670 .collect::<Vec<_>>();
1671 nodes.sort_unstable_by_key(|a| a.last_seen);
1672 let to_ping = nodes.into_iter().map(|n| n.record).take(MAX_NODES_PING).collect::<Vec<_>>();
1673 for node in to_ping {
1674 self.try_ping(node, PingReason::RePing)
1675 }
1676 }
1677
1678 fn is_expired(&self, expiration: u64) -> bool {
1680 self.ensure_not_expired(expiration).is_err()
1681 }
1682
1683 fn ensure_not_expired(&self, timestamp: u64) -> Result<(), ()> {
1693 let _ = i64::try_from(timestamp).map_err(drop)?;
1695
1696 let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_secs();
1697 if self.config.enforce_expiration_timestamps && timestamp < now {
1698 trace!(target: "discv4", "Expired packet");
1699 return Err(())
1700 }
1701 Ok(())
1702 }
1703
1704 fn ping_buffered(&mut self) {
1706 while self.pending_pings.len() < MAX_NODES_PING {
1707 match self.queued_pings.pop_front() {
1708 Some((next, reason)) => self.try_ping(next, reason),
1709 None => break,
1710 }
1711 }
1712 }
1713
1714 fn ping_expiration(&self) -> u64 {
1715 (SystemTime::now().duration_since(UNIX_EPOCH).unwrap() + self.config.ping_expiration)
1716 .as_secs()
1717 }
1718
1719 fn find_node_expiration(&self) -> u64 {
1720 (SystemTime::now().duration_since(UNIX_EPOCH).unwrap() + self.config.request_timeout)
1721 .as_secs()
1722 }
1723
1724 fn enr_request_expiration(&self) -> u64 {
1725 (SystemTime::now().duration_since(UNIX_EPOCH).unwrap() + self.config.enr_expiration)
1726 .as_secs()
1727 }
1728
1729 fn send_neighbours_expiration(&self) -> u64 {
1730 (SystemTime::now().duration_since(UNIX_EPOCH).unwrap() + self.config.neighbours_expiration)
1731 .as_secs()
1732 }
1733
    /// Drives the service: processes timers, commands and incoming datagrams.
    ///
    /// Returns `Poll::Ready` with the next queued [`Discv4Event`] if there is one, and
    /// `Poll::Pending` once a full pass over all sources left the event queue empty.
    pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<Discv4Event> {
        loop {
            // hand out already buffered events before doing any new work
            if let Some(event) = self.queued_events.pop_front() {
                return Poll::Ready(event)
            }

            // start a lookup on every tick of the lookup interval, if lookups are enabled
            if self.config.enable_lookup {
                while self.lookup_interval.poll_tick(cx).is_ready() {
                    let target = self.lookup_rotator.next(&self.local_node_record.id);
                    self.lookup_with(target, None);
                }
            }

            // re-ping the oldest nodes on every tick of the ping interval
            while self.ping_interval.poll_tick(cx).is_ready() {
                self.re_ping_oldest();
            }

            // apply a newly resolved external IP, if a resolver interval is configured
            if let Some(Poll::Ready(Some(ip))) =
                self.resolve_external_ip_interval.as_mut().map(|r| r.poll_tick(cx))
            {
                self.set_external_ip_addr(ip);
            }

            // drain all commands that are ready
            while let Poll::Ready(Some(cmd)) = self.commands_rx.poll_recv(cx) {
                match cmd {
                    Discv4Command::Add(enr) => {
                        self.add_node(enr);
                    }
                    Discv4Command::AddBootNode(record) => {
                        self.add_boot_node(record);
                    }
                    Discv4Command::Lookup { node_id, tx } => {
                        // without an explicit target the local node id is looked up
                        let node_id = node_id.unwrap_or(self.local_node_record.id);
                        self.lookup_with(node_id, tx);
                    }
                    Discv4Command::SetLookupInterval(duration) => {
                        self.set_lookup_interval(duration);
                    }
                    Discv4Command::Updates(tx) => {
                        let rx = self.update_stream();
                        // the requester may be gone already, which is fine
                        let _ = tx.send(rx);
                    }
                    Discv4Command::BanPeer(node_id) => self.ban_node(node_id),
                    Discv4Command::Remove(node_id) => {
                        self.remove_node(node_id);
                    }
                    Discv4Command::Ban(node_id, ip) => {
                        self.ban_node(node_id);
                        self.ban_ip(ip);
                    }
                    Discv4Command::BanIp(ip) => {
                        self.ban_ip(ip);
                    }
                    Discv4Command::SetEIP868RLPPair { key, rlp } => {
                        debug!(target: "discv4", key=%String::from_utf8_lossy(&key), "Update EIP-868 extension pair");

                        // a failure to update the local ENR is ignored
                        let _ = self.local_eip_868_enr.insert_raw_rlp(key, rlp, &self.secret_key);
                    }
                    Discv4Command::SetTcpPort(port) => {
                        debug!(target: "discv4", %port, "Update tcp port");
                        self.local_node_record.tcp_port = port;
                        // update the ENR field that matches the local address family; a failure
                        // to update the local ENR is ignored
                        if self.local_node_record.address.is_ipv4() {
                            let _ = self.local_eip_868_enr.set_tcp4(port, &self.secret_key);
                        } else {
                            let _ = self.local_eip_868_enr.set_tcp6(port, &self.secret_key);
                        }
                    }

                    Discv4Command::Terminated => {
                        // queue the termination event, it is returned by the next loop pass
                        self.queued_events.push_back(Discv4Event::Terminated);
                    }
                }
            }

            // limits how many datagrams are processed in this pass
            let mut udp_message_budget = UDP_MESSAGE_POLL_LOOP_BUDGET;

            // process incoming datagrams
            while let Poll::Ready(Some(event)) = self.ingress.poll_recv(cx) {
                match event {
                    IngressEvent::RecvError(err) => {
                        debug!(target: "discv4", %err, "failed to read datagram");
                    }
                    IngressEvent::BadPacket(from, err, data) => {
                        trace!(target: "discv4", ?from, %err, packet=?hex::encode(&data), "bad packet");
                    }
                    IngressEvent::Packet(remote_addr, Packet { msg, node_id, hash }) => {
                        trace!(target: "discv4", r#type=?msg.msg_type(), from=?remote_addr,"received packet");
                        // dispatch to the handler of the message type and remember which kind
                        // of message was handled
                        let event = match msg {
                            Message::Ping(ping) => {
                                self.on_ping(ping, remote_addr, node_id, hash);
                                Discv4Event::Ping
                            }
                            Message::Pong(pong) => {
                                self.on_pong(pong, remote_addr, node_id);
                                Discv4Event::Pong
                            }
                            Message::FindNode(msg) => {
                                self.on_find_node(msg, remote_addr, node_id);
                                Discv4Event::FindNode
                            }
                            Message::Neighbours(msg) => {
                                self.on_neighbours(msg, remote_addr, node_id);
                                Discv4Event::Neighbours
                            }
                            Message::EnrRequest(msg) => {
                                self.on_enr_request(msg, remote_addr, node_id, hash);
                                Discv4Event::EnrRequest
                            }
                            Message::EnrResponse(msg) => {
                                self.on_enr_response(msg, remote_addr, node_id);
                                Discv4Event::EnrResponse
                            }
                        };

                        self.queued_events.push_back(event);
                    }
                }

                udp_message_budget -= 1;
                if udp_message_budget < 0 {
                    trace!(target: "discv4", budget=UDP_MESSAGE_POLL_LOOP_BUDGET, "exhausted message poll budget");
                    if self.queued_events.is_empty() {
                        // The budget is used up while the ingress channel may still hold
                        // messages, and there is no event that would make the loop run again:
                        // wake the task explicitly so polling continues.
                        cx.waker().wake_by_ref();
                    }
                    break
                }
            }

            // send queued pings if there is capacity
            self.ping_buffered();

            // evict overdue requests
            while self.evict_expired_requests_interval.poll_tick(cx).is_ready() {
                self.evict_expired_requests(Instant::now());
            }

            // evict expired entries from the received pongs
            while self.expire_interval.poll_tick(cx).is_ready() {
                self.received_pongs.evict_expired(Instant::now(), EXPIRE_DURATION);
            }

            // nothing was produced in this pass
            if self.queued_events.is_empty() {
                return Poll::Pending
            }
        }
    }
1893}
1894
1895impl Stream for Discv4Service {
1897 type Item = Discv4Event;
1898
1899 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1900 match ready!(self.get_mut().poll(cx)) {
1902 Discv4Event::Terminated => Poll::Ready(None),
1904 ev => Poll::Ready(Some(ev)),
1906 }
1907 }
1908}
1909
1910impl fmt::Debug for Discv4Service {
1911 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1912 f.debug_struct("Discv4Service")
1913 .field("local_address", &self.local_address)
1914 .field("local_peer_id", &self.local_peer_id())
1915 .field("local_node_record", &self.local_node_record)
1916 .field("queued_pings", &self.queued_pings)
1917 .field("pending_lookup", &self.pending_lookup)
1918 .field("pending_find_nodes", &self.pending_find_nodes)
1919 .field("lookup_interval", &self.lookup_interval)
1920 .finish_non_exhaustive()
1921 }
1922}
1923
/// The events produced by polling the `Discv4Service`.
///
/// Except for `Terminated`, each variant reports the kind of message that was received and
/// handled.
#[derive(Debug, Eq, PartialEq)]
pub enum Discv4Event {
    /// A `Ping` message was handled.
    Ping,
    /// A `Pong` message was handled.
    Pong,
    /// A `FindNode` message was handled.
    FindNode,
    /// A `Neighbours` message was handled.
    Neighbours,
    /// An `EnrRequest` message was handled.
    EnrRequest,
    /// An `EnrResponse` message was handled.
    EnrResponse,
    /// The service received the terminate command; the event stream ends with this event.
    Terminated,
}
1944
1945pub(crate) async fn send_loop(udp: Arc<UdpSocket>, rx: EgressReceiver) {
1947 let mut stream = ReceiverStream::new(rx);
1948 while let Some((payload, to)) = stream.next().await {
1949 match udp.send_to(&payload, to).await {
1950 Ok(size) => {
1951 trace!(target: "discv4", ?to, ?size,"sent payload");
1952 }
1953 Err(err) => {
1954 debug!(target: "discv4", ?to, %err,"Failed to send datagram.");
1955 }
1956 }
1957 }
1958}
1959
/// Limit for the per-IP packet counter of the receive loop.
///
/// Packets from an IP whose counter exceeds this value are dropped. The receive loop also
/// derives the period and the step of the counter decay from this value (half of it).
const MAX_INCOMING_PACKETS_PER_MINUTE_BY_IP: usize = 60usize;
1962
/// Continuously reads datagrams from the UDP socket and forwards them as [`IngressEvent`]s.
///
/// Packets are rate limited per sender IP, decoded, and filtered: packets signed by `local_id`
/// and packets whose hash was seen recently are dropped. Read errors and undecodable packets
/// are forwarded as well.
pub(crate) async fn receive_loop(udp: Arc<UdpSocket>, tx: IngressSender, local_id: PeerId) {
    // forwards an event to the service; a failed send is only logged
    let send = |event: IngressEvent| async {
        let _ = tx.send(event).await.map_err(|err| {
            debug!(
                target: "discv4",
                 %err,
                 "failed send incoming packet",
            )
        });
    };

    let mut cache = ReceiveCache::default();

    // half of the per-IP limit: used as the decay period in seconds and as the amount that is
    // subtracted from every IP counter per period
    let tick = MAX_INCOMING_PACKETS_PER_MINUTE_BY_IP / 2;
    let mut interval = tokio::time::interval(Duration::from_secs(tick as u64));

    let mut buf = [0; MAX_PACKET_SIZE];
    loop {
        let res = udp.recv_from(&mut buf).await;
        match res {
            Err(err) => {
                debug!(target: "discv4", %err, "Failed to read datagram.");
                send(IngressEvent::RecvError(err)).await;
            }
            Ok((read, remote_addr)) => {
                // rate limit by sender IP
                if cache.inc_ip(remote_addr.ip()) > MAX_INCOMING_PACKETS_PER_MINUTE_BY_IP {
                    trace!(target: "discv4", ?remote_addr, "Too many incoming packets from IP.");
                    continue
                }

                let packet = &buf[..read];
                match Message::decode(packet) {
                    Ok(packet) => {
                        if packet.node_id == local_id {
                            // the packet carries our own node id
                            debug!(target: "discv4", ?remote_addr, "Received own packet.");
                            continue
                        }

                        // skip packets that were received before
                        if cache.contains_packet(packet.hash) {
                            debug!(target: "discv4", ?remote_addr, "Received duplicate packet.");
                            continue
                        }

                        send(IngressEvent::Packet(remote_addr, packet)).await;
                    }
                    Err(err) => {
                        trace!(target: "discv4", %err,"Failed to decode packet");
                        send(IngressEvent::BadPacket(remote_addr, err, packet.to_vec())).await
                    }
                }
            }
        }

        // Check without waiting whether the decay interval elapsed and lower the IP counters if
        // it did. Note that the `continue` paths above skip this check for that iteration.
        if poll_fn(|cx| match interval.poll_tick(cx) {
            Poll::Ready(_) => Poll::Ready(true),
            Poll::Pending => Poll::Ready(false),
        })
        .await
        {
            cache.tick_ips(tick);
        }
    }
}
2035
/// Bookkeeping of the receive loop for rate limiting and duplicate detection.
struct ReceiveCache {
    /// Counter of received packets per sender IP.
    ///
    /// Incremented by `inc_ip` and lowered periodically by `tick_ips`.
    ip_messages: HashMap<IpAddr, usize>,
    /// Hashes of recently received packets, bounded in size with least-recently-used eviction.
    unique_packets: schnellru::LruMap<B256, ()>,
}
2049
2050impl ReceiveCache {
2051 fn tick_ips(&mut self, tick: usize) {
2055 self.ip_messages.retain(|_, count| {
2056 if let Some(reset) = count.checked_sub(tick) {
2057 *count = reset;
2058 true
2059 } else {
2060 false
2061 }
2062 });
2063 }
2064
2065 fn inc_ip(&mut self, ip: IpAddr) -> usize {
2067 let ctn = self.ip_messages.entry(ip).or_default();
2068 *ctn = ctn.saturating_add(1);
2069 *ctn
2070 }
2071
2072 fn contains_packet(&mut self, hash: B256) -> bool {
2074 !self.unique_packets.insert(hash, ())
2075 }
2076}
2077
2078impl Default for ReceiveCache {
2079 fn default() -> Self {
2080 Self {
2081 ip_messages: Default::default(),
2082 unique_packets: schnellru::LruMap::new(schnellru::ByLength::new(32)),
2083 }
2084 }
2085}
2086
/// The commands that are sent to the service and processed when it is polled.
enum Discv4Command {
    /// Adds the node via `add_node`.
    Add(NodeRecord),
    /// Adds the node as a boot node via `add_boot_node`.
    AddBootNode(NodeRecord),
    /// Sets the TCP port of the local node record and of the local ENR.
    SetTcpPort(u16),
    /// Inserts the raw RLP value under the given key into the local ENR.
    SetEIP868RLPPair { key: Vec<u8>, rlp: Bytes },
    /// Bans the peer and the IP address.
    Ban(PeerId, IpAddr),
    /// Bans the peer.
    BanPeer(PeerId),
    /// Bans the IP address.
    BanIp(IpAddr),
    /// Removes the node via `remove_node`.
    Remove(PeerId),
    /// Starts a lookup for `node_id`, or for the local node id if it is `None`; `tx` is handed
    /// to the lookup as its listener.
    Lookup { node_id: Option<PeerId>, tx: Option<NodeRecordSender> },
    /// Sets the interval at which lookups are started.
    SetLookupInterval(Duration),
    /// Requests a new stream of [`DiscoveryUpdate`]s, which is sent back through the sender.
    Updates(OneshotSender<ReceiverStream<DiscoveryUpdate>>),
    /// Makes the service emit `Discv4Event::Terminated`.
    Terminated,
}
2102
/// The events that the receive loop forwards to the service.
#[derive(Debug)]
pub(crate) enum IngressEvent {
    /// Reading from the UDP socket failed.
    RecvError(io::Error),
    /// A datagram could not be decoded; contains the sender, the decode error and the raw bytes.
    BadPacket(SocketAddr, DecodePacketError, Vec<u8>),
    /// A decoded packet and the address it was received from.
    Packet(SocketAddr, Packet),
}
2113
/// State of a ping that was sent and not answered yet.
#[derive(Debug)]
struct PingRequest {
    /// When the ping was sent; pings older than `config.ping_expiration` are evicted.
    sent_at: Instant,
    /// The node that was pinged.
    node: NodeRecord,
    /// NOTE(review): presumably the hash of the sent ping that the pong has to echo - confirm.
    echo_hash: B256,
    /// Why the ping was sent.
    reason: PingReason,
}
2126
/// Chooses the targets of the periodic lookups: every `interval`-th target is the local node id,
/// all others are random ids.
#[derive(Debug)]
struct LookupTargetRotator {
    /// Length of one rotation.
    interval: usize,
    /// Position within the current rotation; the local id is chosen whenever it wraps to `0`.
    counter: usize,
}

impl LookupTargetRotator {
    /// Returns a rotator that always chooses the local node id.
    const fn local_only() -> Self {
        // with an interval of 1 the counter is always 0 after the increment
        Self { interval: 1, counter: 0 }
    }
}

impl Default for LookupTargetRotator {
    fn default() -> Self {
        Self {
            interval: 4,
            // starting at `interval - 1` makes the very first target the local node id
            counter: 3,
        }
    }
}
2154
2155impl LookupTargetRotator {
2156 fn next(&mut self, local: &PeerId) -> PeerId {
2158 self.counter += 1;
2159 self.counter %= self.interval;
2160 if self.counter == 0 {
2161 return *local
2162 }
2163 PeerId::random()
2164 }
2165}
2166
/// The state of one lookup, shared between all requests that belong to it.
///
/// Cloning is cheap: all clones point to the same [`LookupContextInner`]. Once the last clone is
/// dropped, the listener of the lookup (if any) receives the result.
#[derive(Clone, Debug)]
struct LookupContext {
    /// The shared state.
    inner: Rc<LookupContextInner>,
}
2175
2176impl LookupContext {
2177 fn new(
2179 target: discv5::Key<NodeKey>,
2180 nearest_nodes: impl IntoIterator<Item = (Distance, NodeRecord)>,
2181 listener: Option<NodeRecordSender>,
2182 ) -> Self {
2183 let closest_nodes = nearest_nodes
2184 .into_iter()
2185 .map(|(distance, record)| {
2186 (distance, QueryNode { record, queried: false, responded: false })
2187 })
2188 .collect();
2189
2190 let inner = Rc::new(LookupContextInner {
2191 target,
2192 closest_nodes: RefCell::new(closest_nodes),
2193 listener,
2194 });
2195 Self { inner }
2196 }
2197
2198 fn target(&self) -> PeerId {
2200 self.inner.target.preimage().0
2201 }
2202
2203 fn closest(&self, num: usize) -> Vec<NodeRecord> {
2204 self.inner
2205 .closest_nodes
2206 .borrow()
2207 .iter()
2208 .filter(|(_, node)| !node.queried)
2209 .map(|(_, n)| n.record)
2210 .take(num)
2211 .collect()
2212 }
2213
2214 fn filter_closest<P>(&self, num: usize, filter: P) -> Vec<NodeRecord>
2216 where
2217 P: FnMut(&NodeRecord) -> bool,
2218 {
2219 self.inner
2220 .closest_nodes
2221 .borrow()
2222 .iter()
2223 .filter(|(_, node)| !node.queried)
2224 .map(|(_, n)| n.record)
2225 .filter(filter)
2226 .take(num)
2227 .collect()
2228 }
2229
2230 fn add_node(&self, record: NodeRecord) {
2232 let distance = self.inner.target.distance(&kad_key(record.id));
2233 let mut closest = self.inner.closest_nodes.borrow_mut();
2234 if let btree_map::Entry::Vacant(entry) = closest.entry(distance) {
2235 entry.insert(QueryNode { record, queried: false, responded: false });
2236 }
2237 }
2238
2239 fn set_queried(&self, id: PeerId, val: bool) {
2240 if let Some((_, node)) =
2241 self.inner.closest_nodes.borrow_mut().iter_mut().find(|(_, node)| node.record.id == id)
2242 {
2243 node.queried = val;
2244 }
2245 }
2246
2247 fn mark_queried(&self, id: PeerId) {
2249 self.set_queried(id, true)
2250 }
2251
2252 fn unmark_queried(&self, id: PeerId) {
2254 self.set_queried(id, false)
2255 }
2256
2257 fn mark_responded(&self, id: PeerId) {
2259 if let Some((_, node)) =
2260 self.inner.closest_nodes.borrow_mut().iter_mut().find(|(_, node)| node.record.id == id)
2261 {
2262 node.responded = true;
2263 }
2264 }
2265}
2266
// SAFETY: `LookupContext` wraps an `Rc` around state with a `RefCell` and is therefore not
// `Send` on its own. This impl is only sound if all clones of a context always move between
// threads together, i.e. no two clones are ever used from different threads at the same time.
// NOTE(review): presumably guaranteed because all clones are stored in the service that owns
// them - confirm that no clone escapes it.
unsafe impl Send for LookupContext {}
/// The shared state of a [`LookupContext`].
#[derive(Debug)]
struct LookupContextInner {
    /// The key of the lookup target.
    target: discv5::Key<NodeKey>,
    /// The nodes known to this lookup, ordered by their distance to the target.
    closest_nodes: RefCell<BTreeMap<Distance, QueryNode>>,
    /// Receives the records of all nodes that responded when this state is dropped.
    listener: Option<NodeRecordSender>,
}
2286
2287impl Drop for LookupContextInner {
2288 fn drop(&mut self) {
2289 if let Some(tx) = self.listener.take() {
2290 let nodes = self
2293 .closest_nodes
2294 .take()
2295 .into_values()
2296 .filter(|node| node.responded)
2297 .map(|node| node.record)
2298 .collect();
2299 let _ = tx.send(nodes);
2300 }
2301 }
2302}
2303
/// A node tracked by a lookup, together with its state within that lookup.
#[derive(Debug, Clone, Copy)]
struct QueryNode {
    /// The record of the node.
    record: NodeRecord,
    /// Whether the node is excluded from being selected by the lookup (again).
    queried: bool,
    /// Whether the node responded; only such nodes are reported to the lookup's listener.
    responded: bool,
}
2311
/// State of a `FindNode` request that was sent to a node.
#[derive(Debug)]
struct FindNodeRequest {
    /// When the request was created.
    sent_at: Instant,
    /// Number of records received in response so far.
    response_count: usize,
    /// Whether any response was received.
    answered: bool,
    /// The lookup this request belongs to.
    lookup_context: LookupContext,
}

impl FindNodeRequest {
    /// Creates an unanswered request for the given lookup, timestamped with the current time.
    fn new(resp: LookupContext) -> Self {
        Self { sent_at: Instant::now(), response_count: 0, answered: false, lookup_context: resp }
    }
}
2331
/// An encoded `FindNode` packet that is kept for reuse.
#[derive(Debug)]
struct CachedFindNode {
    /// The target the packet was built for.
    target: PeerId,
    /// The encoded packet.
    payload: Bytes,
    /// The hash of the encoded packet.
    hash: B256,
    /// When the packet was built.
    cached_at: Instant,
}
2340
2341impl CachedFindNode {
2342 fn get_or_sign(
2345 cache: &mut Option<Self>,
2346 target: PeerId,
2347 ttl: Duration,
2348 secret_key: &secp256k1::SecretKey,
2349 expire: u64,
2350 ) -> (Bytes, B256) {
2351 if let Some(c) = cache.as_ref() &&
2352 c.target == target &&
2353 c.cached_at.elapsed() < ttl
2354 {
2355 return (c.payload.clone(), c.hash);
2356 }
2357
2358 let msg = Message::FindNode(FindNode { id: target, expire });
2359 let (payload, hash) = msg.encode(secret_key);
2360
2361 *cache = Some(Self { target, payload: payload.clone(), hash, cached_at: Instant::now() });
2362
2363 (payload, hash)
2364 }
2365}
2366
/// State of an ENR request that was sent and not answered yet.
#[derive(Debug)]
struct EnrRequestState {
    /// When the request was sent; requests older than `config.enr_expiration` are evicted.
    sent_at: Instant,
    /// NOTE(review): presumably the hash of the sent request that the response has to echo -
    /// confirm.
    echo_hash: B256,
}
2374
/// A node stored in the routing table, together with what is known about it.
#[derive(Debug, Clone, Eq, PartialEq)]
struct NodeEntry {
    /// The record of the node.
    record: NodeRecord,
    /// When the entry was created or last updated via one of the `update_*` methods.
    last_seen: Instant,
    /// The last ENR sequence number that was stored for the node, if any.
    last_enr_seq: Option<u64>,
    /// The fork id that was stored for the node, if any.
    fork_id: Option<ForkId>,
    /// Number of `FindNode` requests the node failed to answer since its endpoint proof was
    /// established.
    find_node_failures: u8,
    /// Whether the endpoint proof of the node is established.
    has_endpoint_proof: bool,
}
2391
2392impl NodeEntry {
2395 fn new(record: NodeRecord) -> Self {
2397 Self {
2398 record,
2399 last_seen: Instant::now(),
2400 last_enr_seq: None,
2401 fork_id: None,
2402 find_node_failures: 0,
2403 has_endpoint_proof: false,
2404 }
2405 }
2406
2407 #[cfg(test)]
2408 fn new_proven(record: NodeRecord) -> Self {
2409 let mut node = Self::new(record);
2410 node.has_endpoint_proof = true;
2411 node
2412 }
2413
2414 const fn establish_proof(&mut self) {
2416 self.has_endpoint_proof = true;
2417 self.find_node_failures = 0;
2418 }
2419
2420 const fn exceeds_find_node_failures(&self, max_failures: u8) -> bool {
2422 self.find_node_failures >= max_failures
2423 }
2424
2425 fn update_with_enr(&mut self, last_enr_seq: Option<u64>) -> Option<u64> {
2427 self.update_now(|s| std::mem::replace(&mut s.last_enr_seq, last_enr_seq))
2428 }
2429
2430 const fn inc_failed_request(&mut self) {
2432 self.find_node_failures += 1;
2433 }
2434
2435 fn update_with_fork_id(&mut self, fork_id: Option<ForkId>) -> Option<ForkId> {
2437 self.update_now(|s| std::mem::replace(&mut s.fork_id, fork_id))
2438 }
2439
2440 fn update_now<F, R>(&mut self, f: F) -> R
2442 where
2443 F: FnOnce(&mut Self) -> R,
2444 {
2445 self.last_seen = Instant::now();
2446 f(self)
2447 }
2448}
2449
2450impl NodeEntry {
2453 fn is_expired(&self) -> bool {
2455 self.last_seen.elapsed() > (ENDPOINT_PROOF_EXPIRATION / 2)
2456 }
2457}
2458
/// The reason why a ping is sent.
#[derive(Debug)]
enum PingReason {
    /// NOTE(review): presumably used when a node is pinged upon its first insertion into the
    /// table - confirm against the callers.
    InitialInsert,
    /// NOTE(review): presumably used when a ping is sent to establish a bond with a node -
    /// confirm against the callers.
    EstablishBond,
    /// The ping re-checks a node that is in the table already.
    RePing,
    /// The ping is part of a lookup; contains the pinged node and the context of the lookup.
    Lookup(NodeRecord, LookupContext),
}
2471
/// The updates that the service reports to its update listeners.
#[derive(Debug, Clone)]
pub enum DiscoveryUpdate {
    /// A node was added.
    Added(NodeRecord),
    /// A new node was discovered, but the bucket it belongs to is full.
    DiscoveredAtCapacity(NodeRecord),
    /// The fork id of a node became known or changed.
    EnrForkId(NodeRecord, ForkId),
    /// A node was removed.
    Removed(PeerId),
    /// Several updates at once.
    Batch(Vec<Self>),
}
2486
2487#[cfg(test)]
2488mod tests {
2489 use super::*;
2490 use crate::test_utils::{create_discv4, create_discv4_with_config, rng_endpoint, rng_record};
2491 use alloy_primitives::hex;
2492 use alloy_rlp::{Decodable, Encodable};
2493 use rand_08::Rng;
2494 use reth_ethereum_forks::{EnrForkIdEntry, ForkHash};
2495 use reth_network_peers::mainnet_nodes;
2496 use std::future::poll_fn;
2497
2498 #[tokio::test]
2499 async fn test_configured_enr_forkid_entry() {
2500 let fork: ForkId = ForkId { hash: ForkHash([220, 233, 108, 45]), next: 0u64 };
2501 let mut disc_conf = Discv4Config::default();
2502 disc_conf.add_eip868_pair("eth", EnrForkIdEntry::from(fork));
2503 let (_discv4, service) = create_discv4_with_config(disc_conf).await;
2504 let eth = service.local_eip_868_enr.get_raw_rlp(b"eth").unwrap();
2505 let fork_entry_id = EnrForkIdEntry::decode(&mut ð[..]).unwrap();
2506
2507 let raw: [u8; 8] = [0xc7, 0xc6, 0x84, 0xdc, 0xe9, 0x6c, 0x2d, 0x80];
2508 let decoded = EnrForkIdEntry::decode(&mut &raw[..]).unwrap();
2509 let expected = EnrForkIdEntry {
2510 fork_id: ForkId { hash: ForkHash([0xdc, 0xe9, 0x6c, 0x2d]), next: 0 },
2511 };
2512 assert_eq!(expected, fork_entry_id);
2513 assert_eq!(expected, decoded);
2514 }
2515
2516 #[test]
2517 fn test_enr_forkid_entry_decode() {
2518 let raw: [u8; 8] = [0xc7, 0xc6, 0x84, 0xdc, 0xe9, 0x6c, 0x2d, 0x80];
2519 let decoded = EnrForkIdEntry::decode(&mut &raw[..]).unwrap();
2520 let expected = EnrForkIdEntry {
2521 fork_id: ForkId { hash: ForkHash([0xdc, 0xe9, 0x6c, 0x2d]), next: 0 },
2522 };
2523 assert_eq!(expected, decoded);
2524 }
2525
2526 #[test]
2527 fn test_enr_forkid_entry_encode() {
2528 let original = EnrForkIdEntry {
2529 fork_id: ForkId { hash: ForkHash([0xdc, 0xe9, 0x6c, 0x2d]), next: 0 },
2530 };
2531 let expected: [u8; 8] = [0xc7, 0xc6, 0x84, 0xdc, 0xe9, 0x6c, 0x2d, 0x80];
2532 let mut encoded = Vec::with_capacity(expected.len());
2533 original.encode(&mut encoded);
2534 assert_eq!(&expected[..], encoded.as_slice());
2535 }
2536
2537 #[test]
2538 fn test_local_rotator() {
2539 let id = PeerId::random();
2540 let mut rotator = LookupTargetRotator::local_only();
2541 assert_eq!(rotator.next(&id), id);
2542 assert_eq!(rotator.next(&id), id);
2543 }
2544
2545 #[test]
2546 fn test_rotator() {
2547 let id = PeerId::random();
2548 let mut rotator = LookupTargetRotator::default();
2549 assert_eq!(rotator.next(&id), id);
2550 assert_ne!(rotator.next(&id), id);
2551 assert_ne!(rotator.next(&id), id);
2552 assert_ne!(rotator.next(&id), id);
2553 assert_eq!(rotator.next(&id), id);
2554 }
2555
    /// Pings are sent immediately until `MAX_NODES_PING` are pending; further pings are queued.
    #[tokio::test]
    async fn test_pending_ping() {
        let (_, mut service) = create_discv4().await;

        let local_addr = service.local_addr();

        // add nodes until the limit of pending pings is reached: every added node must be
        // pinged right away
        let mut num_inserted = 0;
        loop {
            let node = NodeRecord::new(local_addr, PeerId::random());
            if service.add_node(node) {
                num_inserted += 1;
                assert!(service.pending_pings.contains_key(&node.id));
                assert_eq!(service.pending_pings.len(), num_inserted);
                if num_inserted == MAX_NODES_PING {
                    break
                }
            }
        }

        // from now on the pings of added nodes must end up in the queue instead
        num_inserted = 0;
        for _ in 0..MAX_NODES_PING {
            let node = NodeRecord::new(local_addr, PeerId::random());
            if service.add_node(node) {
                num_inserted += 1;
                assert!(!service.pending_pings.contains_key(&node.id));
                assert_eq!(service.pending_pings.len(), MAX_NODES_PING);
                assert_eq!(service.queued_pings.len(), num_inserted);
            }
        }
    }
2587
    /// Manual test: runs a service that is configured with the mainnet nodes as boot nodes and
    /// prints the discovery updates it reports.
    ///
    /// Ignored by default.
    #[tokio::test(flavor = "multi_thread")]
    #[ignore]
    async fn test_mainnet_lookup() {
        reth_tracing::init_test_tracing();
        let fork_id = ForkId { hash: ForkHash(hex!("743f3d89")), next: 16191202 };

        let all_nodes = mainnet_nodes();
        let config = Discv4Config::builder()
            .add_boot_nodes(all_nodes)
            .lookup_interval(Duration::from_secs(1))
            .add_eip868_pair("eth", fork_id)
            .build();
        let (_discv4, mut service) = create_discv4_with_config(config).await;

        let mut updates = service.update_stream();

        let _handle = service.spawn();

        // mirror the added and removed nodes in a local table
        let mut table = HashMap::new();
        while let Some(update) = updates.next().await {
            match update {
                DiscoveryUpdate::EnrForkId(record, fork_id) => {
                    println!("{record:?}, {fork_id:?}");
                }
                DiscoveryUpdate::Added(record) => {
                    table.insert(record.id, record);
                }
                DiscoveryUpdate::Removed(id) => {
                    table.remove(&id);
                }
                _ => {}
            }
            println!("total peers {}", table.len());
        }
    }
2624
    /// A ping received from an IPv4-mapped IPv6 address must be stored with the IPv4 address.
    #[tokio::test]
    async fn test_mapped_ipv4() {
        reth_tracing::init_test_tracing();
        let mut rng = rand_08::thread_rng();
        let config = Discv4Config::builder().build();
        let (_discv4, mut service) = create_discv4_with_config(config).await;

        // the sender address is the IPv4-mapped IPv6 form of `v4`
        let v4: Ipv4Addr = "0.0.0.0".parse().unwrap();
        let v6 = v4.to_ipv6_mapped();
        let addr: SocketAddr = (v6, DEFAULT_DISCOVERY_PORT).into();

        let ping = Ping {
            from: rng_endpoint(&mut rng),
            to: rng_endpoint(&mut rng),
            expire: service.ping_expiration(),
            enr_sq: Some(rng.r#gen()),
        };

        let id = PeerId::random();
        service.on_ping(ping, addr, id, B256::random());

        // the sender must be in the table now, with the plain IPv4 address
        let key = kad_key(id);
        match service.kbuckets.entry(&key) {
            kbucket::Entry::Present(entry, _) => {
                let node_addr = entry.value().record.address;
                assert!(node_addr.is_ipv4());
                assert_eq!(node_addr, IpAddr::from(v4));
            }
            _ => unreachable!(),
        };
    }
2656
    /// The sender of an expired ping must not be added to the table.
    #[tokio::test]
    async fn test_respect_ping_expiration() {
        reth_tracing::init_test_tracing();
        let mut rng = rand_08::thread_rng();
        let config = Discv4Config::builder().build();
        let (_discv4, mut service) = create_discv4_with_config(config).await;

        let v4: Ipv4Addr = "0.0.0.0".parse().unwrap();
        let v6 = v4.to_ipv6_mapped();
        let addr: SocketAddr = (v6, DEFAULT_DISCOVERY_PORT).into();

        let ping = Ping {
            from: rng_endpoint(&mut rng),
            to: rng_endpoint(&mut rng),
            // expired one second ago
            expire: SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs() - 1,
            enr_sq: Some(rng.r#gen()),
        };

        let id = PeerId::random();
        service.on_ping(ping, addr, id, B256::random());

        // the sender must not be in the table
        let key = kad_key(id);
        match service.kbuckets.entry(&key) {
            kbucket::Entry::Absent(_) => {}
            _ => unreachable!(),
        };
    }
2684
    /// With one proven node in the table, a self lookup creates exactly one pending `FindNode`
    /// request, and polling the service does not create another one.
    #[tokio::test]
    async fn test_single_lookups() {
        reth_tracing::init_test_tracing();

        let config = Discv4Config::builder().build();
        let (_discv4, mut service) = create_discv4_with_config(config.clone()).await;

        let id = PeerId::random();
        let key = kad_key(id);
        let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), id);

        // put a node with an established endpoint proof into the table
        let _ = service.kbuckets.insert_or_update(
            &key,
            NodeEntry::new_proven(record),
            NodeStatus {
                direction: ConnectionDirection::Incoming,
                state: ConnectionState::Connected,
            },
        );

        service.lookup_self();
        assert_eq!(service.pending_find_nodes.len(), 1);

        // poll the service once
        poll_fn(|cx| {
            let _ = service.poll(cx);
            assert_eq!(service.pending_find_nodes.len(), 1);

            Poll::Ready(())
        })
        .await;
    }
2716
    /// A node learned from a `Neighbours` response is pinged first and gets its own `FindNode`
    /// request afterwards.
    #[tokio::test]
    async fn test_on_neighbours_recursive_lookup() {
        reth_tracing::init_test_tracing();

        let config = Discv4Config::builder().build();
        let (_discv4, mut service) = create_discv4_with_config(config.clone()).await;
        let (_discv4, mut service2) = create_discv4_with_config(config).await;

        let id = PeerId::random();
        let key = kad_key(id);
        let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), id);

        // put a node with an established endpoint proof into the table
        let _ = service.kbuckets.insert_or_update(
            &key,
            NodeEntry::new_proven(record),
            NodeStatus {
                direction: ConnectionDirection::Incoming,
                state: ConnectionState::Connected,
            },
        );
        // the lookup creates a `FindNode` request for the node in the table
        service.lookup_self();
        assert_eq!(service.pending_find_nodes.len(), 1);

        // poll the service once
        poll_fn(|cx| {
            let _ = service.poll(cx);
            assert_eq!(service.pending_find_nodes.len(), 1);

            Poll::Ready(())
        })
        .await;

        // answer the request on behalf of the node with a response that contains `service2`
        let expiry = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_secs() +
            10000000000000;
        let msg = Neighbours { nodes: vec![service2.local_node_record], expire: expiry };
        service.on_neighbours(msg, record.tcp_addr(), id);
        // `service2` is unknown to `service` and therefore pinged first
        let event = poll_fn(|cx| service2.poll(cx)).await;
        assert_eq!(event, Discv4Event::Ping);
        // the response contained a single record only, so the request is still pending
        assert_eq!(service.pending_find_nodes.len(), 1);
        // the first message `service` receives is a pong
        let event = poll_fn(|cx| service.poll(cx)).await;
        assert_eq!(event, Discv4Event::Pong);
        // followed by a ping
        let event = poll_fn(|cx| service.poll(cx)).await;
        assert_eq!(event, Discv4Event::Ping);
        // by now there is a second pending `FindNode` request
        assert_eq!(service.pending_find_nodes.len(), 2);
    }
2773
    /// The closest nodes returned by the table must not contain the local node.
    #[tokio::test]
    async fn test_no_local_in_closest() {
        reth_tracing::init_test_tracing();

        let config = Discv4Config::builder().build();
        let (_discv4, mut service) = create_discv4_with_config(config).await;

        let target_key = kad_key(PeerId::random());

        let id = PeerId::random();
        let key = kad_key(id);
        let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), id);

        // put a single node into the table
        let _ = service.kbuckets.insert_or_update(
            &key,
            NodeEntry::new(record),
            NodeStatus {
                direction: ConnectionDirection::Incoming,
                state: ConnectionState::Connected,
            },
        );

        let closest = service
            .kbuckets
            .closest_values(&target_key)
            .map(|n| n.value.record)
            .take(MAX_NODES_PER_BUCKET)
            .collect::<Vec<_>>();

        // only the inserted node is returned
        assert_eq!(closest.len(), 1);
        assert!(!closest.iter().any(|r| r.id == *service.local_peer_id()));
    }
2806
2807 #[tokio::test]
2808 async fn test_random_lookup() {
2809 reth_tracing::init_test_tracing();
2810
2811 let config = Discv4Config::builder().build();
2812 let (_discv4, mut service) = create_discv4_with_config(config).await;
2813
2814 let target = PeerId::random();
2815
2816 let id = PeerId::random();
2817 let key = kad_key(id);
2818 let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), id);
2819
2820 let _ = service.kbuckets.insert_or_update(
2821 &key,
2822 NodeEntry::new_proven(record),
2823 NodeStatus {
2824 direction: ConnectionDirection::Incoming,
2825 state: ConnectionState::Connected,
2826 },
2827 );
2828
2829 service.lookup(target);
2830 assert_eq!(service.pending_find_nodes.len(), 1);
2831
2832 let ctx = service.pending_find_nodes.values().next().unwrap().lookup_context.clone();
2833
2834 assert_eq!(ctx.target(), target);
2835 assert_eq!(ctx.inner.closest_nodes.borrow().len(), 1);
2836
2837 ctx.add_node(record);
2838 assert_eq!(ctx.inner.closest_nodes.borrow().len(), 1);
2839 }
2840
2841 #[tokio::test]
2842 async fn test_reping_on_find_node_failures() {
2843 reth_tracing::init_test_tracing();
2844
2845 let config = Discv4Config::builder().build();
2846 let (_discv4, mut service) = create_discv4_with_config(config).await;
2847
2848 let target = PeerId::random();
2849
2850 let id = PeerId::random();
2851 let key = kad_key(id);
2852 let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), id);
2853
2854 let mut entry = NodeEntry::new_proven(record);
2855 entry.find_node_failures = u8::MAX;
2856 let _ = service.kbuckets.insert_or_update(
2857 &key,
2858 entry,
2859 NodeStatus {
2860 direction: ConnectionDirection::Incoming,
2861 state: ConnectionState::Connected,
2862 },
2863 );
2864
2865 service.lookup(target);
2866 assert_eq!(service.pending_find_nodes.len(), 0);
2867 assert_eq!(service.pending_pings.len(), 1);
2868
2869 service.update_on_pong(record, None);
2870
2871 service
2872 .on_entry(record.id, |entry| {
2873 assert_eq!(entry.find_node_failures, 0);
2875 assert!(entry.has_endpoint_proof);
2876 })
2877 .unwrap();
2878 }
2879
2880 #[tokio::test]
2881 async fn test_service_commands() {
2882 reth_tracing::init_test_tracing();
2883
2884 let config = Discv4Config::builder().build();
2885 let (discv4, mut service) = create_discv4_with_config(config).await;
2886
2887 service.lookup_self();
2888
2889 let _handle = service.spawn();
2890 discv4.send_lookup_self();
2891 let _ = discv4.lookup_self().await;
2892 }
2893
    /// Verifies that pending find-node requests, pending lookups and pending pings are all
    /// evicted by a single poll once the configured 200ms timeouts have elapsed.
    #[tokio::test]
    async fn test_requests_timeout() {
        reth_tracing::init_test_tracing();
        let fork_id = ForkId { hash: ForkHash(hex!("743f3d89")), next: 16191202 };

        // Every timeout is set to 200ms so the one-second sleep below exceeds all of them.
        let config = Discv4Config::builder()
            .request_timeout(Duration::from_millis(200))
            .ping_expiration(Duration::from_millis(200))
            .lookup_neighbours_expiration(Duration::from_millis(200))
            .add_eip868_pair("eth", fork_id)
            .build();
        let (_disv4, mut service) = create_discv4_with_config(config).await;

        // Seed the table with a random peer that is already proven and marked as connected.
        let id = PeerId::random();
        let key = kad_key(id);
        let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), id);

        let _ = service.kbuckets.insert_or_update(
            &key,
            NodeEntry::new_proven(record),
            NodeStatus {
                direction: ConnectionDirection::Incoming,
                state: ConnectionState::Connected,
            },
        );

        // A self lookup creates the single pending find-node request.
        service.lookup_self();
        assert_eq!(service.pending_find_nodes.len(), 1);

        let ctx = service.pending_find_nodes.values().next().unwrap().lookup_context.clone();

        // Manually register a pending lookup for the same peer, reusing the request's context.
        service.pending_lookup.insert(record.id, (Instant::now(), ctx));

        assert_eq!(service.pending_lookup.len(), 1);

        // Send a real ping packet and register the matching pending ping by hand.
        let ping = Ping {
            from: service.local_node_record.into(),
            to: record.into(),
            expire: service.ping_expiration(),
            enr_sq: service.enr_seq(),
        };
        let echo_hash = service.send_packet(Message::Ping(ping), record.udp_addr());
        let ping_request = PingRequest {
            sent_at: Instant::now(),
            node: record,
            echo_hash,
            reason: PingReason::InitialInsert,
        };
        service.pending_pings.insert(record.id, ping_request);

        assert_eq!(service.pending_pings.len(), 1);

        // Wait well past the 200ms timeouts configured above.
        tokio::time::sleep(Duration::from_secs(1)).await;

        // A single poll is expected to clear all three pending collections.
        poll_fn(|cx| {
            let _ = service.poll(cx);

            assert_eq!(service.pending_find_nodes.len(), 0);
            assert_eq!(service.pending_lookup.len(), 0);
            assert_eq!(service.pending_pings.len(), 0);

            Poll::Ready(())
        })
        .await;
    }
2959
    /// Sends a ping whose `to` endpoint address has been overwritten and checks that the usual
    /// event sequence still takes place: `service_2` sees a ping, `service_1` sees a pong and
    /// then a ping.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_check_wrong_to() {
        reth_tracing::init_test_tracing();

        let config = Discv4Config::builder().external_ip_resolver(None).build();
        let (_discv4, mut service_1) = create_discv4_with_config(config.clone()).await;
        let (_discv4, mut service_2) = create_discv4_with_config(config).await;

        // Build a regular ping from `service_1` to `service_2`...
        let mut ping = Ping {
            from: service_1.local_node_record.into(),
            to: service_2.local_node_record.into(),
            expire: service_1.ping_expiration(),
            enr_sq: service_1.enr_seq(),
        };
        // ...then replace the advertised recipient address.
        // NOTE(review): presumably 192.0.2.0 is chosen because it is not `service_2`'s real
        // address, as the test name suggests - confirm.
        ping.to.address = "192.0.2.0".parse().unwrap();

        // The packet itself is still delivered to `service_2`'s actual local address, and the
        // matching pending ping is registered by hand on `service_1`.
        let echo_hash = service_1.send_packet(Message::Ping(ping), service_2.local_addr());
        let ping_request = PingRequest {
            sent_at: Instant::now(),
            node: service_2.local_node_record,
            echo_hash,
            reason: PingReason::InitialInsert,
        };
        service_1.pending_pings.insert(*service_2.local_peer_id(), ping_request);

        // `service_2` must report the incoming ping.
        let event = poll_fn(|cx| service_2.poll(cx)).await;
        assert_eq!(event, Discv4Event::Ping);

        // `service_1` must then observe a pong followed by a ping, in that order.
        let event = poll_fn(|cx| service_1.poll(cx)).await;
        assert_eq!(event, Discv4Event::Pong);
        let event = poll_fn(|cx| service_1.poll(cx)).await;
        assert_eq!(event, Discv4Event::Ping);
    }
2998
    /// Walks two services through a mutual ping/pong exchange and checks the connection status
    /// stored in each routing table after every step.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_check_ping_pong() {
        reth_tracing::init_test_tracing();

        let config = Discv4Config::builder().external_ip_resolver(None).build();
        let (_discv4, mut service_1) = create_discv4_with_config(config.clone()).await;
        let (_discv4, mut service_2) = create_discv4_with_config(config).await;

        // Adding `service_2` to `service_1` is expected to result in a ping arriving at
        // `service_2`.
        service_1.add_node(service_2.local_node_record);

        let event = poll_fn(|cx| service_2.poll(cx)).await;
        assert_eq!(event, Discv4Event::Ping);

        // After handling that ping, `service_2` must hold an entry for `service_1` that is not
        // yet marked as connected.
        let key1 = kad_key(*service_1.local_peer_id());
        match service_2.kbuckets.entry(&key1) {
            kbucket::Entry::Present(_entry, status) => {
                assert!(!status.is_connected());
            }
            _ => unreachable!(),
        }

        // `service_1` receives the pong...
        let event = poll_fn(|cx| service_1.poll(cx)).await;
        assert_eq!(event, Discv4Event::Pong);

        // ...after which its entry for `service_2` must be marked as connected.
        let key2 = kad_key(*service_2.local_peer_id());
        match service_1.kbuckets.entry(&key2) {
            kbucket::Entry::Present(_entry, status) => {
                assert!(status.is_connected());
            }
            _ => unreachable!(),
        }

        // `service_1` must next observe a ping.
        let event = poll_fn(|cx| service_1.poll(cx)).await;
        assert_eq!(event, Discv4Event::Ping);

        // Poll `service_2` until it reports a pong, for at most five seconds. ENR requests and
        // find-node events are tolerated in between; any other event fails the test.
        tokio::time::timeout(Duration::from_secs(5), async {
            loop {
                let event = poll_fn(|cx| service_2.poll(cx)).await;
                match event {
                    Discv4Event::Pong => break,
                    Discv4Event::EnrRequest | Discv4Event::FindNode => {}
                    ev => unreachable!("{ev:?}"),
                }
            }
        })
        .await
        .expect("timed out waiting for Pong from service_2");

        // With the pong processed, `service_2`'s entry for `service_1` must now be connected.
        match service_2.kbuckets.entry(&key1) {
            kbucket::Entry::Present(_entry, status) => {
                assert!(status.is_connected());
            }
            ev => unreachable!("{ev:?}"),
        }
    }
3064
3065 #[test]
3066 fn test_insert() {
3067 let local_node_record = rng_record(&mut rand_08::thread_rng());
3068 let mut kbuckets: KBucketsTable<NodeKey, NodeEntry> = KBucketsTable::new(
3069 NodeKey::from(&local_node_record).into(),
3070 Duration::from_secs(60),
3071 MAX_NODES_PER_BUCKET,
3072 None,
3073 None,
3074 );
3075
3076 let new_record = rng_record(&mut rand_08::thread_rng());
3077 let key = kad_key(new_record.id);
3078 match kbuckets.entry(&key) {
3079 kbucket::Entry::Absent(entry) => {
3080 let node = NodeEntry::new(new_record);
3081 let _ = entry.insert(
3082 node,
3083 NodeStatus {
3084 direction: ConnectionDirection::Outgoing,
3085 state: ConnectionState::Disconnected,
3086 },
3087 );
3088 }
3089 _ => {
3090 unreachable!()
3091 }
3092 };
3093 match kbuckets.entry(&key) {
3094 kbucket::Entry::Present(_, _) => {}
3095 _ => {
3096 unreachable!()
3097 }
3098 }
3099 }
3100
3101 #[tokio::test]
3102 async fn test_bootnode_not_in_update_stream() {
3103 reth_tracing::init_test_tracing();
3104 let (_, service_1) = create_discv4().await;
3105 let peerid_1 = *service_1.local_peer_id();
3106
3107 let config = Discv4Config::builder().add_boot_node(service_1.local_node_record).build();
3108 service_1.spawn();
3109
3110 let (_, mut service_2) = create_discv4_with_config(config).await;
3111
3112 let mut updates = service_2.update_stream();
3113
3114 service_2.spawn();
3115
3116 let mut bootnode_appeared = false;
3118 let timeout = tokio::time::sleep(Duration::from_secs(1));
3119 tokio::pin!(timeout);
3120
3121 loop {
3122 tokio::select! {
3123 Some(update) = updates.next() => {
3124 if let DiscoveryUpdate::Added(record) = update
3125 && record.id == peerid_1 {
3126 bootnode_appeared = true;
3127 break;
3128 }
3129 }
3130 _ = &mut timeout => break,
3131 }
3132 }
3133
3134 assert!(bootnode_appeared, "Bootnode should appear in update stream");
3136 }
3137
3138 fn insert_proven_node(service: &mut Discv4Service, record: NodeRecord) {
3139 let key = kad_key(record.id);
3140 let _ = service.kbuckets.insert_or_update(
3141 &key,
3142 NodeEntry::new_proven(record),
3143 NodeStatus {
3144 direction: ConnectionDirection::Incoming,
3145 state: ConnectionState::Connected,
3146 },
3147 );
3148 }
3149
3150 fn insert_initial_ping(service: &mut Discv4Service, record: NodeRecord) -> B256 {
3151 let echo_hash = B256::random();
3152 service.pending_pings.insert(
3153 record.id,
3154 PingRequest {
3155 sent_at: Instant::now(),
3156 node: record,
3157 echo_hash,
3158 reason: PingReason::InitialInsert,
3159 },
3160 );
3161 echo_hash
3162 }
3163
3164 fn make_pong(service: &Discv4Service, echo_hash: B256) -> Pong {
3165 Pong {
3166 to: rng_endpoint(&mut rand_08::thread_rng()),
3167 echo: echo_hash,
3168 expire: service.ping_expiration(),
3169 enr_sq: None,
3170 }
3171 }
3172
3173 #[tokio::test]
3174 async fn test_lookup_reset_on_first_bootnode_pong() {
3175 let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), PeerId::random());
3176 let config = Discv4Config::builder().add_boot_node(record).build();
3177 let (_discv4, mut service) = create_discv4_with_config(config).await;
3178
3179 assert!(service.pending_lookup_reset);
3181
3182 insert_proven_node(&mut service, record);
3184 let echo_hash = insert_initial_ping(&mut service, record);
3185
3186 service.on_pong(make_pong(&service, echo_hash), record.udp_addr(), record.id);
3188
3189 assert!(!service.pending_lookup_reset, "flag should be consumed");
3191 }
3192
3193 #[tokio::test]
3194 async fn test_lookup_reset_fires_only_once() {
3195 let records: Vec<_> = (0..2)
3196 .map(|_| NodeRecord::new("0.0.0.0:0".parse().unwrap(), PeerId::random()))
3197 .collect();
3198 let config = Discv4Config::builder().add_boot_nodes(records.clone()).build();
3199 let (_discv4, mut service) = create_discv4_with_config(config).await;
3200
3201 for &r in &records {
3203 insert_proven_node(&mut service, r);
3204 }
3205 let hashes: Vec<_> =
3206 records.iter().map(|r| insert_initial_ping(&mut service, *r)).collect();
3207
3208 service.on_pong(make_pong(&service, hashes[0]), records[0].udp_addr(), records[0].id);
3210 assert!(!service.pending_lookup_reset);
3211
3212 service.on_pong(make_pong(&service, hashes[1]), records[1].udp_addr(), records[1].id);
3214 assert!(!service.pending_lookup_reset);
3215 }
3216
3217 #[tokio::test]
3218 async fn test_lookup_reset_not_triggered_by_non_bootnode() {
3219 let bootnode = NodeRecord::new("0.0.0.0:0".parse().unwrap(), PeerId::random());
3220 let config = Discv4Config::builder().add_boot_node(bootnode).build();
3221 let (_discv4, mut service) = create_discv4_with_config(config).await;
3222
3223 assert!(service.pending_lookup_reset);
3224
3225 let stranger = NodeRecord::new("0.0.0.0:0".parse().unwrap(), PeerId::random());
3227 insert_proven_node(&mut service, stranger);
3228 let echo_hash = insert_initial_ping(&mut service, stranger);
3229 service.on_pong(make_pong(&service, echo_hash), stranger.udp_addr(), stranger.id);
3230
3231 assert!(service.pending_lookup_reset, "flag should not be consumed by non-bootnode");
3232 }
3233
3234 #[tokio::test]
3235 async fn test_lookup_reset_disabled_when_lookup_disabled() {
3236 let config = Discv4Config::builder().enable_lookup(false).build();
3237 let (_discv4, service) = create_discv4_with_config(config).await;
3238
3239 assert!(!service.pending_lookup_reset);
3241 }
3242}