1#![doc(
20 html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
21 html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
22 issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/"
23)]
24#![cfg_attr(not(test), warn(unused_crate_dependencies))]
25#![cfg_attr(docsrs, feature(doc_cfg))]
26
27use crate::{
28 error::{DecodePacketError, Discv4Error},
29 proto::{FindNode, Message, Neighbours, Packet, Ping, Pong},
30};
31use alloy_primitives::{bytes::Bytes, hex, B256};
32use discv5::{
33 kbucket,
34 kbucket::{
35 BucketInsertResult, Distance, Entry as BucketEntry, InsertResult, KBucketsTable,
36 NodeStatus, MAX_NODES_PER_BUCKET,
37 },
38 ConnectionDirection, ConnectionState,
39};
40use enr::Enr;
41use itertools::Itertools;
42use parking_lot::Mutex;
43use proto::{EnrRequest, EnrResponse};
44use reth_ethereum_forks::ForkId;
45use reth_network_peers::{pk2id, PeerId};
46use secp256k1::SecretKey;
47use std::{
48 cell::RefCell,
49 collections::{btree_map, hash_map::Entry, BTreeMap, HashMap, VecDeque},
50 fmt, io,
51 net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4},
52 pin::Pin,
53 rc::Rc,
54 sync::Arc,
55 task::{ready, Context, Poll},
56 time::{Duration, Instant, SystemTime, UNIX_EPOCH},
57};
58use tokio::{
59 net::UdpSocket,
60 sync::{mpsc, mpsc::error::TrySendError, oneshot, oneshot::Sender as OneshotSender},
61 task::{JoinHandle, JoinSet},
62 time::Interval,
63};
64use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt};
65use tracing::{debug, trace};
66
67pub mod error;
68pub mod proto;
69
70mod config;
71pub use config::{Discv4Config, Discv4ConfigBuilder};
72
73mod node;
74use node::{kad_key, NodeKey};
75
76mod table;
77
78pub use reth_network_peers::NodeRecord;
80
81#[cfg(any(test, feature = "test-utils"))]
82pub mod test_utils;
83
84use crate::table::PongTable;
85use reth_net_nat::ResolveNatInterval;
86pub use reth_net_nat::{external_ip, NatResolver};
88
/// The default address discv4 binds to: the unspecified IPv4 address (`0.0.0.0`).
pub const DEFAULT_DISCOVERY_ADDR: IpAddr = IpAddr::V4(Ipv4Addr::UNSPECIFIED);

/// The default UDP port used for discovery.
pub const DEFAULT_DISCOVERY_PORT: u16 = 30303;

/// The default socket address for discovery: [`DEFAULT_DISCOVERY_ADDR`] combined with
/// [`DEFAULT_DISCOVERY_PORT`].
pub const DEFAULT_DISCOVERY_ADDRESS: SocketAddr =
    SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, DEFAULT_DISCOVERY_PORT));

/// Upper bound, in bytes, for the size of a single packet.
const MAX_PACKET_SIZE: usize = 1280;

/// Lower bound, in bytes, for the size of a single packet.
///
/// NOTE(review): presumably hash (32) + signature (65) + packet type (1) — confirm against the
/// packet encoding in `proto`.
const MIN_PACKET_SIZE: usize = 32 + 65 + 1;

/// Number of closest nodes that are queried at once when a lookup is started or advanced.
const ALPHA: usize = 3;

/// Maximum number of pings that may be in flight at the same time; once reached, further pings
/// are queued instead of sent.
const MAX_NODES_PING: usize = 2 * MAX_NODES_PER_BUCKET;

/// Maximum number of pings waiting in the queue; pings beyond this limit are dropped.
const MAX_QUEUED_PINGS: usize = 2 * MAX_NODES_PER_BUCKET;

/// Number of node records that fit into a single `Neighbours` datagram.
///
/// NOTE(review): presumably 109 is the fixed per-packet overhead and 91 the encoded size of one
/// record — confirm against the wire format.
const SAFE_MAX_DATAGRAM_NEIGHBOUR_RECORDS: usize = (MAX_PACKET_SIZE - 109) / 91;

/// Lifetime of an endpoint proof: 24 hours.
const ENDPOINT_PROOF_EXPIRATION: Duration = Duration::from_secs(24 * 60 * 60);

/// Period of the service's `expire_interval`: one hour.
const EXPIRE_DURATION: Duration = Duration::from_secs(60 * 60);

/// Budget for the UDP message poll loop.
///
/// NOTE(review): presumably the maximum number of loop iterations per poll — confirm at the use
/// site.
const UDP_MESSAGE_POLL_LOOP_BUDGET: i32 = 4;
149
/// Sending half of the channel that carries encoded packets and their destination to the task
/// that writes to the socket.
type EgressSender = mpsc::Sender<(Bytes, SocketAddr)>;
/// Receiving half of the egress channel.
type EgressReceiver = mpsc::Receiver<(Bytes, SocketAddr)>;

/// Sending half of the channel that delivers [`IngressEvent`]s to the service.
pub(crate) type IngressSender = mpsc::Sender<IngressEvent>;
/// Receiving half of the ingress channel, polled by the service.
pub(crate) type IngressReceiver = mpsc::Receiver<IngressEvent>;

/// One-shot sender used to hand the result of a lookup back to the caller.
type NodeRecordSender = OneshotSender<Vec<NodeRecord>>;
157
/// Cloneable frontend handle for a [`Discv4Service`].
///
/// All operations are forwarded to the service as [`Discv4Command`]s over an unbounded channel.
#[derive(Debug, Clone)]
pub struct Discv4 {
    /// The address the service's socket is bound to.
    local_addr: SocketAddr,
    /// Channel used to send commands to the service.
    to_service: mpsc::UnboundedSender<Discv4Command>,
    /// The local node record, shared with the service, which overwrites it when the external IP
    /// changes.
    node_record: Arc<Mutex<NodeRecord>>,
}
175
176impl Discv4 {
177 pub async fn spawn(
181 local_address: SocketAddr,
182 local_enr: NodeRecord,
183 secret_key: SecretKey,
184 config: Discv4Config,
185 ) -> io::Result<Self> {
186 let (discv4, service) = Self::bind(local_address, local_enr, secret_key, config).await?;
187
188 service.spawn();
189
190 Ok(discv4)
191 }
192
193 #[cfg(feature = "test-utils")]
197 pub fn noop() -> Self {
198 let (to_service, _rx) = mpsc::unbounded_channel();
199 let local_addr =
200 (IpAddr::from(std::net::Ipv4Addr::UNSPECIFIED), DEFAULT_DISCOVERY_PORT).into();
201 Self {
202 local_addr,
203 to_service,
204 node_record: Arc::new(Mutex::new(NodeRecord::new(
205 "127.0.0.1:3030".parse().unwrap(),
206 PeerId::random(),
207 ))),
208 }
209 }
210
211 pub async fn bind(
243 local_address: SocketAddr,
244 local_node_record: NodeRecord,
245 secret_key: SecretKey,
246 config: Discv4Config,
247 ) -> io::Result<(Self, Discv4Service)> {
248 let socket = Arc::new(UdpSocket::bind(local_address).await?);
249 trace!(target: "discv4", local_addr=?socket.local_addr(), "opened UDP socket");
250 let (tx, rx) = mpsc::channel(config.udp_ingress_message_buffer);
251
252 Self::bind_with_socket(socket, Some(tx), rx, local_node_record, secret_key, config)
253 }
254
255 pub fn bind_shared(
259 socket: Arc<UdpSocket>,
260 local_node_record: NodeRecord,
261 secret_key: SecretKey,
262 config: Discv4Config,
263 ) -> io::Result<(Self, Discv4Service, IngressHandler)> {
264 let (tx, rx) = mpsc::channel(config.udp_ingress_message_buffer);
265 let local_id = local_node_record.id;
266 let (discv4, service) =
267 Self::bind_with_socket(socket, None, rx, local_node_record, secret_key, config)?;
268
269 let handler = IngressHandler::new(tx, local_id);
270
271 Ok((discv4, service, handler))
272 }
273
274 fn bind_with_socket(
275 socket: Arc<UdpSocket>,
276 ingress_tx: Option<IngressSender>,
277 ingress_rx: IngressReceiver,
278 mut local_node_record: NodeRecord,
279 secret_key: SecretKey,
280 config: Discv4Config,
281 ) -> io::Result<(Self, Discv4Service)> {
282 let local_addr = socket.local_addr()?;
283 local_node_record.udp_port = local_addr.port();
284
285 let mut service = Discv4Service::new(
286 socket,
287 ingress_tx,
288 ingress_rx,
289 local_addr,
290 local_node_record,
291 secret_key,
292 config,
293 );
294
295 service.resolve_external_ip();
297
298 let discv4 = service.handle();
299 Ok((discv4, service))
300 }
301
302 pub const fn local_addr(&self) -> SocketAddr {
304 self.local_addr
305 }
306
307 pub fn node_record(&self) -> NodeRecord {
311 *self.node_record.lock()
312 }
313
314 pub fn external_ip(&self) -> IpAddr {
316 self.node_record.lock().address
317 }
318
319 pub fn set_lookup_interval(&self, duration: Duration) {
321 self.send_to_service(Discv4Command::SetLookupInterval(duration))
322 }
323
324 pub async fn lookup_self(&self) -> Result<Vec<NodeRecord>, Discv4Error> {
339 self.lookup_node(None).await
340 }
341
342 pub async fn lookup(&self, node_id: PeerId) -> Result<Vec<NodeRecord>, Discv4Error> {
346 self.lookup_node(Some(node_id)).await
347 }
348
349 pub async fn lookup_random(&self) -> Result<Vec<NodeRecord>, Discv4Error> {
351 let target = PeerId::random();
352 self.lookup_node(Some(target)).await
353 }
354
355 pub fn send_lookup(&self, node_id: PeerId) {
357 let cmd = Discv4Command::Lookup { node_id: Some(node_id), tx: None };
358 self.send_to_service(cmd);
359 }
360
361 async fn lookup_node(&self, node_id: Option<PeerId>) -> Result<Vec<NodeRecord>, Discv4Error> {
362 let (tx, rx) = oneshot::channel();
363 let cmd = Discv4Command::Lookup { node_id, tx: Some(tx) };
364 self.to_service.send(cmd)?;
365 Ok(rx.await?)
366 }
367
368 pub fn send_lookup_self(&self) {
370 let cmd = Discv4Command::Lookup { node_id: None, tx: None };
371 self.send_to_service(cmd);
372 }
373
374 pub fn remove_peer(&self, node_id: PeerId) {
376 let cmd = Discv4Command::Remove(node_id);
377 self.send_to_service(cmd);
378 }
379
380 pub fn add_node(&self, node_record: NodeRecord) {
382 let cmd = Discv4Command::Add(node_record);
383 self.send_to_service(cmd);
384 }
385
386 pub fn add_boot_node(&self, node_record: NodeRecord) {
391 let cmd = Discv4Command::AddBootNode(node_record);
392 self.send_to_service(cmd);
393 }
394
395 pub fn ban(&self, node_id: PeerId, ip: IpAddr) {
399 let cmd = Discv4Command::Ban(node_id, ip);
400 self.send_to_service(cmd);
401 }
402
403 pub fn ban_ip(&self, ip: IpAddr) {
407 let cmd = Discv4Command::BanIp(ip);
408 self.send_to_service(cmd);
409 }
410
411 pub fn ban_node(&self, node_id: PeerId) {
415 let cmd = Discv4Command::BanPeer(node_id);
416 self.send_to_service(cmd);
417 }
418
419 pub fn set_tcp_port(&self, port: u16) {
423 let cmd = Discv4Command::SetTcpPort(port);
424 self.send_to_service(cmd);
425 }
426
427 pub fn set_eip868_rlp_pair(&self, key: Vec<u8>, rlp: Bytes) {
433 let cmd = Discv4Command::SetEIP868RLPPair { key, rlp };
434 self.send_to_service(cmd);
435 }
436
437 pub fn set_eip868_rlp(&self, key: Vec<u8>, value: impl alloy_rlp::Encodable) {
441 self.set_eip868_rlp_pair(key, Bytes::from(alloy_rlp::encode(&value)))
442 }
443
444 #[inline]
445 fn send_to_service(&self, cmd: Discv4Command) {
446 let _ = self.to_service.send(cmd).map_err(|err| {
447 debug!(
448 target: "discv4",
449 %err,
450 "channel capacity reached, dropping command",
451 )
452 });
453 }
454
455 pub async fn update_stream(&self) -> Result<ReceiverStream<DiscoveryUpdate>, Discv4Error> {
457 let (tx, rx) = oneshot::channel();
458 let cmd = Discv4Command::Updates(tx);
459 self.to_service.send(cmd)?;
460 Ok(rx.await?)
461 }
462
463 pub fn terminate(&self) {
465 self.send_to_service(Discv4Command::Terminated);
466 }
467}
468
/// The state machine that manages the discv4 protocol.
///
/// It is driven as a stream of events (see [`Discv4Service::spawn`]) and is controlled through
/// [`Discv4`] handles.
#[must_use = "Stream does nothing unless polled"]
pub struct Discv4Service {
    /// Local address of the UDP socket.
    local_address: SocketAddr,
    /// The local ENR for EIP-868, built from the local node record and the additional RLP pairs
    /// of the config.
    local_eip_868_enr: Enr<SecretKey>,
    /// The local [`NodeRecord`].
    local_node_record: NodeRecord,
    /// Copy of the local node record that is shared with the [`Discv4`] handles.
    shared_node_record: Arc<Mutex<NodeRecord>>,
    /// The secret key used to encode outgoing packets and to sign the local ENR.
    secret_key: SecretKey,
    /// The UDP socket; only held here, the spawned tasks own their own clones.
    _socket: Arc<UdpSocket>,
    /// The tasks spawned on creation: the send loop and, if an ingress sender was provided, the
    /// receive loop.
    _tasks: JoinSet<()>,
    /// The routing table.
    kbuckets: KBucketsTable<NodeKey, NodeEntry>,
    /// Receiver for incoming [`IngressEvent`]s.
    ingress: IngressReceiver,
    /// Sender for encoded packets that the send loop writes to the socket.
    egress: EgressSender,
    /// Pings that could not be sent yet because [`MAX_NODES_PING`] pings were already in flight.
    /// Holds at most [`MAX_QUEUED_PINGS`] entries.
    queued_pings: VecDeque<(NodeRecord, PingReason)>,
    /// Pings that were sent and are awaiting a pong, keyed by the pinged peer.
    pending_pings: HashMap<PeerId, PingRequest>,
    /// Lookup contexts parked after a lookup-triggered ping was answered; picked up again once
    /// the peer pings back.
    pending_lookup: HashMap<PeerId, (Instant, LookupContext)>,
    /// `FindNode` requests that were sent and are awaiting `Neighbours`, keyed by the queried
    /// peer.
    pending_find_nodes: HashMap<PeerId, FindNodeRequest>,
    /// ENR requests that were sent and are awaiting a response, keyed by the queried peer.
    pending_enr_requests: HashMap<PeerId, EnrRequestState>,
    /// Sender half of the command channel, cloned into every [`Discv4`] handle.
    to_service: mpsc::UnboundedSender<Discv4Command>,
    /// Receiver half of the command channel.
    commands_rx: mpsc::UnboundedReceiver<Discv4Command>,
    /// All registered listeners for [`DiscoveryUpdate`]s.
    update_listeners: Vec<mpsc::Sender<DiscoveryUpdate>>,
    /// The interval at which lookups are triggered.
    lookup_interval: Interval,
    /// Selects the target of the periodic lookups.
    lookup_rotator: LookupTargetRotator,
    /// Initialised from `config.enable_lookup`. While set, the first pong from a boot node that
    /// answers an initial-insert ping resets `lookup_interval` and clears this flag.
    pending_lookup_reset: bool,
    /// Interval for evicting expired requests; its period is `config.request_timeout`.
    evict_expired_requests_interval: Interval,
    /// Interval with period `config.ping_interval`.
    ping_interval: Interval,
    /// Interval for re-resolving the external IP, if configured.
    resolve_external_ip_interval: Option<ResolveNatInterval>,
    /// The configuration of the service.
    config: Discv4Config,
    /// Buffered events.
    queued_events: VecDeque<Discv4Event>,
    /// Records the pongs received from peers; used to decide whether a bond with a peer exists.
    received_pongs: PongTable,
    /// Interval that ticks every [`EXPIRE_DURATION`].
    expire_interval: Interval,
    /// Cache used when creating `FindNode` packets.
    cached_find_node: Option<CachedFindNode>,
}
557
558impl Discv4Service {
559 pub(crate) fn new(
564 socket: Arc<UdpSocket>,
565 ingress_tx: Option<IngressSender>,
566 ingress_rx: IngressReceiver,
567 local_address: SocketAddr,
568 local_node_record: NodeRecord,
569 secret_key: SecretKey,
570 config: Discv4Config,
571 ) -> Self {
572 let (egress_tx, egress_rx) = mpsc::channel(config.udp_egress_message_buffer);
573 let mut tasks = JoinSet::<()>::new();
574
575 if let Some(ingress_tx) = ingress_tx {
576 let udp = Arc::clone(&socket);
577 tasks.spawn(receive_loop(udp, ingress_tx, local_node_record.id));
578 }
579
580 let udp = Arc::clone(&socket);
581 tasks.spawn(send_loop(udp, egress_rx));
582
583 let kbuckets = KBucketsTable::new(
584 NodeKey::from(&local_node_record).into(),
585 Duration::from_secs(60),
586 MAX_NODES_PER_BUCKET,
587 None,
588 None,
589 );
590
591 let self_lookup_interval = tokio::time::interval(config.lookup_interval);
592
593 let ping_interval = tokio::time::interval_at(
596 tokio::time::Instant::now() + config.ping_interval,
597 config.ping_interval,
598 );
599
600 let evict_expired_requests_interval = tokio::time::interval_at(
601 tokio::time::Instant::now() + config.request_timeout,
602 config.request_timeout,
603 );
604
605 let lookup_rotator = if config.enable_dht_random_walk {
606 LookupTargetRotator::default()
607 } else {
608 LookupTargetRotator::local_only()
609 };
610
611 let local_eip_868_enr = {
613 let mut builder = Enr::builder();
614 builder.ip(local_node_record.address);
615 if local_node_record.address.is_ipv4() {
616 builder.udp4(local_node_record.udp_port);
617 builder.tcp4(local_node_record.tcp_port);
618 } else {
619 builder.udp6(local_node_record.udp_port);
620 builder.tcp6(local_node_record.tcp_port);
621 }
622
623 for (key, val) in &config.additional_eip868_rlp_pairs {
624 builder.add_value_rlp(key, val.clone());
625 }
626 builder.build(&secret_key).expect("v4 is set")
627 };
628
629 let (to_service, commands_rx) = mpsc::unbounded_channel();
630
631 let shared_node_record = Arc::new(Mutex::new(local_node_record));
632
633 Self {
634 local_address,
635 local_eip_868_enr,
636 local_node_record,
637 shared_node_record,
638 _socket: socket,
639 kbuckets,
640 secret_key,
641 _tasks: tasks,
642 ingress: ingress_rx,
643 egress: egress_tx,
644 queued_pings: VecDeque::with_capacity(MAX_QUEUED_PINGS),
645 pending_pings: Default::default(),
646 pending_lookup: Default::default(),
647 pending_find_nodes: Default::default(),
648 pending_enr_requests: Default::default(),
649 commands_rx,
650 to_service,
651 update_listeners: Vec::with_capacity(1),
652 lookup_interval: self_lookup_interval,
653 ping_interval,
654 evict_expired_requests_interval,
655 lookup_rotator,
656 pending_lookup_reset: config.enable_lookup,
657 resolve_external_ip_interval: config.resolve_external_ip_interval(),
658 config,
659 queued_events: Default::default(),
660 received_pongs: Default::default(),
661 expire_interval: tokio::time::interval(EXPIRE_DURATION),
662 cached_find_node: None,
663 }
664 }
665
666 pub fn handle(&self) -> Discv4 {
668 Discv4 {
669 local_addr: self.local_address,
670 to_service: self.to_service.clone(),
671 node_record: self.shared_node_record.clone(),
672 }
673 }
674
675 fn enr_seq(&self) -> Option<u64> {
677 self.config.enable_eip868.then(|| self.local_eip_868_enr.seq())
678 }
679
680 pub fn set_lookup_interval(&mut self, duration: Duration) {
682 self.lookup_interval = tokio::time::interval(duration);
683 }
684
685 fn resolve_external_ip(&mut self) {
689 if let Some(r) = &self.resolve_external_ip_interval &&
690 let Some(external_ip) =
691 r.resolver().clone().as_external_ip(self.local_node_record.udp_port)
692 {
693 self.set_external_ip_addr(external_ip);
694 }
695 }
696
697 pub fn set_external_ip_addr(&mut self, external_ip: IpAddr) {
700 if self.local_node_record.address != external_ip {
701 debug!(target: "discv4", ?external_ip, "Updating external ip");
702 self.local_node_record.address = external_ip;
703 let _ = self.local_eip_868_enr.set_ip(external_ip, &self.secret_key);
704 let mut lock = self.shared_node_record.lock();
705 *lock = self.local_node_record;
706 debug!(target: "discv4", enr=?self.local_eip_868_enr, "Updated local ENR");
707 }
708 }
709
710 pub const fn local_peer_id(&self) -> &PeerId {
712 &self.local_node_record.id
713 }
714
715 pub const fn local_addr(&self) -> SocketAddr {
717 self.local_address
718 }
719
720 pub const fn local_enr(&self) -> NodeRecord {
724 self.local_node_record
725 }
726
727 #[cfg(test)]
729 pub const fn local_enr_mut(&mut self) -> &mut NodeRecord {
730 &mut self.local_node_record
731 }
732
733 pub fn contains_node(&self, id: PeerId) -> bool {
735 let key = kad_key(id);
736 self.kbuckets.get_index(&key).is_some()
737 }
738
739 pub fn bootstrap(&mut self) {
751 for record in self.config.bootstrap_nodes.clone() {
752 debug!(target: "discv4", ?record, "pinging boot node");
753 let key = kad_key(record.id);
754 let entry = NodeEntry::new(record);
755
756 match self.kbuckets.insert_or_update(
758 &key,
759 entry,
760 NodeStatus {
761 state: ConnectionState::Disconnected,
762 direction: ConnectionDirection::Outgoing,
763 },
764 ) {
765 InsertResult::Failed(_) => {}
766 _ => {
767 self.try_ping(record, PingReason::InitialInsert);
768 }
769 }
770 }
771 }
772
773 pub fn add_boot_node(&mut self, record: NodeRecord) -> bool {
778 self.config.bootstrap_nodes.insert(record);
779 self.add_node(record)
780 }
781
782 pub fn spawn(mut self) -> JoinHandle<()> {
786 tokio::task::spawn(async move {
787 self.bootstrap();
788
789 while let Some(event) = self.next().await {
790 trace!(target: "discv4", ?event, "processed");
791 }
792 trace!(target: "discv4", "service terminated");
793 })
794 }
795
796 pub fn update_stream(&mut self) -> ReceiverStream<DiscoveryUpdate> {
798 let (tx, rx) = mpsc::channel(512);
799 self.update_listeners.push(tx);
800 ReceiverStream::new(rx)
801 }
802
803 pub fn lookup_self(&mut self) {
805 self.lookup(self.local_node_record.id)
806 }
807
808 pub fn lookup(&mut self, target: PeerId) {
818 self.lookup_with(target, None)
819 }
820
821 fn lookup_with(&mut self, target: PeerId, tx: Option<NodeRecordSender>) {
831 trace!(target: "discv4", ?target, "Starting lookup");
832 let target_key = kad_key(target);
833
834 let ctx = LookupContext::new(
837 target_key.clone(),
838 self.kbuckets
839 .closest_values(&target_key)
840 .filter(|node| {
841 node.value.has_endpoint_proof &&
842 !self.pending_find_nodes.contains_key(&node.key.preimage().0)
843 })
844 .take(MAX_NODES_PER_BUCKET)
845 .map(|n| (target_key.distance(&n.key), n.value.record)),
846 tx,
847 );
848
849 let closest = ctx.closest(ALPHA);
851
852 if closest.is_empty() && self.pending_find_nodes.is_empty() {
853 self.bootstrap();
858 return
859 }
860
861 trace!(target: "discv4", ?target, num = closest.len(), "Start lookup closest nodes");
862
863 for node in closest {
864 self.find_node_checked(&node, ctx.clone());
868 }
869 }
870
871 fn find_node(&mut self, node: &NodeRecord, ctx: LookupContext) {
875 trace!(target: "discv4", ?node, lookup=?ctx.target(), "Sending FindNode");
876 ctx.mark_queried(node.id);
877 let (payload, hash) = self.find_node_packet(ctx.target());
878 let to = node.udp_addr();
879 trace!(target: "discv4", ?to, ?hash, "sending FindNode packet");
880 let _ = self.egress.try_send((payload, to)).map_err(|err| {
881 debug!(target: "discv4", %err, "dropped outgoing packet");
882 });
883 self.pending_find_nodes.insert(node.id, FindNodeRequest::new(ctx));
884 }
885
886 fn find_node_checked(&mut self, node: &NodeRecord, ctx: LookupContext) {
891 let max_failures = self.config.max_find_node_failures;
892 let needs_ping = self
893 .on_entry(node.id, |entry| entry.exceeds_find_node_failures(max_failures))
894 .unwrap_or(true);
895 if needs_ping {
896 self.try_ping(*node, PingReason::Lookup(*node, ctx))
897 } else {
898 self.find_node(node, ctx)
899 }
900 }
901
902 fn notify(&mut self, update: DiscoveryUpdate) {
906 self.update_listeners.retain_mut(|listener| match listener.try_send(update.clone()) {
907 Ok(()) => true,
908 Err(err) => match err {
909 TrySendError::Full(_) => true,
910 TrySendError::Closed(_) => false,
911 },
912 });
913 }
914
915 pub fn ban_ip(&mut self, ip: IpAddr) {
917 self.config.ban_list.ban_ip(ip);
918 }
919
920 pub fn ban_node(&mut self, node_id: PeerId) {
922 self.remove_node(node_id);
923 self.config.ban_list.ban_peer(node_id);
924 }
925
926 pub fn ban_ip_until(&mut self, ip: IpAddr, until: Instant) {
928 self.config.ban_list.ban_ip_until(ip, until);
929 }
930
931 pub fn ban_node_until(&mut self, node_id: PeerId, until: Instant) {
933 self.remove_node(node_id);
934 self.config.ban_list.ban_peer_until(node_id, until);
935 }
936
937 pub fn remove_node(&mut self, node_id: PeerId) -> bool {
942 let key = kad_key(node_id);
943 self.remove_key(node_id, key)
944 }
945
946 pub fn soft_remove_node(&mut self, node_id: PeerId) -> bool {
951 let key = kad_key(node_id);
952 let Some(bucket) = self.kbuckets.get_bucket(&key) else { return false };
953 if bucket.num_entries() < MAX_NODES_PER_BUCKET / 2 {
954 return false
956 }
957 self.remove_key(node_id, key)
958 }
959
960 fn remove_key(&mut self, node_id: PeerId, key: discv5::Key<NodeKey>) -> bool {
961 let removed = self.kbuckets.remove(&key);
962 if removed {
963 trace!(target: "discv4", ?node_id, "removed node");
964 self.notify(DiscoveryUpdate::Removed(node_id));
965 }
966 removed
967 }
968
969 pub fn num_connected(&self) -> usize {
971 self.kbuckets.buckets_iter().fold(0, |count, bucket| count + bucket.num_connected())
972 }
973
974 fn has_bond(&self, remote_id: PeerId, remote_ip: IpAddr) -> bool {
976 if let Some(timestamp) = self.received_pongs.last_pong(remote_id, remote_ip) &&
977 timestamp.elapsed() < self.config.bond_expiration
978 {
979 return true
980 }
981 false
982 }
983
984 fn on_entry<F, R>(&mut self, peer_id: PeerId, f: F) -> Option<R>
986 where
987 F: FnOnce(&NodeEntry) -> R,
988 {
989 let key = kad_key(peer_id);
990 match self.kbuckets.entry(&key) {
991 BucketEntry::Present(entry, _) => Some(f(entry.value())),
992 BucketEntry::Pending(entry, _) => Some(f(entry.value())),
993 _ => None,
994 }
995 }
996
997 fn update_on_reping(&mut self, record: NodeRecord, mut last_enr_seq: Option<u64>) {
1004 if record.id == self.local_node_record.id {
1005 return
1006 }
1007
1008 if !self.config.enable_eip868 {
1010 last_enr_seq = None;
1011 }
1012
1013 let key = kad_key(record.id);
1014 let old_enr = match self.kbuckets.entry(&key) {
1015 kbucket::Entry::Present(mut entry, _) => {
1016 entry.value_mut().update_with_enr(last_enr_seq)
1017 }
1018 kbucket::Entry::Pending(mut entry, _) => {
1019 entry.value_mut().update_with_enr(last_enr_seq)
1020 }
1021 _ => return,
1022 };
1023
1024 match (last_enr_seq, old_enr) {
1026 (Some(new), Some(old)) if new > old => {
1027 self.send_enr_request(record);
1028 }
1029 (Some(_), None) => {
1030 self.send_enr_request(record);
1032 }
1033 _ => {}
1034 };
1035 }
1036
1037 fn update_on_pong(&mut self, record: NodeRecord, mut last_enr_seq: Option<u64>) {
1039 if record.id == *self.local_peer_id() {
1040 return
1041 }
1042
1043 if !self.config.enable_eip868 {
1045 last_enr_seq = None;
1046 }
1047
1048 let has_enr_seq = last_enr_seq.is_some();
1051
1052 let key = kad_key(record.id);
1053 match self.kbuckets.entry(&key) {
1054 kbucket::Entry::Present(mut entry, old_status) => {
1055 entry.value_mut().establish_proof();
1057 entry.value_mut().update_with_enr(last_enr_seq);
1058
1059 if !old_status.is_connected() {
1060 let _ = entry.update(ConnectionState::Connected, Some(old_status.direction));
1061 trace!(target: "discv4", ?record, "added after successful endpoint proof");
1062 self.notify(DiscoveryUpdate::Added(record));
1063
1064 if has_enr_seq {
1065 self.send_enr_request(record);
1067 }
1068 }
1069 }
1070 kbucket::Entry::Pending(mut entry, mut status) => {
1071 entry.value_mut().establish_proof();
1073 entry.value_mut().update_with_enr(last_enr_seq);
1074
1075 if !status.is_connected() {
1076 status.state = ConnectionState::Connected;
1077 let _ = entry.update(status);
1078 trace!(target: "discv4", ?record, "added after successful endpoint proof");
1079 self.notify(DiscoveryUpdate::Added(record));
1080
1081 if has_enr_seq {
1082 self.send_enr_request(record);
1084 }
1085 }
1086 }
1087 _ => {}
1088 };
1089 }
1090
1091 pub fn add_all_nodes(&mut self, records: impl IntoIterator<Item = NodeRecord>) {
1095 for record in records {
1096 self.add_node(record);
1097 }
1098 }
1099
1100 pub fn add_node(&mut self, record: NodeRecord) -> bool {
1106 let key = kad_key(record.id);
1107 match self.kbuckets.entry(&key) {
1108 kbucket::Entry::Absent(entry) => {
1109 let node = NodeEntry::new(record);
1110 match entry.insert(
1111 node,
1112 NodeStatus {
1113 direction: ConnectionDirection::Outgoing,
1114 state: ConnectionState::Disconnected,
1115 },
1116 ) {
1117 BucketInsertResult::Inserted | BucketInsertResult::Pending { .. } => {
1118 trace!(target: "discv4", ?record, "inserted new record");
1119 }
1120 _ => return false,
1121 }
1122 }
1123 _ => return false,
1124 }
1125
1126 self.try_ping(record, PingReason::InitialInsert);
1128 true
1129 }
1130
1131 pub(crate) fn send_packet(&self, msg: Message, to: SocketAddr) -> B256 {
1133 let (payload, hash) = msg.encode(&self.secret_key);
1134 trace!(target: "discv4", r#type=?msg.msg_type(), ?to, ?hash, "sending packet");
1135 let _ = self.egress.try_send((payload, to)).map_err(|err| {
1136 debug!(
1137 target: "discv4",
1138 %err,
1139 "dropped outgoing packet",
1140 );
1141 });
1142 hash
1143 }
1144
1145 fn find_node_packet(&mut self, target: PeerId) -> (Bytes, B256) {
1147 let expire = self.find_node_expiration();
1148 let cache_ttl = self.config.request_timeout / 4;
1149 CachedFindNode::get_or_sign(
1150 &mut self.cached_find_node,
1151 target,
1152 cache_ttl,
1153 &self.secret_key,
1154 expire,
1155 )
1156 }
1157
    /// Handles an incoming `Ping` from `remote_id` at `remote_addr`; `hash` is echoed back in
    /// the `Pong`.
    ///
    /// Expired pings are ignored. Otherwise the sender is looked up in, or inserted into, the
    /// routing table. A `Pong` is sent in every case except when the key is the table's own
    /// entry or the insertion is rejected by a filter. Depending on the state of the sender, one
    /// of the following happens afterwards:
    /// - it is pinged back if it was newly inserted or if a bond must be (re-)established,
    /// - a lookup that was waiting for this peer is continued, or
    /// - its ENR is requested if it advertised a newer sequence number.
    fn on_ping(&mut self, ping: Ping, remote_addr: SocketAddr, remote_id: PeerId, hash: B256) {
        if self.is_expired(ping.expire) {
            // the ping is expired, ignore it
            return
        }

        // Build the record from the address the packet was actually received from; only the TCP
        // port is taken from the ping itself.
        let record = NodeRecord {
            address: remote_addr.ip(),
            udp_port: remote_addr.port(),
            tcp_port: ping.from.tcp_port,
            id: remote_id,
        }
        .into_ipv4_mapped();

        let key = kad_key(record.id);

        // The sender was not in the table before and has just been inserted.
        let mut is_new_insert = false;
        // The sender needs to be pinged in order to (re-)establish the bond.
        let mut needs_bond = false;
        // The sender has a valid, unexpired endpoint proof.
        let mut is_proven = false;

        // Previously stored ENR sequence number of the sender; `None` for unknown nodes.
        let old_enr = match self.kbuckets.entry(&key) {
            kbucket::Entry::Present(mut entry, _) => {
                if entry.value().is_expired() {
                    // the stored entry is expired, so the bond has to be renewed
                    needs_bond = true;
                } else {
                    is_proven = entry.value().has_endpoint_proof;
                }
                entry.value_mut().update_with_enr(ping.enr_sq)
            }
            kbucket::Entry::Pending(mut entry, _) => {
                if entry.value().is_expired() {
                    // the stored entry is expired, so the bond has to be renewed
                    needs_bond = true;
                } else {
                    is_proven = entry.value().has_endpoint_proof;
                }
                entry.value_mut().update_with_enr(ping.enr_sq)
            }
            kbucket::Entry::Absent(entry) => {
                let mut node = NodeEntry::new(record);
                node.last_enr_seq = ping.enr_sq;

                match entry.insert(
                    node,
                    NodeStatus {
                        direction: ConnectionDirection::Incoming,
                        // the node is only marked as connected once it answered our ping
                        state: ConnectionState::Disconnected,
                    },
                ) {
                    BucketInsertResult::Inserted | BucketInsertResult::Pending { .. } => {
                        // mark as new insert to ping the node back
                        is_new_insert = true;
                    }
                    BucketInsertResult::Full => {
                        // The bucket has no room: tell the listeners about the node and still
                        // try to establish a bond with it.
                        trace!(target: "discv4", ?record, "discovered new record but bucket is full");
                        self.notify(DiscoveryUpdate::DiscoveredAtCapacity(record));
                        needs_bond = true;
                    }
                    BucketInsertResult::TooManyIncoming | BucketInsertResult::NodeExists => {
                        needs_bond = true;
                    }
                    // the insertion was rejected by a filter: ignore the ping entirely
                    BucketInsertResult::FailedFilter => return,
                }

                None
            }
            kbucket::Entry::SelfEntry => return,
        };

        // Answer the ping. The pong echoes the hash of the ping packet and reuses the ping's
        // expiration.
        let pong = Message::Pong(Pong {
            to: record.into(),
            echo: hash,
            expire: ping.expire,
            enr_sq: self.enr_seq(),
        });
        self.send_packet(pong, remote_addr);

        if is_new_insert {
            // newly inserted nodes are pinged back
            self.try_ping(record, PingReason::InitialInsert);
        } else if needs_bond {
            self.try_ping(record, PingReason::EstablishBond);
        } else if is_proven {
            // If a lookup was parked for this peer, continue it now.
            if let Some((_, ctx)) = self.pending_lookup.remove(&record.id) {
                if self.pending_find_nodes.contains_key(&record.id) {
                    // A `FindNode` request to this peer is already in flight, so the peer is
                    // made available to the lookup again instead of being queried twice.
                    ctx.unmark_queried(record.id);
                } else {
                    self.find_node(&record, ctx);
                }
            }
        } else {
            // Request the ENR if the ping advertised a sequence number that is newer than the
            // stored one, or if none was stored yet.
            match (ping.enr_sq, old_enr) {
                (Some(new), Some(old)) if new > old => {
                    self.send_enr_request(record);
                }
                (Some(_), None) => {
                    self.send_enr_request(record);
                }
                _ => {}
            };
        }
    }
1287
1288 fn try_ping(&mut self, node: NodeRecord, reason: PingReason) {
1290 if node.id == *self.local_peer_id() {
1291 return
1293 }
1294
1295 if self.pending_pings.contains_key(&node.id) ||
1296 self.pending_find_nodes.contains_key(&node.id)
1297 {
1298 return
1299 }
1300
1301 if self.queued_pings.iter().any(|(n, _)| n.id == node.id) {
1302 return
1303 }
1304
1305 if self.pending_pings.len() < MAX_NODES_PING {
1306 self.send_ping(node, reason);
1307 } else if self.queued_pings.len() < MAX_QUEUED_PINGS {
1308 self.queued_pings.push_back((node, reason));
1309 }
1310 }
1311
1312 pub(crate) fn send_ping(&mut self, node: NodeRecord, reason: PingReason) -> B256 {
1316 let remote_addr = node.udp_addr();
1317 let id = node.id;
1318 let ping = Ping {
1319 from: self.local_node_record.into(),
1320 to: node.into(),
1321 expire: self.ping_expiration(),
1322 enr_sq: self.enr_seq(),
1323 };
1324 trace!(target: "discv4", ?ping, "sending ping");
1325 let echo_hash = self.send_packet(Message::Ping(ping), remote_addr);
1326
1327 self.pending_pings
1328 .insert(id, PingRequest { sent_at: Instant::now(), node, echo_hash, reason });
1329 echo_hash
1330 }
1331
1332 pub(crate) fn send_enr_request(&mut self, node: NodeRecord) {
1336 if !self.config.enable_eip868 {
1337 return
1338 }
1339 let remote_addr = node.udp_addr();
1340 let enr_request = EnrRequest { expire: self.enr_request_expiration() };
1341
1342 trace!(target: "discv4", ?enr_request, "sending enr request");
1343 let echo_hash = self.send_packet(Message::EnrRequest(enr_request), remote_addr);
1344
1345 self.pending_enr_requests
1346 .insert(node.id, EnrRequestState { sent_at: Instant::now(), echo_hash });
1347 }
1348
1349 fn on_pong(&mut self, pong: Pong, remote_addr: SocketAddr, remote_id: PeerId) {
1351 if self.is_expired(pong.expire) {
1352 return
1353 }
1354
1355 let PingRequest { node, reason, .. } = match self.pending_pings.entry(remote_id) {
1356 Entry::Occupied(entry) => {
1357 {
1358 let request = entry.get();
1359 if request.echo_hash != pong.echo {
1360 trace!(target: "discv4", from=?remote_addr, expected=?request.echo_hash, echo_hash=?pong.echo,"Got unexpected Pong");
1361 return
1362 }
1363 }
1364 entry.remove()
1365 }
1366 Entry::Vacant(_) => return,
1367 };
1368
1369 self.received_pongs.on_pong(remote_id, remote_addr.ip());
1371
1372 match reason {
1373 PingReason::InitialInsert => {
1374 self.update_on_pong(node, pong.enr_sq);
1375 if self.pending_lookup_reset && self.config.bootstrap_nodes.contains(&node) {
1378 self.pending_lookup_reset = false;
1379 self.lookup_interval.reset();
1380 }
1381 }
1382 PingReason::EstablishBond => {
1383 self.update_on_pong(node, pong.enr_sq);
1385 }
1386 PingReason::RePing => {
1387 self.update_on_reping(node, pong.enr_sq);
1388 }
1389 PingReason::Lookup(node, ctx) => {
1390 self.update_on_pong(node, pong.enr_sq);
1391 self.pending_lookup.insert(node.id, (Instant::now(), ctx));
1396 }
1397 }
1398 }
1399
1400 fn on_find_node(&mut self, msg: FindNode, remote_addr: SocketAddr, node_id: PeerId) {
1402 if self.is_expired(msg.expire) {
1403 return
1405 }
1406 if node_id == *self.local_peer_id() {
1407 return
1409 }
1410
1411 if self.has_bond(node_id, remote_addr.ip()) {
1412 self.respond_closest(msg.id, remote_addr)
1413 }
1414 }
1415
1416 fn on_enr_response(&mut self, msg: EnrResponse, remote_addr: SocketAddr, id: PeerId) {
1418 trace!(target: "discv4", ?remote_addr, ?msg, "received ENR response");
1419 if let Some(resp) = self.pending_enr_requests.remove(&id) {
1420 let enr_id = pk2id(&msg.enr.public_key());
1422 if id != enr_id {
1423 return
1424 }
1425
1426 if resp.echo_hash == msg.request_hash {
1427 let key = kad_key(id);
1428 let fork_id = msg.eth_fork_id();
1429 let (record, old_fork_id) = match self.kbuckets.entry(&key) {
1430 kbucket::Entry::Present(mut entry, _) => {
1431 let id = entry.value_mut().update_with_fork_id(fork_id);
1432 (entry.value().record, id)
1433 }
1434 kbucket::Entry::Pending(mut entry, _) => {
1435 let id = entry.value_mut().update_with_fork_id(fork_id);
1436 (entry.value().record, id)
1437 }
1438 _ => return,
1439 };
1440 match (fork_id, old_fork_id) {
1441 (Some(new), Some(old)) if new != old => {
1442 self.notify(DiscoveryUpdate::EnrForkId(record, new))
1443 }
1444 (Some(new), None) => self.notify(DiscoveryUpdate::EnrForkId(record, new)),
1445 _ => {}
1446 }
1447 }
1448 }
1449 }
1450
1451 fn on_enr_request(
1453 &self,
1454 msg: EnrRequest,
1455 remote_addr: SocketAddr,
1456 id: PeerId,
1457 request_hash: B256,
1458 ) {
1459 if !self.config.enable_eip868 || self.is_expired(msg.expire) {
1460 return
1461 }
1462
1463 if self.has_bond(id, remote_addr.ip()) {
1464 self.send_packet(
1465 Message::EnrResponse(EnrResponse {
1466 request_hash,
1467 enr: self.local_eip_868_enr.clone(),
1468 }),
1469 remote_addr,
1470 );
1471 }
1472 }
1473
/// Handles a [`Neighbours`] message sent by `node_id`.
///
/// The message is dropped if it has expired, if there is no pending `FindNode` request for the
/// sender, or if it would push the number of received records for that request above
/// [`MAX_NODES_PER_BUCKET`]. Otherwise the received records are added to the lookup context of
/// the request and the next round of the lookup is started for the closest nodes that have not
/// been queried yet.
fn on_neighbours(&mut self, msg: Neighbours, remote_addr: SocketAddr, node_id: PeerId) {
    if self.is_expired(msg.expire) {
        return
    }
    // Resolve the lookup context this response belongs to.
    let ctx = match self.pending_find_nodes.entry(node_id) {
        Entry::Occupied(mut entry) => {
            {
                let request = entry.get_mut();
                // Mark the request as answered, even if the packet is rejected below.
                request.answered = true;
                let total = request.response_count + msg.nodes.len();

                if total <= MAX_NODES_PER_BUCKET {
                    request.response_count = total;
                } else {
                    trace!(target: "discv4", total, from=?remote_addr, "Received neighbors packet entries exceeds max nodes per bucket");
                    return
                }
            };

            if entry.get().response_count == MAX_NODES_PER_BUCKET {
                // All expected records arrived: the request is complete and can be removed.
                let ctx = entry.remove().lookup_context;
                ctx.mark_responded(node_id);
                ctx
            } else {
                // More packets may follow, keep the request pending.
                entry.get().lookup_context.clone()
            }
        }
        Entry::Vacant(_) => {
            trace!(target: "discv4", from=?remote_addr, "Received unsolicited Neighbours");
            return
        }
    };

    trace!(target: "discv4",
        target=format!("{:#?}", node_id),
        peers_count=msg.nodes.len(),
        peers=format!("[{:#}]", msg.nodes.iter()
            .map(|node_rec| node_rec.id
        ).format(", ")),
        "Received peers from Neighbours packet"
    );

    // Add every received record that is not banned to the lookup context.
    for node in msg.nodes.into_iter().map(NodeRecord::into_ipv4_mapped) {
        if self.config.ban_list.is_banned(&node.id, &node.address) {
            trace!(target: "discv4", peer_id=?node.id, ip=?node.address, "ignoring banned record");
            continue
        }

        ctx.add_node(node);
    }

    // Up to `ALPHA` closest nodes that are not queried yet and have no `FindNode` request in
    // flight.
    let closest =
        ctx.filter_closest(ALPHA, |node| !self.pending_find_nodes.contains_key(&node.id));

    for closest in closest {
        let key = kad_key(closest.id);
        match self.kbuckets.entry(&key) {
            BucketEntry::Absent(entry) => {
                // Unknown node: mark it as queried, insert it into the table and ping it.
                ctx.mark_queried(closest.id);
                let node = NodeEntry::new(closest);
                match entry.insert(
                    node,
                    NodeStatus {
                        direction: ConnectionDirection::Outgoing,
                        state: ConnectionState::Disconnected,
                    },
                ) {
                    BucketInsertResult::Inserted | BucketInsertResult::Pending { .. } => {
                        self.try_ping(closest, PingReason::Lookup(closest, ctx.clone()))
                    }
                    BucketInsertResult::Full => {
                        self.notify(DiscoveryUpdate::DiscoveredAtCapacity(closest))
                    }
                    _ => {}
                }
            }
            BucketEntry::SelfEntry => {
                // The local node is never queried.
            }
            BucketEntry::Present(entry, _) => {
                // Known node: only continue if it has an endpoint proof. Nodes that reached
                // the failure limit are pinged instead of being sent another `FindNode`.
                if entry.value().has_endpoint_proof {
                    if entry
                        .value()
                        .exceeds_find_node_failures(self.config.max_find_node_failures)
                    {
                        self.try_ping(closest, PingReason::Lookup(closest, ctx.clone()))
                    } else {
                        self.find_node(&closest, ctx.clone());
                    }
                }
            }
            BucketEntry::Pending(entry, _) => {
                // Same handling as for `Present` entries.
                if entry.value().has_endpoint_proof {
                    if entry
                        .value()
                        .exceeds_find_node_failures(self.config.max_find_node_failures)
                    {
                        self.try_ping(closest, PingReason::Lookup(closest, ctx.clone()))
                    } else {
                        self.find_node(&closest, ctx.clone());
                    }
                }
            }
        }
    }
}
1600
1601 fn respond_closest(&mut self, target: PeerId, to: SocketAddr) {
1603 let key = kad_key(target);
1604 let expire = self.send_neighbours_expiration();
1605
1606 let closest_nodes =
1608 self.kbuckets.closest_values(&key).take(MAX_NODES_PER_BUCKET).collect::<Vec<_>>();
1609
1610 for nodes in closest_nodes.chunks(SAFE_MAX_DATAGRAM_NEIGHBOUR_RECORDS) {
1611 let nodes = nodes.iter().map(|node| node.value.record).collect::<Vec<NodeRecord>>();
1612 trace!(target: "discv4", len = nodes.len(), to=?to,"Sent neighbours packet");
1613 let msg = Message::Neighbours(Neighbours { nodes, expire });
1614 self.send_packet(msg, to);
1615 }
1616 }
1617
1618 fn evict_expired_requests(&mut self, now: Instant) {
1619 self.pending_enr_requests.retain(|_node_id, enr_request| {
1620 now.duration_since(enr_request.sent_at) < self.config.enr_expiration
1621 });
1622
1623 let mut failed_pings = Vec::new();
1624 self.pending_pings.retain(|node_id, ping_request| {
1625 if now.duration_since(ping_request.sent_at) > self.config.ping_expiration {
1626 failed_pings.push(*node_id);
1627 return false
1628 }
1629 true
1630 });
1631
1632 if !failed_pings.is_empty() {
1633 trace!(target: "discv4", num=%failed_pings.len(), "evicting nodes due to failed pong");
1635 for node_id in failed_pings {
1636 self.remove_node(node_id);
1637 }
1638 }
1639
1640 let mut failed_lookups = Vec::new();
1641 self.pending_lookup.retain(|node_id, (lookup_sent_at, _)| {
1642 if now.duration_since(*lookup_sent_at) > self.config.request_timeout {
1643 failed_lookups.push(*node_id);
1644 return false
1645 }
1646 true
1647 });
1648
1649 if !failed_lookups.is_empty() {
1650 trace!(target: "discv4", num=%failed_lookups.len(), "evicting nodes due to failed lookup");
1652 for node_id in failed_lookups {
1653 self.remove_node(node_id);
1654 }
1655 }
1656
1657 self.evict_failed_find_nodes(now);
1658 }
1659
1660 fn evict_failed_find_nodes(&mut self, now: Instant) {
1662 let mut failed_find_nodes = Vec::new();
1663 self.pending_find_nodes.retain(|node_id, find_node_request| {
1664 if now.duration_since(find_node_request.sent_at) > self.config.neighbours_expiration {
1665 if !find_node_request.answered {
1666 failed_find_nodes.push(*node_id);
1669 }
1670 return false
1671 }
1672 true
1673 });
1674
1675 if failed_find_nodes.is_empty() {
1676 return
1677 }
1678
1679 trace!(target: "discv4", num=%failed_find_nodes.len(), "processing failed find nodes");
1680
1681 for node_id in failed_find_nodes {
1682 let key = kad_key(node_id);
1683 let failures = match self.kbuckets.entry(&key) {
1684 kbucket::Entry::Present(mut entry, _) => {
1685 entry.value_mut().inc_failed_request();
1686 entry.value().find_node_failures
1687 }
1688 kbucket::Entry::Pending(mut entry, _) => {
1689 entry.value_mut().inc_failed_request();
1690 entry.value().find_node_failures
1691 }
1692 _ => continue,
1693 };
1694
1695 if failures > self.config.max_find_node_failures {
1699 self.soft_remove_node(node_id);
1700 }
1701 }
1702 }
1703
1704 fn re_ping_oldest(&mut self) {
1709 let mut nodes = self
1710 .kbuckets
1711 .iter_ref()
1712 .filter(|entry| entry.node.value.is_expired())
1713 .map(|n| n.node.value)
1714 .collect::<Vec<_>>();
1715 nodes.sort_unstable_by_key(|a| a.last_seen);
1716 let to_ping = nodes.into_iter().map(|n| n.record).take(MAX_NODES_PING).collect::<Vec<_>>();
1717 for node in to_ping {
1718 self.try_ping(node, PingReason::RePing)
1719 }
1720 }
1721
/// Returns `true` if `ensure_not_expired` rejects the given expiration timestamp.
fn is_expired(&self, expiration: u64) -> bool {
    self.ensure_not_expired(expiration).is_err()
}
1726
1727 fn ensure_not_expired(&self, timestamp: u64) -> Result<(), ()> {
1737 let _ = i64::try_from(timestamp).map_err(drop)?;
1739
1740 let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_secs();
1741 if self.config.enforce_expiration_timestamps && timestamp < now {
1742 trace!(target: "discv4", "Expired packet");
1743 return Err(())
1744 }
1745 Ok(())
1746 }
1747
1748 fn ping_buffered(&mut self) {
1750 while self.pending_pings.len() < MAX_NODES_PING {
1751 match self.queued_pings.pop_front() {
1752 Some((next, reason)) => self.try_ping(next, reason),
1753 None => break,
1754 }
1755 }
1756 }
1757
1758 fn ping_expiration(&self) -> u64 {
1759 (SystemTime::now().duration_since(UNIX_EPOCH).unwrap() + self.config.ping_expiration)
1760 .as_secs()
1761 }
1762
1763 fn find_node_expiration(&self) -> u64 {
1764 (SystemTime::now().duration_since(UNIX_EPOCH).unwrap() + self.config.request_timeout)
1765 .as_secs()
1766 }
1767
1768 fn enr_request_expiration(&self) -> u64 {
1769 (SystemTime::now().duration_since(UNIX_EPOCH).unwrap() + self.config.enr_expiration)
1770 .as_secs()
1771 }
1772
1773 fn send_neighbours_expiration(&self) -> u64 {
1774 (SystemTime::now().duration_since(UNIX_EPOCH).unwrap() + self.config.neighbours_expiration)
1775 .as_secs()
1776 }
1777
/// Drives the service and returns the next [`Discv4Event`].
///
/// Each loop iteration first returns a queued event if there is one. Otherwise it polls the
/// lookup, re-ping and external IP timers, drains the command channel, processes incoming
/// datagrams (bounded by `UDP_MESSAGE_POLL_LOOP_BUDGET`), sends buffered pings and runs the
/// eviction timers. `Poll::Pending` is returned once an iteration ends with no queued events.
pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<Discv4Event> {
    loop {
        // Drain buffered events first.
        if let Some(event) = self.queued_events.pop_front() {
            return Poll::Ready(event)
        }

        // Trigger a lookup on every tick of the lookup interval, if lookups are enabled.
        if self.config.enable_lookup {
            while self.lookup_interval.poll_tick(cx).is_ready() {
                let target = self.lookup_rotator.next(&self.local_node_record.id);
                self.lookup_with(target, None);
            }
        }

        // Re-ping expired nodes on every tick of the ping interval.
        while self.ping_interval.poll_tick(cx).is_ready() {
            self.re_ping_oldest();
        }

        // Apply a newly resolved external IP, if a resolver interval is configured.
        if let Some(Poll::Ready(Some(ip))) =
            self.resolve_external_ip_interval.as_mut().map(|r| r.poll_tick(cx))
        {
            self.set_external_ip_addr(ip);
        }

        // Process all commands that are ready.
        while let Poll::Ready(Some(cmd)) = self.commands_rx.poll_recv(cx) {
            match cmd {
                Discv4Command::Add(enr) => {
                    self.add_node(enr);
                }
                Discv4Command::AddBootNode(record) => {
                    self.add_boot_node(record);
                }
                Discv4Command::Lookup { node_id, tx } => {
                    // Without an explicit target the local node id is looked up.
                    let node_id = node_id.unwrap_or(self.local_node_record.id);
                    self.lookup_with(node_id, tx);
                }
                Discv4Command::SetLookupInterval(duration) => {
                    self.set_lookup_interval(duration);
                }
                Discv4Command::Updates(tx) => {
                    let rx = self.update_stream();
                    // The requester may already be gone, in which case the stream is dropped.
                    let _ = tx.send(rx);
                }
                Discv4Command::BanPeer(node_id) => self.ban_node(node_id),
                Discv4Command::Remove(node_id) => {
                    self.remove_node(node_id);
                }
                Discv4Command::Ban(node_id, ip) => {
                    self.ban_node(node_id);
                    self.ban_ip(ip);
                }
                Discv4Command::BanIp(ip) => {
                    self.ban_ip(ip);
                }
                Discv4Command::SetEIP868RLPPair { key, rlp } => {
                    debug!(target: "discv4", key=%String::from_utf8_lossy(&key), "Update EIP-868 extension pair");

                    // A failure to update the local ENR is ignored.
                    let _ = self.local_eip_868_enr.insert_raw_rlp(key, rlp, &self.secret_key);
                }
                Discv4Command::SetTcpPort(port) => {
                    debug!(target: "discv4", %port, "Update tcp port");
                    self.local_node_record.tcp_port = port;
                    // Update the ENR field that matches the address family of the local record.
                    if self.local_node_record.address.is_ipv4() {
                        let _ = self.local_eip_868_enr.set_tcp4(port, &self.secret_key);
                    } else {
                        let _ = self.local_eip_868_enr.set_tcp6(port, &self.secret_key);
                    }
                }

                Discv4Command::Terminated => {
                    // Queued like any other event and returned at the top of the outer loop.
                    self.queued_events.push_back(Discv4Event::Terminated);
                }
            }
        }

        // Limits how many ingress events are processed per iteration of the outer loop.
        let mut udp_message_budget = UDP_MESSAGE_POLL_LOOP_BUDGET;

        // Process incoming datagrams.
        while let Poll::Ready(Some(event)) = self.ingress.poll_recv(cx) {
            match event {
                IngressEvent::RecvError(err) => {
                    debug!(target: "discv4", %err, "failed to read datagram");
                }
                IngressEvent::BadPacket(from, err, data) => {
                    trace!(target: "discv4", ?from, %err, packet=?hex::encode(&data), "bad packet");
                }
                IngressEvent::Packet(remote_addr, Packet { msg, node_id, hash }) => {
                    trace!(target: "discv4", r#type=?msg.msg_type(), from=?remote_addr,"received packet");
                    // Dispatch to the matching handler and record which message type was seen.
                    let event = match msg {
                        Message::Ping(ping) => {
                            self.on_ping(ping, remote_addr, node_id, hash);
                            Discv4Event::Ping
                        }
                        Message::Pong(pong) => {
                            self.on_pong(pong, remote_addr, node_id);
                            Discv4Event::Pong
                        }
                        Message::FindNode(msg) => {
                            self.on_find_node(msg, remote_addr, node_id);
                            Discv4Event::FindNode
                        }
                        Message::Neighbours(msg) => {
                            self.on_neighbours(msg, remote_addr, node_id);
                            Discv4Event::Neighbours
                        }
                        Message::EnrRequest(msg) => {
                            self.on_enr_request(msg, remote_addr, node_id, hash);
                            Discv4Event::EnrRequest
                        }
                        Message::EnrResponse(msg) => {
                            self.on_enr_response(msg, remote_addr, node_id);
                            Discv4Event::EnrResponse
                        }
                    };

                    self.queued_events.push_back(event);
                }
            }

            udp_message_budget -= 1;
            if udp_message_budget < 0 {
                trace!(target: "discv4", budget=UDP_MESSAGE_POLL_LOOP_BUDGET, "exhausted message poll budget");
                if self.queued_events.is_empty() {
                    // Nothing is queued, so `Pending` will be returned below even though the
                    // ingress channel may still hold messages: request another poll.
                    cx.waker().wake_by_ref();
                }
                break
            }
        }

        // Try to send pings that were buffered.
        self.ping_buffered();

        // Evict timed out requests on every tick of the eviction interval.
        while self.evict_expired_requests_interval.poll_tick(cx).is_ready() {
            self.evict_expired_requests(Instant::now());
        }

        // Evict received pongs older than `EXPIRE_DURATION` on every tick of the expire interval.
        while self.expire_interval.poll_tick(cx).is_ready() {
            self.received_pongs.evict_expired(Instant::now(), EXPIRE_DURATION);
        }

        // Loop again only if this iteration produced events to return.
        if self.queued_events.is_empty() {
            return Poll::Pending
        }
    }
}
1937}
1938
1939impl Stream for Discv4Service {
1941 type Item = Discv4Event;
1942
1943 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1944 match ready!(self.get_mut().poll(cx)) {
1946 Discv4Event::Terminated => Poll::Ready(None),
1948 ev => Poll::Ready(Some(ev)),
1950 }
1951 }
1952}
1953
1954impl fmt::Debug for Discv4Service {
1955 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1956 f.debug_struct("Discv4Service")
1957 .field("local_address", &self.local_address)
1958 .field("local_peer_id", &self.local_peer_id())
1959 .field("local_node_record", &self.local_node_record)
1960 .field("queued_pings", &self.queued_pings)
1961 .field("pending_lookup", &self.pending_lookup)
1962 .field("pending_find_nodes", &self.pending_find_nodes)
1963 .field("lookup_interval", &self.lookup_interval)
1964 .finish_non_exhaustive()
1965 }
1966}
1967
/// Events emitted by the service while it is polled.
///
/// Apart from [`Discv4Event::Terminated`], each variant reports that a message of the
/// corresponding type was received and passed to its handler.
#[derive(Debug, Eq, PartialEq)]
pub enum Discv4Event {
    /// A `Ping` message was handled.
    Ping,
    /// A `Pong` message was handled.
    Pong,
    /// A `FindNode` message was handled.
    FindNode,
    /// A `Neighbours` message was handled.
    Neighbours,
    /// An `EnrRequest` message was handled.
    EnrRequest,
    /// An `EnrResponse` message was handled.
    EnrResponse,
    /// The service received the terminate command; the event stream ends with this event.
    Terminated,
}
1988
1989pub(crate) async fn send_loop(udp: Arc<UdpSocket>, rx: EgressReceiver) {
1991 let mut stream = ReceiverStream::new(rx);
1992 while let Some((payload, to)) = stream.next().await {
1993 match udp.send_to(&payload, to).await {
1994 Ok(size) => {
1995 trace!(target: "discv4", ?to, ?size,"sent payload");
1996 }
1997 Err(err) => {
1998 debug!(target: "discv4", ?to, %err,"Failed to send datagram.");
1999 }
2000 }
2001 }
2002}
2003
/// Rate limit for incoming packets from a single IP address.
///
/// Packets from an IP are dropped while its counter in the receive cache is above this value.
/// Half of this value is also used as the length (in seconds) of the tick after which the
/// counters are reduced, and as the amount they are reduced by.
const MAX_INCOMING_PACKETS_PER_MINUTE_BY_IP: usize = 60usize;
2006
2007pub(crate) async fn receive_loop(udp: Arc<UdpSocket>, tx: IngressSender, local_id: PeerId) {
2012 let mut handler = IngressHandler::new(tx, local_id);
2013 let mut buf = [0; MAX_PACKET_SIZE];
2014 loop {
2015 let res = udp.recv_from(&mut buf).await;
2016 match res {
2017 Err(err) => {
2018 debug!(target: "discv4", %err, "Failed to read datagram.");
2019 handler.send(IngressEvent::RecvError(err)).await;
2020 }
2021 Ok((read, remote_addr)) => {
2022 handler.handle_packet(&buf[..read], remote_addr).await;
2023 }
2024 }
2025 }
2026}
2027
/// Pre-processes raw datagrams (rate limiting, decoding, deduplication) and forwards the
/// resulting [`IngressEvent`]s over a channel.
#[derive(Debug)]
pub struct IngressHandler {
    /// Channel the resulting events are sent on.
    tx: IngressSender,
    /// Id of the local node; packets carrying this id are dropped.
    local_id: PeerId,
    /// Amount that is subtracted from every per-IP counter on each tick.
    tick: usize,
    /// Minimum time between two ticks.
    tick_interval: Duration,
    /// Per-IP packet counters and recently seen packet hashes.
    cache: ReceiveCache,
    /// When the last tick happened.
    last_tick: Instant,
}
2041
2042impl IngressHandler {
2043 fn new(tx: IngressSender, local_id: PeerId) -> Self {
2044 let tick = MAX_INCOMING_PACKETS_PER_MINUTE_BY_IP / 2;
2045 Self {
2046 tx,
2047 local_id,
2048 tick,
2049 tick_interval: Duration::from_secs(tick as u64),
2050 cache: ReceiveCache::default(),
2051 last_tick: Instant::now(),
2052 }
2053 }
2054
2055 async fn send(&self, event: IngressEvent) {
2056 let _ = self.tx.send(event).await.map_err(|err| {
2057 debug!(target: "discv4", %err, "failed send incoming packet");
2058 });
2059 }
2060
2061 pub async fn handle_packet(&mut self, data: &[u8], src: SocketAddr) {
2064 if self.last_tick.elapsed() >= self.tick_interval {
2065 self.cache.tick_ips(self.tick);
2066 self.last_tick = Instant::now();
2067 }
2068
2069 if self.cache.inc_ip(src.ip()) > MAX_INCOMING_PACKETS_PER_MINUTE_BY_IP {
2071 trace!(target: "discv4", ?src, "Too many incoming packets from IP.");
2072 return
2073 }
2074
2075 let event = match Message::decode(data) {
2076 Ok(packet) => {
2077 if packet.node_id == self.local_id {
2078 debug!(target: "discv4", ?src, "Received own packet.");
2079 return
2080 }
2081
2082 if self.cache.contains_packet(packet.hash) {
2083 debug!(target: "discv4", ?src, "Received duplicate packet.");
2084 return
2085 }
2086
2087 IngressEvent::Packet(src, packet)
2088 }
2089 Err(err) => {
2090 trace!(target: "discv4", %err, "Failed to decode packet");
2091 IngressEvent::BadPacket(src, err, data.to_vec())
2092 }
2093 };
2094
2095 self.send(event).await;
2096 }
2097}
2098
/// Bookkeeping for incoming packets: per-IP counters and recently seen packet hashes.
#[derive(Debug)]
struct ReceiveCache {
    /// Number of packets counted per IP address. Incremented for every packet and reduced on
    /// every tick.
    ip_messages: HashMap<IpAddr, usize>,
    /// Bounded LRU map of packet hashes, used to recognize packets that were already received.
    unique_packets: schnellru::LruMap<B256, ()>,
}
2113
2114impl ReceiveCache {
2115 fn tick_ips(&mut self, tick: usize) {
2119 self.ip_messages.retain(|_, count| {
2120 if let Some(reset) = count.checked_sub(tick) {
2121 *count = reset;
2122 true
2123 } else {
2124 false
2125 }
2126 });
2127 }
2128
2129 fn inc_ip(&mut self, ip: IpAddr) -> usize {
2131 let ctn = self.ip_messages.entry(ip).or_default();
2132 *ctn = ctn.saturating_add(1);
2133 *ctn
2134 }
2135
2136 fn contains_packet(&mut self, hash: B256) -> bool {
2138 !self.unique_packets.insert(hash, ())
2139 }
2140}
2141
2142impl Default for ReceiveCache {
2143 fn default() -> Self {
2144 Self {
2145 ip_messages: Default::default(),
2146 unique_packets: schnellru::LruMap::new(schnellru::ByLength::new(32)),
2147 }
2148 }
2149}
2150
/// Commands that are processed by the service when it is polled.
enum Discv4Command {
    /// Add the node via `add_node`.
    Add(NodeRecord),
    /// Add the node via `add_boot_node`.
    AddBootNode(NodeRecord),
    /// Set the TCP port of the local node record and the local ENR.
    SetTcpPort(u16),
    /// Insert a raw RLP key/value pair into the local ENR.
    SetEIP868RLPPair { key: Vec<u8>, rlp: Bytes },
    /// Ban both the peer id and the IP address.
    Ban(PeerId, IpAddr),
    /// Ban the peer id.
    BanPeer(PeerId),
    /// Ban the IP address.
    BanIp(IpAddr),
    /// Remove the node.
    Remove(PeerId),
    /// Start a lookup for `node_id`, or for the local node id if it is `None`. `tx` is passed
    /// on to the lookup as its optional listener.
    Lookup { node_id: Option<PeerId>, tx: Option<NodeRecordSender> },
    /// Change the lookup interval.
    SetLookupInterval(Duration),
    /// Create a new update stream and send it back over the channel.
    Updates(OneshotSender<ReceiverStream<DiscoveryUpdate>>),
    /// Queue a [`Discv4Event::Terminated`] event.
    Terminated,
}
2166
/// Events produced while reading from the UDP socket.
#[derive(Debug)]
pub(crate) enum IngressEvent {
    /// Reading a datagram from the socket failed.
    RecvError(io::Error),
    /// A datagram could not be decoded: the sender, the decode error and the raw bytes.
    BadPacket(SocketAddr, DecodePacketError, Vec<u8>),
    /// A successfully decoded packet and the address it was received from.
    Packet(SocketAddr, Packet),
}
2177
/// State kept for a ping that was sent and is still pending.
#[derive(Debug)]
struct PingRequest {
    /// When the ping was sent; used to detect timed out pings.
    sent_at: Instant,
    /// The node the ping belongs to (presumably the pinged node; confirm at the creation site).
    node: NodeRecord,
    /// Hash associated with the ping (presumably the hash the pong must echo; confirm).
    echo_hash: B256,
    /// Why the ping was sent.
    reason: PingReason,
}
2190
/// Decides whether the next lookup targets the local node id or a random id.
///
/// The local id is used once every `interval` lookups, a random id otherwise.
#[derive(Debug)]
struct LookupTargetRotator {
    /// Number of lookups per rotation; one of them targets the local id.
    interval: usize,
    /// Position within the current rotation; the local id is used when it wraps to `0`.
    counter: usize,
}
2199
impl LookupTargetRotator {
    /// Returns a rotator that always yields the local id: with an interval of `1` the counter
    /// wraps to `0` on every call to `next`.
    const fn local_only() -> Self {
        Self { interval: 1, counter: 0 }
    }
}
2208
impl Default for LookupTargetRotator {
    /// Returns a rotator that yields the local id on every fourth lookup.
    fn default() -> Self {
        Self {
            interval: 4,
            // Start at the end of a rotation so that the very first lookup targets the local id.
            counter: 3,
        }
    }
}
2218
2219impl LookupTargetRotator {
2220 fn next(&mut self, local: &PeerId) -> PeerId {
2222 self.counter += 1;
2223 self.counter %= self.interval;
2224 if self.counter == 0 {
2225 return *local
2226 }
2227 PeerId::random()
2228 }
2229}
2230
/// Handle to the state of a single lookup.
///
/// Clones share the same state, since the state lives behind an `Rc`.
#[derive(Clone, Debug)]
struct LookupContext {
    /// The shared lookup state.
    inner: Rc<LookupContextInner>,
}
2239
2240impl LookupContext {
2241 fn new(
2243 target: discv5::Key<NodeKey>,
2244 nearest_nodes: impl IntoIterator<Item = (Distance, NodeRecord)>,
2245 listener: Option<NodeRecordSender>,
2246 ) -> Self {
2247 let closest_nodes = nearest_nodes
2248 .into_iter()
2249 .map(|(distance, record)| {
2250 (distance, QueryNode { record, queried: false, responded: false })
2251 })
2252 .collect();
2253
2254 let inner = Rc::new(LookupContextInner {
2255 target,
2256 closest_nodes: RefCell::new(closest_nodes),
2257 listener,
2258 });
2259 Self { inner }
2260 }
2261
2262 fn target(&self) -> PeerId {
2264 self.inner.target.preimage().0
2265 }
2266
2267 fn closest(&self, num: usize) -> Vec<NodeRecord> {
2268 self.inner
2269 .closest_nodes
2270 .borrow()
2271 .iter()
2272 .filter(|(_, node)| !node.queried)
2273 .map(|(_, n)| n.record)
2274 .take(num)
2275 .collect()
2276 }
2277
2278 fn filter_closest<P>(&self, num: usize, filter: P) -> Vec<NodeRecord>
2280 where
2281 P: FnMut(&NodeRecord) -> bool,
2282 {
2283 self.inner
2284 .closest_nodes
2285 .borrow()
2286 .iter()
2287 .filter(|(_, node)| !node.queried)
2288 .map(|(_, n)| n.record)
2289 .filter(filter)
2290 .take(num)
2291 .collect()
2292 }
2293
2294 fn add_node(&self, record: NodeRecord) {
2296 let distance = self.inner.target.distance(&kad_key(record.id));
2297 let mut closest = self.inner.closest_nodes.borrow_mut();
2298 if let btree_map::Entry::Vacant(entry) = closest.entry(distance) {
2299 entry.insert(QueryNode { record, queried: false, responded: false });
2300 }
2301 }
2302
2303 fn set_queried(&self, id: PeerId, val: bool) {
2304 if let Some((_, node)) =
2305 self.inner.closest_nodes.borrow_mut().iter_mut().find(|(_, node)| node.record.id == id)
2306 {
2307 node.queried = val;
2308 }
2309 }
2310
2311 fn mark_queried(&self, id: PeerId) {
2313 self.set_queried(id, true)
2314 }
2315
2316 fn unmark_queried(&self, id: PeerId) {
2318 self.set_queried(id, false)
2319 }
2320
2321 fn mark_responded(&self, id: PeerId) {
2323 if let Some((_, node)) =
2324 self.inner.closest_nodes.borrow_mut().iter_mut().find(|(_, node)| node.record.id == id)
2325 {
2326 node.responded = true;
2327 }
2328 }
2329}
2330
// SAFETY: NOTE(review) — `LookupContext` wraps an `Rc<LookupContextInner>` that contains a
// `RefCell`, so it is not `Send` on its own and this impl is a manual promise. It is presumably
// sound only while all clones of a context stay with the same owner and are never used from two
// threads at once; confirm against the places where contexts are stored.
unsafe impl Send for LookupContext {}
/// Shared state of a lookup. When it is dropped, the nodes that responded are sent to the
/// listener, if there is one.
#[derive(Debug)]
struct LookupContextInner {
    /// The key the lookup is searching for.
    target: discv5::Key<NodeKey>,
    /// The nodes known to the lookup, ordered by their distance to the target.
    closest_nodes: RefCell<BTreeMap<Distance, QueryNode>>,
    /// Optional channel that receives the result of the lookup.
    listener: Option<NodeRecordSender>,
}
2350
2351impl Drop for LookupContextInner {
2352 fn drop(&mut self) {
2353 if let Some(tx) = self.listener.take() {
2354 let nodes = self
2357 .closest_nodes
2358 .take()
2359 .into_values()
2360 .filter(|node| node.responded)
2361 .map(|node| node.record)
2362 .collect();
2363 let _ = tx.send(nodes);
2364 }
2365 }
2366}
2367
/// A node tracked by a lookup, together with its progress flags.
#[derive(Debug, Clone, Copy)]
struct QueryNode {
    /// The record of the node.
    record: NodeRecord,
    /// Whether the node was marked as queried during the lookup.
    queried: bool,
    /// Whether the node was marked as responded during the lookup.
    responded: bool,
}
2375
/// State kept for a pending `FindNode` request.
#[derive(Debug)]
struct FindNodeRequest {
    /// When the request was created.
    sent_at: Instant,
    /// Number of node records received for this request so far.
    response_count: usize,
    /// Whether at least one `Neighbours` message was received for this request.
    answered: bool,
    /// The lookup this request belongs to.
    lookup_context: LookupContext,
}
2387
impl FindNodeRequest {
    /// Creates a request for the given lookup, timestamped with the current time and with no
    /// responses recorded yet.
    fn new(resp: LookupContext) -> Self {
        Self { sent_at: Instant::now(), response_count: 0, answered: false, lookup_context: resp }
    }
}
2395
/// An encoded `FindNode` packet that is kept for reuse.
#[derive(Debug)]
struct CachedFindNode {
    /// The target the packet was created for.
    target: PeerId,
    /// The encoded packet.
    payload: Bytes,
    /// The hash returned by the encoder together with the payload.
    hash: B256,
    /// When the packet was created.
    cached_at: Instant,
}
2404
2405impl CachedFindNode {
2406 fn get_or_sign(
2409 cache: &mut Option<Self>,
2410 target: PeerId,
2411 ttl: Duration,
2412 secret_key: &secp256k1::SecretKey,
2413 expire: u64,
2414 ) -> (Bytes, B256) {
2415 if let Some(c) = cache.as_ref() &&
2416 c.target == target &&
2417 c.cached_at.elapsed() < ttl
2418 {
2419 return (c.payload.clone(), c.hash);
2420 }
2421
2422 let msg = Message::FindNode(FindNode { id: target, expire });
2423 let (payload, hash) = msg.encode(secret_key);
2424
2425 *cache = Some(Self { target, payload: payload.clone(), hash, cached_at: Instant::now() });
2426
2427 (payload, hash)
2428 }
2429}
2430
/// State kept for a pending ENR request.
#[derive(Debug)]
struct EnrRequestState {
    /// When the request was sent; used to evict expired requests.
    sent_at: Instant,
    /// The hash an ENR response must carry as its `request_hash` to be accepted.
    echo_hash: B256,
}
2438
/// A node stored in the routing table together with its bookkeeping state.
#[derive(Debug, Clone, Eq, PartialEq)]
struct NodeEntry {
    /// The record of the node.
    record: NodeRecord,
    /// When the entry was created or last updated.
    last_seen: Instant,
    /// The last ENR sequence number stored for the node, if any.
    last_enr_seq: Option<u64>,
    /// The last fork id stored for the node, if any.
    fork_id: Option<ForkId>,
    /// Number of failed `FindNode` requests since the endpoint proof was last established.
    find_node_failures: u8,
    /// Whether an endpoint proof has been established for the node.
    has_endpoint_proof: bool,
}
2455
2456impl NodeEntry {
2459 fn new(record: NodeRecord) -> Self {
2461 Self {
2462 record,
2463 last_seen: Instant::now(),
2464 last_enr_seq: None,
2465 fork_id: None,
2466 find_node_failures: 0,
2467 has_endpoint_proof: false,
2468 }
2469 }
2470
2471 #[cfg(test)]
2472 fn new_proven(record: NodeRecord) -> Self {
2473 let mut node = Self::new(record);
2474 node.has_endpoint_proof = true;
2475 node
2476 }
2477
2478 const fn establish_proof(&mut self) {
2480 self.has_endpoint_proof = true;
2481 self.find_node_failures = 0;
2482 }
2483
2484 const fn exceeds_find_node_failures(&self, max_failures: u8) -> bool {
2486 self.find_node_failures >= max_failures
2487 }
2488
2489 fn update_with_enr(&mut self, last_enr_seq: Option<u64>) -> Option<u64> {
2491 self.update_now(|s| std::mem::replace(&mut s.last_enr_seq, last_enr_seq))
2492 }
2493
2494 const fn inc_failed_request(&mut self) {
2496 self.find_node_failures += 1;
2497 }
2498
2499 fn update_with_fork_id(&mut self, fork_id: Option<ForkId>) -> Option<ForkId> {
2501 self.update_now(|s| std::mem::replace(&mut s.fork_id, fork_id))
2502 }
2503
2504 fn update_now<F, R>(&mut self, f: F) -> R
2506 where
2507 F: FnOnce(&mut Self) -> R,
2508 {
2509 self.last_seen = Instant::now();
2510 f(self)
2511 }
2512}
2513
impl NodeEntry {
    /// Returns `true` if the node was last seen more than half of `ENDPOINT_PROOF_EXPIRATION`
    /// ago.
    fn is_expired(&self) -> bool {
        self.last_seen.elapsed() > (ENDPOINT_PROOF_EXPIRATION / 2)
    }
}
2522
/// The reason a ping was sent.
#[derive(Debug)]
enum PingReason {
    /// Presumably used when a node is first inserted into the table; confirm at the call sites.
    InitialInsert,
    /// Presumably used to establish a bond with a node; confirm at the call sites.
    EstablishBond,
    /// The node is pinged again because its table entry expired.
    RePing,
    /// The ping is part of a lookup: the node being pinged and the context of the lookup.
    Lookup(NodeRecord, LookupContext),
}
2535
/// Updates about discovered nodes that the service sends to its listeners.
#[derive(Debug, Clone)]
pub enum DiscoveryUpdate {
    /// A node was added (presumably to the routing table; confirm at the emit site).
    Added(NodeRecord),
    /// A node was discovered during a lookup but could not be inserted because its bucket was
    /// full.
    DiscoveredAtCapacity(NodeRecord),
    /// A node announced a fork id via ENR that differs from the one stored for it before.
    EnrForkId(NodeRecord, ForkId),
    /// The node with this id was removed (presumably from the routing table; confirm).
    Removed(PeerId),
    /// Several updates delivered at once.
    Batch(Vec<Self>),
}
2550
2551#[cfg(test)]
2552mod tests {
2553 use super::*;
2554 use crate::test_utils::{create_discv4, create_discv4_with_config, rng_endpoint, rng_record};
2555 use alloy_primitives::hex;
2556 use alloy_rlp::{Decodable, Encodable};
2557 use rand_08::Rng;
2558 use reth_ethereum_forks::{EnrForkIdEntry, ForkHash};
2559 use reth_network_peers::mainnet_nodes;
2560 use std::future::poll_fn;
2561
2562 #[tokio::test]
2563 async fn test_configured_enr_forkid_entry() {
2564 let fork: ForkId = ForkId { hash: ForkHash([220, 233, 108, 45]), next: 0u64 };
2565 let mut disc_conf = Discv4Config::default();
2566 disc_conf.add_eip868_pair("eth", EnrForkIdEntry::from(fork));
2567 let (_discv4, service) = create_discv4_with_config(disc_conf).await;
2568 let eth = service.local_eip_868_enr.get_raw_rlp(b"eth").unwrap();
2569 let fork_entry_id = EnrForkIdEntry::decode(&mut ð[..]).unwrap();
2570
2571 let raw: [u8; 8] = [0xc7, 0xc6, 0x84, 0xdc, 0xe9, 0x6c, 0x2d, 0x80];
2572 let decoded = EnrForkIdEntry::decode(&mut &raw[..]).unwrap();
2573 let expected = EnrForkIdEntry {
2574 fork_id: ForkId { hash: ForkHash([0xdc, 0xe9, 0x6c, 0x2d]), next: 0 },
2575 };
2576 assert_eq!(expected, fork_entry_id);
2577 assert_eq!(expected, decoded);
2578 }
2579
/// Decoding the raw RLP bytes must yield the expected fork id entry.
#[test]
fn test_enr_forkid_entry_decode() {
    let raw: [u8; 8] = [0xc7, 0xc6, 0x84, 0xdc, 0xe9, 0x6c, 0x2d, 0x80];
    let decoded = EnrForkIdEntry::decode(&mut &raw[..]).unwrap();
    let expected = EnrForkIdEntry {
        fork_id: ForkId { hash: ForkHash([0xdc, 0xe9, 0x6c, 0x2d]), next: 0 },
    };
    assert_eq!(expected, decoded);
}
2589
/// Encoding a fork id entry must produce the expected raw RLP bytes.
#[test]
fn test_enr_forkid_entry_encode() {
    let original = EnrForkIdEntry {
        fork_id: ForkId { hash: ForkHash([0xdc, 0xe9, 0x6c, 0x2d]), next: 0 },
    };
    let expected: [u8; 8] = [0xc7, 0xc6, 0x84, 0xdc, 0xe9, 0x6c, 0x2d, 0x80];
    let mut encoded = Vec::with_capacity(expected.len());
    original.encode(&mut encoded);
    assert_eq!(&expected[..], encoded.as_slice());
}
2600
/// A `local_only` rotator must yield the local id on every call.
#[test]
fn test_local_rotator() {
    let id = PeerId::random();
    let mut rotator = LookupTargetRotator::local_only();
    assert_eq!(rotator.next(&id), id);
    assert_eq!(rotator.next(&id), id);
}
2608
/// The default rotator must yield the local id first and then again after three other ids.
#[test]
fn test_rotator() {
    let id = PeerId::random();
    let mut rotator = LookupTargetRotator::default();
    assert_eq!(rotator.next(&id), id);
    assert_ne!(rotator.next(&id), id);
    assert_ne!(rotator.next(&id), id);
    assert_ne!(rotator.next(&id), id);
    assert_eq!(rotator.next(&id), id);
}
2619
/// At most `MAX_NODES_PING` pings may be pending; pings for further nodes must be queued.
#[tokio::test]
async fn test_pending_ping() {
    let (_, mut service) = create_discv4().await;

    let local_addr = service.local_addr();

    // Add nodes until the limit of pending pings is reached.
    let mut num_inserted = 0;
    loop {
        let node = NodeRecord::new(local_addr, PeerId::random());
        if service.add_node(node) {
            num_inserted += 1;
            assert!(service.pending_pings.contains_key(&node.id));
            assert_eq!(service.pending_pings.len(), num_inserted);
            if num_inserted == MAX_NODES_PING {
                break
            }
        }
    }

    // Pings for any further node must end up in the queue instead.
    num_inserted = 0;
    for _ in 0..MAX_NODES_PING {
        let node = NodeRecord::new(local_addr, PeerId::random());
        if service.add_node(node) {
            num_inserted += 1;
            assert!(!service.pending_pings.contains_key(&node.id));
            assert_eq!(service.pending_pings.len(), MAX_NODES_PING);
            assert_eq!(service.queued_pings.len(), num_inserted);
        }
    }
}
2651
/// Runs the service against the mainnet boot nodes and prints the updates it receives.
///
/// Ignored by default; the loop only ends when the update stream ends.
#[tokio::test(flavor = "multi_thread")]
#[ignore]
async fn test_mainnet_lookup() {
    reth_tracing::init_test_tracing();
    let fork_id = ForkId { hash: ForkHash(hex!("743f3d89")), next: 16191202 };

    let all_nodes = mainnet_nodes();
    let config = Discv4Config::builder()
        .add_boot_nodes(all_nodes)
        .lookup_interval(Duration::from_secs(1))
        .add_eip868_pair("eth", fork_id)
        .build();
    let (_discv4, mut service) = create_discv4_with_config(config).await;

    let mut updates = service.update_stream();

    let _handle = service.spawn();

    // Mirror the added and removed nodes in a local table.
    let mut table = HashMap::new();
    while let Some(update) = updates.next().await {
        match update {
            DiscoveryUpdate::EnrForkId(record, fork_id) => {
                println!("{record:?}, {fork_id:?}");
            }
            DiscoveryUpdate::Added(record) => {
                table.insert(record.id, record);
            }
            DiscoveryUpdate::Removed(id) => {
                table.remove(&id);
            }
            _ => {}
        }
        println!("total peers {}", table.len());
    }
}
2688
/// A ping received from an IPv4-mapped IPv6 address must be stored with the plain IPv4 address.
#[tokio::test]
async fn test_mapped_ipv4() {
    reth_tracing::init_test_tracing();
    let mut rng = rand_08::thread_rng();
    let config = Discv4Config::builder().build();
    let (_discv4, mut service) = create_discv4_with_config(config).await;

    // The sender address is the IPv4-mapped IPv6 form of `0.0.0.0`.
    let v4: Ipv4Addr = "0.0.0.0".parse().unwrap();
    let v6 = v4.to_ipv6_mapped();
    let addr: SocketAddr = (v6, DEFAULT_DISCOVERY_PORT).into();

    let ping = Ping {
        from: rng_endpoint(&mut rng),
        to: rng_endpoint(&mut rng),
        expire: service.ping_expiration(),
        enr_sq: Some(rng.r#gen()),
    };

    let id = PeerId::random();
    service.on_ping(ping, addr, id, B256::random());

    // The node must be in the table with the IPv4 address.
    let key = kad_key(id);
    match service.kbuckets.entry(&key) {
        kbucket::Entry::Present(entry, _) => {
            let node_addr = entry.value().record.address;
            assert!(node_addr.is_ipv4());
            assert_eq!(node_addr, IpAddr::from(v4));
        }
        _ => unreachable!(),
    };
}
2720
/// A ping whose expiration timestamp lies in the past must not add the sender to the table.
#[tokio::test]
async fn test_respect_ping_expiration() {
    reth_tracing::init_test_tracing();
    let mut rng = rand_08::thread_rng();
    let config = Discv4Config::builder().build();
    let (_discv4, mut service) = create_discv4_with_config(config).await;

    let v4: Ipv4Addr = "0.0.0.0".parse().unwrap();
    let v6 = v4.to_ipv6_mapped();
    let addr: SocketAddr = (v6, DEFAULT_DISCOVERY_PORT).into();

    let ping = Ping {
        from: rng_endpoint(&mut rng),
        to: rng_endpoint(&mut rng),
        // Expired one second ago.
        expire: SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs() - 1,
        enr_sq: Some(rng.r#gen()),
    };

    let id = PeerId::random();
    service.on_ping(ping, addr, id, B256::random());

    // The sender must not be in the table.
    let key = kad_key(id);
    match service.kbuckets.entry(&key) {
        kbucket::Entry::Absent(_) => {}
        _ => unreachable!(),
    };
}
2748
/// With a single proven node in the table, a self lookup must leave exactly one `FindNode`
/// request pending, also after the service was polled once.
#[tokio::test]
async fn test_single_lookups() {
    reth_tracing::init_test_tracing();

    let config = Discv4Config::builder().build();
    let (_discv4, mut service) = create_discv4_with_config(config.clone()).await;

    let id = PeerId::random();
    let key = kad_key(id);
    let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), id);

    // Insert a node that already has an endpoint proof.
    let _ = service.kbuckets.insert_or_update(
        &key,
        NodeEntry::new_proven(record),
        NodeStatus {
            direction: ConnectionDirection::Incoming,
            state: ConnectionState::Connected,
        },
    );

    service.lookup_self();
    assert_eq!(service.pending_find_nodes.len(), 1);

    // Polling once must not create another request.
    poll_fn(|cx| {
        let _ = service.poll(cx);
        assert_eq!(service.pending_find_nodes.len(), 1);

        Poll::Ready(())
    })
    .await;
}
2780
/// A node learned from a `Neighbours` response must be pinged and, after the ping/pong
/// exchange, be sent a `FindNode` request as part of the same lookup.
#[tokio::test]
async fn test_on_neighbours_recursive_lookup() {
    reth_tracing::init_test_tracing();

    let config = Discv4Config::builder().build();
    let (_discv4, mut service) = create_discv4_with_config(config.clone()).await;
    let (_discv4, mut service2) = create_discv4_with_config(config).await;

    let id = PeerId::random();
    let key = kad_key(id);
    let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), id);

    // Insert a node that already has an endpoint proof.
    let _ = service.kbuckets.insert_or_update(
        &key,
        NodeEntry::new_proven(record),
        NodeStatus {
            direction: ConnectionDirection::Incoming,
            state: ConnectionState::Connected,
        },
    );
    // The self lookup sends a `FindNode` request to the inserted node.
    service.lookup_self();
    assert_eq!(service.pending_find_nodes.len(), 1);

    poll_fn(|cx| {
        let _ = service.poll(cx);
        assert_eq!(service.pending_find_nodes.len(), 1);

        Poll::Ready(())
    })
    .await;

    // Answer the request on behalf of the inserted node with the record of `service2`.
    let expiry = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_secs() +
        10000000000000;
    let msg = Neighbours { nodes: vec![service2.local_node_record], expire: expiry };
    service.on_neighbours(msg, record.tcp_addr(), id);
    // `service2` is unknown to `service`, so it must be pinged first.
    let event = poll_fn(|cx| service2.poll(cx)).await;
    assert_eq!(event, Discv4Event::Ping);
    // The original request is still pending, since fewer records than expected were received.
    assert_eq!(service.pending_find_nodes.len(), 1);
    // `service` receives the pong from `service2`.
    let event = poll_fn(|cx| service.poll(cx)).await;
    assert_eq!(event, Discv4Event::Pong);
    // `service` also receives a ping from `service2`.
    let event = poll_fn(|cx| service.poll(cx)).await;
    assert_eq!(event, Discv4Event::Ping);
    // By now a second `FindNode` request must be pending.
    assert_eq!(service.pending_find_nodes.len(), 2);
}
2837
2838 #[tokio::test]
2839 async fn test_no_local_in_closest() {
2840 reth_tracing::init_test_tracing();
2841
2842 let config = Discv4Config::builder().build();
2843 let (_discv4, mut service) = create_discv4_with_config(config).await;
2844
2845 let target_key = kad_key(PeerId::random());
2846
2847 let id = PeerId::random();
2848 let key = kad_key(id);
2849 let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), id);
2850
2851 let _ = service.kbuckets.insert_or_update(
2852 &key,
2853 NodeEntry::new(record),
2854 NodeStatus {
2855 direction: ConnectionDirection::Incoming,
2856 state: ConnectionState::Connected,
2857 },
2858 );
2859
2860 let closest = service
2861 .kbuckets
2862 .closest_values(&target_key)
2863 .map(|n| n.value.record)
2864 .take(MAX_NODES_PER_BUCKET)
2865 .collect::<Vec<_>>();
2866
2867 assert_eq!(closest.len(), 1);
2868 assert!(!closest.iter().any(|r| r.id == *service.local_peer_id()));
2869 }
2870
2871 #[tokio::test]
2872 async fn test_random_lookup() {
2873 reth_tracing::init_test_tracing();
2874
2875 let config = Discv4Config::builder().build();
2876 let (_discv4, mut service) = create_discv4_with_config(config).await;
2877
2878 let target = PeerId::random();
2879
2880 let id = PeerId::random();
2881 let key = kad_key(id);
2882 let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), id);
2883
2884 let _ = service.kbuckets.insert_or_update(
2885 &key,
2886 NodeEntry::new_proven(record),
2887 NodeStatus {
2888 direction: ConnectionDirection::Incoming,
2889 state: ConnectionState::Connected,
2890 },
2891 );
2892
2893 service.lookup(target);
2894 assert_eq!(service.pending_find_nodes.len(), 1);
2895
2896 let ctx = service.pending_find_nodes.values().next().unwrap().lookup_context.clone();
2897
2898 assert_eq!(ctx.target(), target);
2899 assert_eq!(ctx.inner.closest_nodes.borrow().len(), 1);
2900
2901 ctx.add_node(record);
2902 assert_eq!(ctx.inner.closest_nodes.borrow().len(), 1);
2903 }
2904
2905 #[tokio::test]
2906 async fn test_reping_on_find_node_failures() {
2907 reth_tracing::init_test_tracing();
2908
2909 let config = Discv4Config::builder().build();
2910 let (_discv4, mut service) = create_discv4_with_config(config).await;
2911
2912 let target = PeerId::random();
2913
2914 let id = PeerId::random();
2915 let key = kad_key(id);
2916 let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), id);
2917
2918 let mut entry = NodeEntry::new_proven(record);
2919 entry.find_node_failures = u8::MAX;
2920 let _ = service.kbuckets.insert_or_update(
2921 &key,
2922 entry,
2923 NodeStatus {
2924 direction: ConnectionDirection::Incoming,
2925 state: ConnectionState::Connected,
2926 },
2927 );
2928
2929 service.lookup(target);
2930 assert_eq!(service.pending_find_nodes.len(), 0);
2931 assert_eq!(service.pending_pings.len(), 1);
2932
2933 service.update_on_pong(record, None);
2934
2935 service
2936 .on_entry(record.id, |entry| {
2937 assert_eq!(entry.find_node_failures, 0);
2939 assert!(entry.has_endpoint_proof);
2940 })
2941 .unwrap();
2942 }
2943
2944 #[tokio::test]
2945 async fn test_service_commands() {
2946 reth_tracing::init_test_tracing();
2947
2948 let config = Discv4Config::builder().build();
2949 let (discv4, mut service) = create_discv4_with_config(config).await;
2950
2951 service.lookup_self();
2952
2953 let _handle = service.spawn();
2954 discv4.send_lookup_self();
2955 let _ = discv4.lookup_self().await;
2956 }
2957
2958 #[tokio::test]
2959 async fn test_requests_timeout() {
2960 reth_tracing::init_test_tracing();
2961 let fork_id = ForkId { hash: ForkHash(hex!("743f3d89")), next: 16191202 };
2962
2963 let config = Discv4Config::builder()
2964 .request_timeout(Duration::from_millis(200))
2965 .ping_expiration(Duration::from_millis(200))
2966 .lookup_neighbours_expiration(Duration::from_millis(200))
2967 .add_eip868_pair("eth", fork_id)
2968 .build();
2969 let (_disv4, mut service) = create_discv4_with_config(config).await;
2970
2971 let id = PeerId::random();
2972 let key = kad_key(id);
2973 let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), id);
2974
2975 let _ = service.kbuckets.insert_or_update(
2976 &key,
2977 NodeEntry::new_proven(record),
2978 NodeStatus {
2979 direction: ConnectionDirection::Incoming,
2980 state: ConnectionState::Connected,
2981 },
2982 );
2983
2984 service.lookup_self();
2985 assert_eq!(service.pending_find_nodes.len(), 1);
2986
2987 let ctx = service.pending_find_nodes.values().next().unwrap().lookup_context.clone();
2988
2989 service.pending_lookup.insert(record.id, (Instant::now(), ctx));
2990
2991 assert_eq!(service.pending_lookup.len(), 1);
2992
2993 let ping = Ping {
2994 from: service.local_node_record.into(),
2995 to: record.into(),
2996 expire: service.ping_expiration(),
2997 enr_sq: service.enr_seq(),
2998 };
2999 let echo_hash = service.send_packet(Message::Ping(ping), record.udp_addr());
3000 let ping_request = PingRequest {
3001 sent_at: Instant::now(),
3002 node: record,
3003 echo_hash,
3004 reason: PingReason::InitialInsert,
3005 };
3006 service.pending_pings.insert(record.id, ping_request);
3007
3008 assert_eq!(service.pending_pings.len(), 1);
3009
3010 tokio::time::sleep(Duration::from_secs(1)).await;
3011
3012 poll_fn(|cx| {
3013 let _ = service.poll(cx);
3014
3015 assert_eq!(service.pending_find_nodes.len(), 0);
3016 assert_eq!(service.pending_lookup.len(), 0);
3017 assert_eq!(service.pending_pings.len(), 0);
3018
3019 Poll::Ready(())
3020 })
3021 .await;
3022 }
3023
3024 #[tokio::test(flavor = "multi_thread")]
3026 async fn test_check_wrong_to() {
3027 reth_tracing::init_test_tracing();
3028
3029 let config = Discv4Config::builder().external_ip_resolver(None).build();
3030 let (_discv4, mut service_1) = create_discv4_with_config(config.clone()).await;
3031 let (_discv4, mut service_2) = create_discv4_with_config(config).await;
3032
3033 let mut ping = Ping {
3035 from: service_1.local_node_record.into(),
3036 to: service_2.local_node_record.into(),
3037 expire: service_1.ping_expiration(),
3038 enr_sq: service_1.enr_seq(),
3039 };
3040 ping.to.address = "192.0.2.0".parse().unwrap();
3041
3042 let echo_hash = service_1.send_packet(Message::Ping(ping), service_2.local_addr());
3043 let ping_request = PingRequest {
3044 sent_at: Instant::now(),
3045 node: service_2.local_node_record,
3046 echo_hash,
3047 reason: PingReason::InitialInsert,
3048 };
3049 service_1.pending_pings.insert(*service_2.local_peer_id(), ping_request);
3050
3051 let event = poll_fn(|cx| service_2.poll(cx)).await;
3053 assert_eq!(event, Discv4Event::Ping);
3054
3055 let event = poll_fn(|cx| service_1.poll(cx)).await;
3057 assert_eq!(event, Discv4Event::Pong);
3058 let event = poll_fn(|cx| service_1.poll(cx)).await;
3060 assert_eq!(event, Discv4Event::Ping);
3061 }
3062
3063 #[tokio::test(flavor = "multi_thread")]
3064 async fn test_check_ping_pong() {
3065 reth_tracing::init_test_tracing();
3066
3067 let config = Discv4Config::builder().external_ip_resolver(None).build();
3068 let (_discv4, mut service_1) = create_discv4_with_config(config.clone()).await;
3069 let (_discv4, mut service_2) = create_discv4_with_config(config).await;
3070
3071 service_1.add_node(service_2.local_node_record);
3073
3074 let event = poll_fn(|cx| service_2.poll(cx)).await;
3076 assert_eq!(event, Discv4Event::Ping);
3077
3078 let key1 = kad_key(*service_1.local_peer_id());
3080 match service_2.kbuckets.entry(&key1) {
3081 kbucket::Entry::Present(_entry, status) => {
3082 assert!(!status.is_connected());
3083 }
3084 _ => unreachable!(),
3085 }
3086
3087 let event = poll_fn(|cx| service_1.poll(cx)).await;
3089 assert_eq!(event, Discv4Event::Pong);
3090
3091 let key2 = kad_key(*service_2.local_peer_id());
3093 match service_1.kbuckets.entry(&key2) {
3094 kbucket::Entry::Present(_entry, status) => {
3095 assert!(status.is_connected());
3096 }
3097 _ => unreachable!(),
3098 }
3099
3100 let event = poll_fn(|cx| service_1.poll(cx)).await;
3102 assert_eq!(event, Discv4Event::Ping);
3103
3104 tokio::time::timeout(Duration::from_secs(5), async {
3108 loop {
3109 let event = poll_fn(|cx| service_2.poll(cx)).await;
3110 match event {
3111 Discv4Event::Pong => break,
3112 Discv4Event::EnrRequest | Discv4Event::FindNode => {}
3113 ev => unreachable!("{ev:?}"),
3114 }
3115 }
3116 })
3117 .await
3118 .expect("timed out waiting for Pong from service_2");
3119
3120 match service_2.kbuckets.entry(&key1) {
3122 kbucket::Entry::Present(_entry, status) => {
3123 assert!(status.is_connected());
3124 }
3125 ev => unreachable!("{ev:?}"),
3126 }
3127 }
3128
3129 #[test]
3130 fn test_insert() {
3131 let local_node_record = rng_record(&mut rand_08::thread_rng());
3132 let mut kbuckets: KBucketsTable<NodeKey, NodeEntry> = KBucketsTable::new(
3133 NodeKey::from(&local_node_record).into(),
3134 Duration::from_secs(60),
3135 MAX_NODES_PER_BUCKET,
3136 None,
3137 None,
3138 );
3139
3140 let new_record = rng_record(&mut rand_08::thread_rng());
3141 let key = kad_key(new_record.id);
3142 match kbuckets.entry(&key) {
3143 kbucket::Entry::Absent(entry) => {
3144 let node = NodeEntry::new(new_record);
3145 let _ = entry.insert(
3146 node,
3147 NodeStatus {
3148 direction: ConnectionDirection::Outgoing,
3149 state: ConnectionState::Disconnected,
3150 },
3151 );
3152 }
3153 _ => {
3154 unreachable!()
3155 }
3156 };
3157 match kbuckets.entry(&key) {
3158 kbucket::Entry::Present(_, _) => {}
3159 _ => {
3160 unreachable!()
3161 }
3162 }
3163 }
3164
3165 #[tokio::test]
3166 async fn test_bootnode_not_in_update_stream() {
3167 reth_tracing::init_test_tracing();
3168 let (_, service_1) = create_discv4().await;
3169 let peerid_1 = *service_1.local_peer_id();
3170
3171 let config = Discv4Config::builder().add_boot_node(service_1.local_node_record).build();
3172 service_1.spawn();
3173
3174 let (_, mut service_2) = create_discv4_with_config(config).await;
3175
3176 let mut updates = service_2.update_stream();
3177
3178 service_2.spawn();
3179
3180 let mut bootnode_appeared = false;
3182 let timeout = tokio::time::sleep(Duration::from_secs(1));
3183 tokio::pin!(timeout);
3184
3185 loop {
3186 tokio::select! {
3187 Some(update) = updates.next() => {
3188 if let DiscoveryUpdate::Added(record) = update
3189 && record.id == peerid_1 {
3190 bootnode_appeared = true;
3191 break;
3192 }
3193 }
3194 _ = &mut timeout => break,
3195 }
3196 }
3197
3198 assert!(bootnode_appeared, "Bootnode should appear in update stream");
3200 }
3201
3202 fn insert_proven_node(service: &mut Discv4Service, record: NodeRecord) {
3203 let key = kad_key(record.id);
3204 let _ = service.kbuckets.insert_or_update(
3205 &key,
3206 NodeEntry::new_proven(record),
3207 NodeStatus {
3208 direction: ConnectionDirection::Incoming,
3209 state: ConnectionState::Connected,
3210 },
3211 );
3212 }
3213
3214 fn insert_initial_ping(service: &mut Discv4Service, record: NodeRecord) -> B256 {
3215 let echo_hash = B256::random();
3216 service.pending_pings.insert(
3217 record.id,
3218 PingRequest {
3219 sent_at: Instant::now(),
3220 node: record,
3221 echo_hash,
3222 reason: PingReason::InitialInsert,
3223 },
3224 );
3225 echo_hash
3226 }
3227
3228 fn make_pong(service: &Discv4Service, echo_hash: B256) -> Pong {
3229 Pong {
3230 to: rng_endpoint(&mut rand_08::thread_rng()),
3231 echo: echo_hash,
3232 expire: service.ping_expiration(),
3233 enr_sq: None,
3234 }
3235 }
3236
3237 #[tokio::test]
3238 async fn test_lookup_reset_on_first_bootnode_pong() {
3239 let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), PeerId::random());
3240 let config = Discv4Config::builder().add_boot_node(record).build();
3241 let (_discv4, mut service) = create_discv4_with_config(config).await;
3242
3243 assert!(service.pending_lookup_reset);
3245
3246 insert_proven_node(&mut service, record);
3248 let echo_hash = insert_initial_ping(&mut service, record);
3249
3250 service.on_pong(make_pong(&service, echo_hash), record.udp_addr(), record.id);
3252
3253 assert!(!service.pending_lookup_reset, "flag should be consumed");
3255 }
3256
3257 #[tokio::test]
3258 async fn test_lookup_reset_fires_only_once() {
3259 let records: Vec<_> = (0..2)
3260 .map(|_| NodeRecord::new("0.0.0.0:0".parse().unwrap(), PeerId::random()))
3261 .collect();
3262 let config = Discv4Config::builder().add_boot_nodes(records.clone()).build();
3263 let (_discv4, mut service) = create_discv4_with_config(config).await;
3264
3265 for &r in &records {
3267 insert_proven_node(&mut service, r);
3268 }
3269 let hashes: Vec<_> =
3270 records.iter().map(|r| insert_initial_ping(&mut service, *r)).collect();
3271
3272 service.on_pong(make_pong(&service, hashes[0]), records[0].udp_addr(), records[0].id);
3274 assert!(!service.pending_lookup_reset);
3275
3276 service.on_pong(make_pong(&service, hashes[1]), records[1].udp_addr(), records[1].id);
3278 assert!(!service.pending_lookup_reset);
3279 }
3280
3281 #[tokio::test]
3282 async fn test_lookup_reset_not_triggered_by_non_bootnode() {
3283 let bootnode = NodeRecord::new("0.0.0.0:0".parse().unwrap(), PeerId::random());
3284 let config = Discv4Config::builder().add_boot_node(bootnode).build();
3285 let (_discv4, mut service) = create_discv4_with_config(config).await;
3286
3287 assert!(service.pending_lookup_reset);
3288
3289 let stranger = NodeRecord::new("0.0.0.0:0".parse().unwrap(), PeerId::random());
3291 insert_proven_node(&mut service, stranger);
3292 let echo_hash = insert_initial_ping(&mut service, stranger);
3293 service.on_pong(make_pong(&service, echo_hash), stranger.udp_addr(), stranger.id);
3294
3295 assert!(service.pending_lookup_reset, "flag should not be consumed by non-bootnode");
3296 }
3297
3298 #[tokio::test]
3299 async fn test_lookup_reset_disabled_when_lookup_disabled() {
3300 let config = Discv4Config::builder().enable_lookup(false).build();
3301 let (_discv4, service) = create_discv4_with_config(config).await;
3302
3303 assert!(!service.pending_lookup_reset);
3305 }
3306}