1#![doc(
20 html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
21 html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
22 issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/"
23)]
24#![cfg_attr(not(test), warn(unused_crate_dependencies))]
25#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
26
27use crate::{
28 error::{DecodePacketError, Discv4Error},
29 proto::{FindNode, Message, Neighbours, Packet, Ping, Pong},
30};
31use alloy_primitives::{bytes::Bytes, hex, B256};
32use discv5::{
33 kbucket,
34 kbucket::{
35 BucketInsertResult, Distance, Entry as BucketEntry, InsertResult, KBucketsTable,
36 NodeStatus, MAX_NODES_PER_BUCKET,
37 },
38 ConnectionDirection, ConnectionState,
39};
40use enr::Enr;
41use itertools::Itertools;
42use parking_lot::Mutex;
43use proto::{EnrRequest, EnrResponse};
44use reth_ethereum_forks::ForkId;
45use reth_network_peers::{pk2id, PeerId};
46use secp256k1::SecretKey;
47use std::{
48 cell::RefCell,
49 collections::{btree_map, hash_map::Entry, BTreeMap, HashMap, VecDeque},
50 fmt,
51 future::poll_fn,
52 io,
53 net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4},
54 pin::Pin,
55 rc::Rc,
56 sync::Arc,
57 task::{ready, Context, Poll},
58 time::{Duration, Instant, SystemTime, UNIX_EPOCH},
59};
60use tokio::{
61 net::UdpSocket,
62 sync::{mpsc, mpsc::error::TrySendError, oneshot, oneshot::Sender as OneshotSender},
63 task::{JoinHandle, JoinSet},
64 time::Interval,
65};
66use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt};
67use tracing::{debug, trace};
68
69pub mod error;
70pub mod proto;
71
72mod config;
73pub use config::{Discv4Config, Discv4ConfigBuilder};
74
75mod node;
76use node::{kad_key, NodeKey};
77
78mod table;
79
80pub use reth_network_peers::NodeRecord;
82
83#[cfg(any(test, feature = "test-utils"))]
84pub mod test_utils;
85
86use crate::table::PongTable;
87use reth_net_nat::ResolveNatInterval;
88pub use reth_net_nat::{external_ip, NatResolver};
90
/// Default IP address for discovery: the unspecified IPv4 address.
pub const DEFAULT_DISCOVERY_ADDR: IpAddr = IpAddr::V4(Ipv4Addr::UNSPECIFIED);

/// Default port for discovery.
pub const DEFAULT_DISCOVERY_PORT: u16 = 30303;

/// Default socket address for discovery: [`Ipv4Addr::UNSPECIFIED`] combined with
/// [`DEFAULT_DISCOVERY_PORT`].
pub const DEFAULT_DISCOVERY_ADDRESS: SocketAddr =
    SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, DEFAULT_DISCOVERY_PORT));

/// Maximum size of a packet, in bytes.
const MAX_PACKET_SIZE: usize = 1280;

/// Minimum size of a packet, in bytes.
///
/// NOTE(review): the summands presumably are hash (32) + signature (65) + packet type (1);
/// confirm against the discv4 wire format.
const MIN_PACKET_SIZE: usize = 32 + 65 + 1;

/// Number of closest nodes selected per lookup round (passed to `LookupContext::closest` and
/// `LookupContext::filter_closest`).
const ALPHA: usize = 3;

/// Maximum number of pings that may be pending at once; `try_ping` queues instead of sending
/// once this many pings are in flight.
const MAX_NODES_PING: usize = 2 * MAX_NODES_PER_BUCKET;

/// Maximum number of pings kept in the queue; `try_ping` drops a ping if the queue is this long.
const MAX_QUEUED_PINGS: usize = 2 * MAX_NODES_PER_BUCKET;

/// Number of node records put into a single `Neighbours` packet by `respond_closest`.
///
/// NOTE(review): `109` and `91` presumably are the fixed per-packet overhead and the worst-case
/// encoded size of one record — confirm.
const SAFE_MAX_DATAGRAM_NEIGHBOUR_RECORDS: usize = (MAX_PACKET_SIZE - 109) / 91;

/// 24 hours.
///
/// NOTE(review): not referenced in the visible part of this file; presumably the lifetime of an
/// endpoint proof — confirm at the use site.
const ENDPOINT_PROOF_EXPIRATION: Duration = Duration::from_secs(24 * 60 * 60);

/// Period (one hour) of the service's `expire_interval` timer.
const EXPIRE_DURATION: Duration = Duration::from_secs(60 * 60);

/// NOTE(review): presumably limits how many UDP messages are processed per poll; the use site
/// is not visible here — confirm.
const UDP_MESSAGE_POLL_LOOP_BUDGET: i32 = 4;
151
/// Sending half of the egress channel: encoded payloads paired with their destination address.
type EgressSender = mpsc::Sender<(Bytes, SocketAddr)>;
/// Receiving half of the egress channel, handed to the send loop task.
type EgressReceiver = mpsc::Receiver<(Bytes, SocketAddr)>;

/// Sending half of the ingress channel, handed to the receive loop task.
pub(crate) type IngressSender = mpsc::Sender<IngressEvent>;
/// Receiving half of the ingress channel, owned by the service.
pub(crate) type IngressReceiver = mpsc::Receiver<IngressEvent>;

/// One-shot sender used to deliver the node records found by a lookup.
type NodeRecordSender = OneshotSender<Vec<NodeRecord>>;
159
/// Cloneable handle used to talk to a [`Discv4Service`] through its command channel.
#[derive(Debug, Clone)]
pub struct Discv4 {
    /// The address the discovery socket is bound to.
    local_addr: SocketAddr,
    /// Channel used to send commands to the service.
    to_service: mpsc::UnboundedSender<Discv4Command>,
    /// The local node record, shared with the service, which updates it when the external IP
    /// changes.
    node_record: Arc<Mutex<NodeRecord>>,
}
177
178impl Discv4 {
179 pub async fn spawn(
183 local_address: SocketAddr,
184 local_enr: NodeRecord,
185 secret_key: SecretKey,
186 config: Discv4Config,
187 ) -> io::Result<Self> {
188 let (discv4, service) = Self::bind(local_address, local_enr, secret_key, config).await?;
189
190 service.spawn();
191
192 Ok(discv4)
193 }
194
195 #[cfg(feature = "test-utils")]
199 pub fn noop() -> Self {
200 let (to_service, _rx) = mpsc::unbounded_channel();
201 let local_addr =
202 (IpAddr::from(std::net::Ipv4Addr::UNSPECIFIED), DEFAULT_DISCOVERY_PORT).into();
203 Self {
204 local_addr,
205 to_service,
206 node_record: Arc::new(Mutex::new(NodeRecord::new(
207 "127.0.0.1:3030".parse().unwrap(),
208 PeerId::random(),
209 ))),
210 }
211 }
212
213 pub async fn bind(
247 local_address: SocketAddr,
248 mut local_node_record: NodeRecord,
249 secret_key: SecretKey,
250 config: Discv4Config,
251 ) -> io::Result<(Self, Discv4Service)> {
252 let socket = UdpSocket::bind(local_address).await?;
253 let local_addr = socket.local_addr()?;
254 local_node_record.udp_port = local_addr.port();
255 trace!(target: "discv4", ?local_addr,"opened UDP socket");
256
257 let service = Discv4Service::new(socket, local_addr, local_node_record, secret_key, config);
258 let discv4 = service.handle();
259 Ok((discv4, service))
260 }
261
262 pub const fn local_addr(&self) -> SocketAddr {
264 self.local_addr
265 }
266
267 pub fn node_record(&self) -> NodeRecord {
271 *self.node_record.lock()
272 }
273
274 pub fn external_ip(&self) -> IpAddr {
276 self.node_record.lock().address
277 }
278
279 pub fn set_lookup_interval(&self, duration: Duration) {
281 self.send_to_service(Discv4Command::SetLookupInterval(duration))
282 }
283
284 pub async fn lookup_self(&self) -> Result<Vec<NodeRecord>, Discv4Error> {
299 self.lookup_node(None).await
300 }
301
302 pub async fn lookup(&self, node_id: PeerId) -> Result<Vec<NodeRecord>, Discv4Error> {
306 self.lookup_node(Some(node_id)).await
307 }
308
309 pub async fn lookup_random(&self) -> Result<Vec<NodeRecord>, Discv4Error> {
311 let target = PeerId::random();
312 self.lookup_node(Some(target)).await
313 }
314
315 pub fn send_lookup(&self, node_id: PeerId) {
317 let cmd = Discv4Command::Lookup { node_id: Some(node_id), tx: None };
318 self.send_to_service(cmd);
319 }
320
321 async fn lookup_node(&self, node_id: Option<PeerId>) -> Result<Vec<NodeRecord>, Discv4Error> {
322 let (tx, rx) = oneshot::channel();
323 let cmd = Discv4Command::Lookup { node_id, tx: Some(tx) };
324 self.to_service.send(cmd)?;
325 Ok(rx.await?)
326 }
327
328 pub fn send_lookup_self(&self) {
330 let cmd = Discv4Command::Lookup { node_id: None, tx: None };
331 self.send_to_service(cmd);
332 }
333
334 pub fn remove_peer(&self, node_id: PeerId) {
336 let cmd = Discv4Command::Remove(node_id);
337 self.send_to_service(cmd);
338 }
339
340 pub fn add_node(&self, node_record: NodeRecord) {
342 let cmd = Discv4Command::Add(node_record);
343 self.send_to_service(cmd);
344 }
345
346 pub fn ban(&self, node_id: PeerId, ip: IpAddr) {
350 let cmd = Discv4Command::Ban(node_id, ip);
351 self.send_to_service(cmd);
352 }
353
354 pub fn ban_ip(&self, ip: IpAddr) {
358 let cmd = Discv4Command::BanIp(ip);
359 self.send_to_service(cmd);
360 }
361
362 pub fn ban_node(&self, node_id: PeerId) {
366 let cmd = Discv4Command::BanPeer(node_id);
367 self.send_to_service(cmd);
368 }
369
370 pub fn set_tcp_port(&self, port: u16) {
374 let cmd = Discv4Command::SetTcpPort(port);
375 self.send_to_service(cmd);
376 }
377
378 pub fn set_eip868_rlp_pair(&self, key: Vec<u8>, rlp: Bytes) {
384 let cmd = Discv4Command::SetEIP868RLPPair { key, rlp };
385 self.send_to_service(cmd);
386 }
387
388 pub fn set_eip868_rlp(&self, key: Vec<u8>, value: impl alloy_rlp::Encodable) {
392 self.set_eip868_rlp_pair(key, Bytes::from(alloy_rlp::encode(&value)))
393 }
394
395 #[inline]
396 fn send_to_service(&self, cmd: Discv4Command) {
397 let _ = self.to_service.send(cmd).map_err(|err| {
398 debug!(
399 target: "discv4",
400 %err,
401 "channel capacity reached, dropping command",
402 )
403 });
404 }
405
406 pub async fn update_stream(&self) -> Result<ReceiverStream<DiscoveryUpdate>, Discv4Error> {
408 let (tx, rx) = oneshot::channel();
409 let cmd = Discv4Command::Updates(tx);
410 self.to_service.send(cmd)?;
411 Ok(rx.await?)
412 }
413
414 pub fn terminate(&self) {
416 self.send_to_service(Discv4Command::Terminated);
417 }
418}
419
/// State of the discv4 service: the routing table, the channels to the UDP tasks, all in-flight
/// requests and the timers that drive periodic work.
#[must_use = "Stream does nothing unless polled"]
pub struct Discv4Service {
    /// Address the UDP socket is bound to.
    local_address: SocketAddr,
    /// The local ENR built for EIP-868; sent in response to ENR requests.
    local_eip_868_enr: Enr<SecretKey>,
    /// The local node record.
    local_node_record: NodeRecord,
    /// Copy of the local node record shared with all [`Discv4`] handles.
    shared_node_record: Arc<Mutex<NodeRecord>>,
    /// Secret key used to encode outgoing messages and to sign the local ENR.
    secret_key: SecretKey,
    /// The UDP socket, also shared with the receive and send tasks.
    _socket: Arc<UdpSocket>,
    /// The spawned receive and send loop tasks.
    _tasks: JoinSet<()>,
    /// The routing table.
    kbuckets: KBucketsTable<NodeKey, NodeEntry>,
    /// Receiver for events produced by the receive loop.
    ingress: IngressReceiver,
    /// Sender for packets consumed by the send loop.
    egress: EgressSender,
    /// Pings that could not be sent yet because too many pings are pending.
    queued_pings: VecDeque<(NodeRecord, PingReason)>,
    /// Pings awaiting a pong, keyed by the pinged peer.
    pending_pings: HashMap<PeerId, PingRequest>,
    /// Lookup contexts parked after a lookup-triggered pong, together with the time they were
    /// inserted; picked up again when the peer pings us.
    pending_lookup: HashMap<PeerId, (Instant, LookupContext)>,
    /// `FindNode` requests awaiting `Neighbours` responses, keyed by the queried peer.
    pending_find_nodes: HashMap<PeerId, FindNodeRequest>,
    /// ENR requests awaiting a response, keyed by the queried peer.
    pending_enr_requests: HashMap<PeerId, EnrRequestState>,
    /// Command sender that is cloned into every [`Discv4`] handle.
    to_service: mpsc::UnboundedSender<Discv4Command>,
    /// Receiver for commands sent by handles.
    commands_rx: mpsc::UnboundedReceiver<Discv4Command>,
    /// Listeners that receive [`DiscoveryUpdate`]s.
    update_listeners: Vec<mpsc::Sender<DiscoveryUpdate>>,
    /// Timer created from the configured lookup interval.
    lookup_interval: Interval,
    /// NOTE(review): presumably selects the target of each periodic lookup; usage not visible
    /// here.
    lookup_rotator: LookupTargetRotator,
    /// Timer for evicting expired requests; period is the configured request timeout.
    evict_expired_requests_interval: Interval,
    /// Timer created from the configured ping interval.
    ping_interval: Interval,
    /// Optional timer taken from the config.
    ///
    /// NOTE(review): presumably used to re-resolve the external IP periodically — confirm.
    resolve_external_ip_interval: Option<ResolveNatInterval>,
    /// The service configuration (also holds the ban list).
    config: Discv4Config,
    /// NOTE(review): presumably events buffered until the service is polled; usage not visible
    /// here.
    queued_events: VecDeque<Discv4Event>,
    /// Record of received pongs, consulted to decide whether a bond with a peer exists.
    received_pongs: PongTable,
    /// Timer ticking every [`EXPIRE_DURATION`].
    expire_interval: Interval,
}
504
505impl Discv4Service {
/// Creates the service around an already bound UDP socket.
///
/// Spawns the receive and send loops for the socket, sets up an empty routing table keyed by
/// the local node record, creates all timers from the config and builds the local EIP-868
/// ENR.
///
/// # Panics
///
/// Panics if building the local ENR fails.
pub(crate) fn new(
    socket: UdpSocket,
    local_address: SocketAddr,
    local_node_record: NodeRecord,
    secret_key: SecretKey,
    config: Discv4Config,
) -> Self {
    let socket = Arc::new(socket);
    // bounded channels between the service and the UDP tasks
    let (ingress_tx, ingress_rx) = mpsc::channel(config.udp_ingress_message_buffer);
    let (egress_tx, egress_rx) = mpsc::channel(config.udp_egress_message_buffer);
    let mut tasks = JoinSet::<()>::new();

    // task that reads from the socket and forwards events to the service
    let udp = Arc::clone(&socket);
    tasks.spawn(receive_loop(udp, ingress_tx, local_node_record.id));

    // task that writes the service's outgoing packets to the socket
    let udp = Arc::clone(&socket);
    tasks.spawn(send_loop(udp, egress_rx));

    let kbuckets = KBucketsTable::new(
        NodeKey::from(&local_node_record).into(),
        Duration::from_secs(60),
        MAX_NODES_PER_BUCKET,
        None,
        None,
    );

    let self_lookup_interval = tokio::time::interval(config.lookup_interval);

    // first tick only after one full `ping_interval` has elapsed
    let ping_interval = tokio::time::interval_at(
        tokio::time::Instant::now() + config.ping_interval,
        config.ping_interval,
    );

    // first tick only after one full `request_timeout` has elapsed
    let evict_expired_requests_interval = tokio::time::interval_at(
        tokio::time::Instant::now() + config.request_timeout,
        config.request_timeout,
    );

    let lookup_rotator = if config.enable_dht_random_walk {
        LookupTargetRotator::default()
    } else {
        LookupTargetRotator::local_only()
    };

    // build the local ENR from the node record and the additional pairs from the config
    let local_eip_868_enr = {
        let mut builder = Enr::builder();
        builder.ip(local_node_record.address);
        if local_node_record.address.is_ipv4() {
            builder.udp4(local_node_record.udp_port);
            builder.tcp4(local_node_record.tcp_port);
        } else {
            builder.udp6(local_node_record.udp_port);
            builder.tcp6(local_node_record.tcp_port);
        }

        for (key, val) in &config.additional_eip868_rlp_pairs {
            builder.add_value_rlp(key, val.clone());
        }
        builder.build(&secret_key).expect("v4 is set")
    };

    let (to_service, commands_rx) = mpsc::unbounded_channel();

    // copy of the record that is shared with the handles
    let shared_node_record = Arc::new(Mutex::new(local_node_record));

    Self {
        local_address,
        local_eip_868_enr,
        local_node_record,
        shared_node_record,
        _socket: socket,
        kbuckets,
        secret_key,
        _tasks: tasks,
        ingress: ingress_rx,
        egress: egress_tx,
        queued_pings: VecDeque::with_capacity(MAX_QUEUED_PINGS),
        pending_pings: Default::default(),
        pending_lookup: Default::default(),
        pending_find_nodes: Default::default(),
        pending_enr_requests: Default::default(),
        commands_rx,
        to_service,
        update_listeners: Vec::with_capacity(1),
        lookup_interval: self_lookup_interval,
        ping_interval,
        evict_expired_requests_interval,
        lookup_rotator,
        resolve_external_ip_interval: config.resolve_external_ip_interval(),
        config,
        queued_events: Default::default(),
        received_pongs: Default::default(),
        expire_interval: tokio::time::interval(EXPIRE_DURATION),
    }
}
605
606 pub fn handle(&self) -> Discv4 {
608 Discv4 {
609 local_addr: self.local_address,
610 to_service: self.to_service.clone(),
611 node_record: self.shared_node_record.clone(),
612 }
613 }
614
615 fn enr_seq(&self) -> Option<u64> {
617 self.config.enable_eip868.then(|| self.local_eip_868_enr.seq())
618 }
619
620 pub fn set_lookup_interval(&mut self, duration: Duration) {
622 self.lookup_interval = tokio::time::interval(duration);
623 }
624
625 pub fn set_external_ip_addr(&mut self, external_ip: IpAddr) {
628 if self.local_node_record.address != external_ip {
629 debug!(target: "discv4", ?external_ip, "Updating external ip");
630 self.local_node_record.address = external_ip;
631 let _ = self.local_eip_868_enr.set_ip(external_ip, &self.secret_key);
632 let mut lock = self.shared_node_record.lock();
633 *lock = self.local_node_record;
634 debug!(target: "discv4", enr=?self.local_eip_868_enr, "Updated local ENR");
635 }
636 }
637
638 pub const fn local_peer_id(&self) -> &PeerId {
640 &self.local_node_record.id
641 }
642
643 pub const fn local_addr(&self) -> SocketAddr {
645 self.local_address
646 }
647
648 pub const fn local_enr(&self) -> NodeRecord {
652 self.local_node_record
653 }
654
/// Returns mutable access to the local node record (test builds only).
#[cfg(test)]
pub fn local_enr_mut(&mut self) -> &mut NodeRecord {
    let record = &mut self.local_node_record;
    record
}
660
661 pub fn contains_node(&self, id: PeerId) -> bool {
663 let key = kad_key(id);
664 self.kbuckets.get_index(&key).is_some()
665 }
666
667 pub fn bootstrap(&mut self) {
682 for record in self.config.bootstrap_nodes.clone() {
683 debug!(target: "discv4", ?record, "pinging boot node");
684 let key = kad_key(record.id);
685 let entry = NodeEntry::new(record);
686
687 match self.kbuckets.insert_or_update(
689 &key,
690 entry,
691 NodeStatus {
692 state: ConnectionState::Disconnected,
693 direction: ConnectionDirection::Outgoing,
694 },
695 ) {
696 InsertResult::Failed(_) => {}
697 _ => {
698 self.try_ping(record, PingReason::InitialInsert);
699 }
700 }
701 }
702 }
703
704 pub fn spawn(mut self) -> JoinHandle<()> {
708 tokio::task::spawn(async move {
709 self.bootstrap();
710
711 while let Some(event) = self.next().await {
712 trace!(target: "discv4", ?event, "processed");
713 }
714 trace!(target: "discv4", "service terminated");
715 })
716 }
717
718 pub fn update_stream(&mut self) -> ReceiverStream<DiscoveryUpdate> {
720 let (tx, rx) = mpsc::channel(512);
721 self.update_listeners.push(tx);
722 ReceiverStream::new(rx)
723 }
724
725 pub fn lookup_self(&mut self) {
727 self.lookup(self.local_node_record.id)
728 }
729
730 pub fn lookup(&mut self, target: PeerId) {
740 self.lookup_with(target, None)
741 }
742
743 fn lookup_with(&mut self, target: PeerId, tx: Option<NodeRecordSender>) {
753 trace!(target: "discv4", ?target, "Starting lookup");
754 let target_key = kad_key(target);
755
756 let ctx = LookupContext::new(
759 target_key.clone(),
760 self.kbuckets
761 .closest_values(&target_key)
762 .filter(|node| {
763 node.value.has_endpoint_proof &&
764 !self.pending_find_nodes.contains_key(&node.key.preimage().0)
765 })
766 .take(MAX_NODES_PER_BUCKET)
767 .map(|n| (target_key.distance(&n.key), n.value.record)),
768 tx,
769 );
770
771 let closest = ctx.closest(ALPHA);
773
774 if closest.is_empty() && self.pending_find_nodes.is_empty() {
775 self.bootstrap();
780 return
781 }
782
783 trace!(target: "discv4", ?target, num = closest.len(), "Start lookup closest nodes");
784
785 for node in closest {
786 self.find_node_checked(&node, ctx.clone());
790 }
791 }
792
793 fn find_node(&mut self, node: &NodeRecord, ctx: LookupContext) {
797 trace!(target: "discv4", ?node, lookup=?ctx.target(), "Sending FindNode");
798 ctx.mark_queried(node.id);
799 let id = ctx.target();
800 let msg = Message::FindNode(FindNode { id, expire: self.find_node_expiration() });
801 self.send_packet(msg, node.udp_addr());
802 self.pending_find_nodes.insert(node.id, FindNodeRequest::new(ctx));
803 }
804
805 fn find_node_checked(&mut self, node: &NodeRecord, ctx: LookupContext) {
810 let max_failures = self.config.max_find_node_failures;
811 let needs_ping = self
812 .on_entry(node.id, |entry| entry.exceeds_find_node_failures(max_failures))
813 .unwrap_or(true);
814 if needs_ping {
815 self.try_ping(*node, PingReason::Lookup(*node, ctx))
816 } else {
817 self.find_node(node, ctx)
818 }
819 }
820
821 fn notify(&mut self, update: DiscoveryUpdate) {
825 self.update_listeners.retain_mut(|listener| match listener.try_send(update.clone()) {
826 Ok(()) => true,
827 Err(err) => match err {
828 TrySendError::Full(_) => true,
829 TrySendError::Closed(_) => false,
830 },
831 });
832 }
833
834 pub fn ban_ip(&mut self, ip: IpAddr) {
836 self.config.ban_list.ban_ip(ip);
837 }
838
839 pub fn ban_node(&mut self, node_id: PeerId) {
841 self.remove_node(node_id);
842 self.config.ban_list.ban_peer(node_id);
843 }
844
845 pub fn ban_ip_until(&mut self, ip: IpAddr, until: Instant) {
847 self.config.ban_list.ban_ip_until(ip, until);
848 }
849
850 pub fn ban_node_until(&mut self, node_id: PeerId, until: Instant) {
852 self.remove_node(node_id);
853 self.config.ban_list.ban_peer_until(node_id, until);
854 }
855
856 pub fn remove_node(&mut self, node_id: PeerId) -> bool {
861 let key = kad_key(node_id);
862 self.remove_key(node_id, key)
863 }
864
865 pub fn soft_remove_node(&mut self, node_id: PeerId) -> bool {
870 let key = kad_key(node_id);
871 let Some(bucket) = self.kbuckets.get_bucket(&key) else { return false };
872 if bucket.num_entries() < MAX_NODES_PER_BUCKET / 2 {
873 return false
875 }
876 self.remove_key(node_id, key)
877 }
878
879 fn remove_key(&mut self, node_id: PeerId, key: discv5::Key<NodeKey>) -> bool {
880 let removed = self.kbuckets.remove(&key);
881 if removed {
882 trace!(target: "discv4", ?node_id, "removed node");
883 self.notify(DiscoveryUpdate::Removed(node_id));
884 }
885 removed
886 }
887
888 pub fn num_connected(&self) -> usize {
890 self.kbuckets.buckets_iter().fold(0, |count, bucket| count + bucket.num_connected())
891 }
892
893 fn has_bond(&self, remote_id: PeerId, remote_ip: IpAddr) -> bool {
895 if let Some(timestamp) = self.received_pongs.last_pong(remote_id, remote_ip) {
896 if timestamp.elapsed() < self.config.bond_expiration {
897 return true
898 }
899 }
900 false
901 }
902
903 fn on_entry<F, R>(&mut self, peer_id: PeerId, f: F) -> Option<R>
905 where
906 F: FnOnce(&NodeEntry) -> R,
907 {
908 let key = kad_key(peer_id);
909 match self.kbuckets.entry(&key) {
910 BucketEntry::Present(entry, _) => Some(f(entry.value())),
911 BucketEntry::Pending(mut entry, _) => Some(f(entry.value())),
912 _ => None,
913 }
914 }
915
916 fn update_on_reping(&mut self, record: NodeRecord, mut last_enr_seq: Option<u64>) {
923 if record.id == self.local_node_record.id {
924 return
925 }
926
927 if !self.config.enable_eip868 {
929 last_enr_seq = None;
930 }
931
932 let key = kad_key(record.id);
933 let old_enr = match self.kbuckets.entry(&key) {
934 kbucket::Entry::Present(mut entry, _) => {
935 entry.value_mut().update_with_enr(last_enr_seq)
936 }
937 kbucket::Entry::Pending(mut entry, _) => entry.value().update_with_enr(last_enr_seq),
938 _ => return,
939 };
940
941 match (last_enr_seq, old_enr) {
943 (Some(new), Some(old)) => {
944 if new > old {
945 self.send_enr_request(record);
946 }
947 }
948 (Some(_), None) => {
949 self.send_enr_request(record);
951 }
952 _ => {}
953 };
954 }
955
956 fn update_on_pong(&mut self, record: NodeRecord, mut last_enr_seq: Option<u64>) {
958 if record.id == *self.local_peer_id() {
959 return
960 }
961
962 if !self.config.enable_eip868 {
964 last_enr_seq = None;
965 }
966
967 let has_enr_seq = last_enr_seq.is_some();
970
971 let key = kad_key(record.id);
972 match self.kbuckets.entry(&key) {
973 kbucket::Entry::Present(mut entry, old_status) => {
974 entry.value_mut().establish_proof();
976 entry.value_mut().update_with_enr(last_enr_seq);
977
978 if !old_status.is_connected() {
979 let _ = entry.update(ConnectionState::Connected, Some(old_status.direction));
980 trace!(target: "discv4", ?record, "added after successful endpoint proof");
981 self.notify(DiscoveryUpdate::Added(record));
982
983 if has_enr_seq {
984 self.send_enr_request(record);
986 }
987 }
988 }
989 kbucket::Entry::Pending(mut entry, mut status) => {
990 entry.value().establish_proof();
992 entry.value().update_with_enr(last_enr_seq);
993
994 if !status.is_connected() {
995 status.state = ConnectionState::Connected;
996 let _ = entry.update(status);
997 trace!(target: "discv4", ?record, "added after successful endpoint proof");
998 self.notify(DiscoveryUpdate::Added(record));
999
1000 if has_enr_seq {
1001 self.send_enr_request(record);
1003 }
1004 }
1005 }
1006 _ => {}
1007 };
1008 }
1009
1010 pub fn add_all_nodes(&mut self, records: impl IntoIterator<Item = NodeRecord>) {
1014 for record in records {
1015 self.add_node(record);
1016 }
1017 }
1018
1019 pub fn add_node(&mut self, record: NodeRecord) -> bool {
1025 let key = kad_key(record.id);
1026 match self.kbuckets.entry(&key) {
1027 kbucket::Entry::Absent(entry) => {
1028 let node = NodeEntry::new(record);
1029 match entry.insert(
1030 node,
1031 NodeStatus {
1032 direction: ConnectionDirection::Outgoing,
1033 state: ConnectionState::Disconnected,
1034 },
1035 ) {
1036 BucketInsertResult::Inserted | BucketInsertResult::Pending { .. } => {
1037 trace!(target: "discv4", ?record, "inserted new record");
1038 }
1039 _ => return false,
1040 }
1041 }
1042 _ => return false,
1043 }
1044
1045 self.try_ping(record, PingReason::InitialInsert);
1047 true
1048 }
1049
1050 pub(crate) fn send_packet(&self, msg: Message, to: SocketAddr) -> B256 {
1052 let (payload, hash) = msg.encode(&self.secret_key);
1053 trace!(target: "discv4", r#type=?msg.msg_type(), ?to, ?hash, "sending packet");
1054 let _ = self.egress.try_send((payload, to)).map_err(|err| {
1055 debug!(
1056 target: "discv4",
1057 %err,
1058 "dropped outgoing packet",
1059 );
1060 });
1061 hash
1062 }
1063
/// Handles an incoming `Ping`.
///
/// Unless the ping is expired, comes from ourselves, or is rejected by the table's filter, a
/// `Pong` echoing `hash` is sent back. Depending on the state of the sender's table entry
/// the service additionally pings the sender, resumes a parked lookup, or requests the
/// sender's ENR.
fn on_ping(&mut self, ping: Ping, remote_addr: SocketAddr, remote_id: PeerId, hash: B256) {
    if self.is_expired(ping.expire) {
        return
    }

    // the record uses the address the packet actually came from; only the TCP port is taken
    // from the ping itself
    let record = NodeRecord {
        address: remote_addr.ip(),
        udp_port: remote_addr.port(),
        tcp_port: ping.from.tcp_port,
        id: remote_id,
    }
    .into_ipv4_mapped();

    let key = kad_key(record.id);

    // at most one of these decides the follow-up action after the pong was sent
    let mut is_new_insert = false;
    let mut needs_bond = false;
    let mut is_proven = false;

    // previously stored ENR sequence number of the sender, if any
    let old_enr = match self.kbuckets.entry(&key) {
        kbucket::Entry::Present(mut entry, _) => {
            if entry.value().is_expired() {
                needs_bond = true;
            } else {
                is_proven = entry.value().has_endpoint_proof;
            }
            entry.value_mut().update_with_enr(ping.enr_sq)
        }
        kbucket::Entry::Pending(mut entry, _) => {
            if entry.value().is_expired() {
                needs_bond = true;
            } else {
                is_proven = entry.value().has_endpoint_proof;
            }
            entry.value().update_with_enr(ping.enr_sq)
        }
        kbucket::Entry::Absent(entry) => {
            let mut node = NodeEntry::new(record);
            node.last_enr_seq = ping.enr_sq;

            // unknown sender: insert as a disconnected, incoming entry
            match entry.insert(
                node,
                NodeStatus {
                    direction: ConnectionDirection::Incoming,
                    state: ConnectionState::Disconnected,
                },
            ) {
                BucketInsertResult::Inserted | BucketInsertResult::Pending { .. } => {
                    is_new_insert = true;
                }
                BucketInsertResult::Full => {
                    // the node cannot be stored, but listeners still learn about it
                    trace!(target: "discv4", ?record, "discovered new record but bucket is full");
                    self.notify(DiscoveryUpdate::DiscoveredAtCapacity(record));
                    needs_bond = true;
                }
                BucketInsertResult::TooManyIncoming | BucketInsertResult::NodeExists => {
                    needs_bond = true;
                }
                // rejected by the table's filter: no pong is sent at all
                BucketInsertResult::FailedFilter => return,
            }

            None
        }
        // the ping carries our own id: ignore it
        kbucket::Entry::SelfEntry => return,
    };

    let pong = Message::Pong(Pong {
        to: record.into(),
        echo: hash,
        expire: ping.expire,
        enr_sq: self.enr_seq(),
    });
    self.send_packet(pong, remote_addr);

    if is_new_insert {
        self.try_ping(record, PingReason::InitialInsert);
    } else if needs_bond {
        self.try_ping(record, PingReason::EstablishBond);
    } else if is_proven {
        // a lookup was parked for this peer (see `on_pong`): continue it now
        if let Some((_, ctx)) = self.pending_lookup.remove(&record.id) {
            if self.pending_find_nodes.contains_key(&record.id) {
                // a `FindNode` request to this peer is already in flight
                ctx.unmark_queried(record.id);
            } else {
                self.find_node(&record, ctx);
            }
        }
    } else {
        // request the ENR if the ping reported a sequence number that is new or greater than
        // the stored one
        match (ping.enr_sq, old_enr) {
            (Some(new), Some(old)) => {
                if new > old {
                    self.send_enr_request(record);
                }
            }
            (Some(_), None) => {
                self.send_enr_request(record);
            }
            _ => {}
        };
    }
}
1195
1196 fn try_ping(&mut self, node: NodeRecord, reason: PingReason) {
1198 if node.id == *self.local_peer_id() {
1199 return
1201 }
1202
1203 if self.pending_pings.contains_key(&node.id) ||
1204 self.pending_find_nodes.contains_key(&node.id)
1205 {
1206 return
1207 }
1208
1209 if self.queued_pings.iter().any(|(n, _)| n.id == node.id) {
1210 return
1211 }
1212
1213 if self.pending_pings.len() < MAX_NODES_PING {
1214 self.send_ping(node, reason);
1215 } else if self.queued_pings.len() < MAX_QUEUED_PINGS {
1216 self.queued_pings.push_back((node, reason));
1217 }
1218 }
1219
1220 pub(crate) fn send_ping(&mut self, node: NodeRecord, reason: PingReason) -> B256 {
1224 let remote_addr = node.udp_addr();
1225 let id = node.id;
1226 let ping = Ping {
1227 from: self.local_node_record.into(),
1228 to: node.into(),
1229 expire: self.ping_expiration(),
1230 enr_sq: self.enr_seq(),
1231 };
1232 trace!(target: "discv4", ?ping, "sending ping");
1233 let echo_hash = self.send_packet(Message::Ping(ping), remote_addr);
1234
1235 self.pending_pings
1236 .insert(id, PingRequest { sent_at: Instant::now(), node, echo_hash, reason });
1237 echo_hash
1238 }
1239
1240 pub(crate) fn send_enr_request(&mut self, node: NodeRecord) {
1244 if !self.config.enable_eip868 {
1245 return
1246 }
1247 let remote_addr = node.udp_addr();
1248 let enr_request = EnrRequest { expire: self.enr_request_expiration() };
1249
1250 trace!(target: "discv4", ?enr_request, "sending enr request");
1251 let echo_hash = self.send_packet(Message::EnrRequest(enr_request), remote_addr);
1252
1253 self.pending_enr_requests
1254 .insert(node.id, EnrRequestState { sent_at: Instant::now(), echo_hash });
1255 }
1256
1257 fn on_pong(&mut self, pong: Pong, remote_addr: SocketAddr, remote_id: PeerId) {
1259 if self.is_expired(pong.expire) {
1260 return
1261 }
1262
1263 let PingRequest { node, reason, .. } = match self.pending_pings.entry(remote_id) {
1264 Entry::Occupied(entry) => {
1265 {
1266 let request = entry.get();
1267 if request.echo_hash != pong.echo {
1268 trace!(target: "discv4", from=?remote_addr, expected=?request.echo_hash, echo_hash=?pong.echo,"Got unexpected Pong");
1269 return
1270 }
1271 }
1272 entry.remove()
1273 }
1274 Entry::Vacant(_) => return,
1275 };
1276
1277 self.received_pongs.on_pong(remote_id, remote_addr.ip());
1279
1280 match reason {
1281 PingReason::InitialInsert => {
1282 self.update_on_pong(node, pong.enr_sq);
1283 }
1284 PingReason::EstablishBond => {
1285 self.update_on_pong(node, pong.enr_sq);
1287 }
1288 PingReason::RePing => {
1289 self.update_on_reping(node, pong.enr_sq);
1290 }
1291 PingReason::Lookup(node, ctx) => {
1292 self.update_on_pong(node, pong.enr_sq);
1293 self.pending_lookup.insert(node.id, (Instant::now(), ctx));
1298 }
1299 }
1300 }
1301
1302 fn on_find_node(&mut self, msg: FindNode, remote_addr: SocketAddr, node_id: PeerId) {
1304 if self.is_expired(msg.expire) {
1305 return
1307 }
1308 if node_id == *self.local_peer_id() {
1309 return
1311 }
1312
1313 if self.has_bond(node_id, remote_addr.ip()) {
1314 self.respond_closest(msg.id, remote_addr)
1315 }
1316 }
1317
1318 fn on_enr_response(&mut self, msg: EnrResponse, remote_addr: SocketAddr, id: PeerId) {
1320 trace!(target: "discv4", ?remote_addr, ?msg, "received ENR response");
1321 if let Some(resp) = self.pending_enr_requests.remove(&id) {
1322 let enr_id = pk2id(&msg.enr.public_key());
1324 if id != enr_id {
1325 return
1326 }
1327
1328 if resp.echo_hash == msg.request_hash {
1329 let key = kad_key(id);
1330 let fork_id = msg.eth_fork_id();
1331 let (record, old_fork_id) = match self.kbuckets.entry(&key) {
1332 kbucket::Entry::Present(mut entry, _) => {
1333 let id = entry.value_mut().update_with_fork_id(fork_id);
1334 (entry.value().record, id)
1335 }
1336 kbucket::Entry::Pending(mut entry, _) => {
1337 let id = entry.value().update_with_fork_id(fork_id);
1338 (entry.value().record, id)
1339 }
1340 _ => return,
1341 };
1342 match (fork_id, old_fork_id) {
1343 (Some(new), Some(old)) => {
1344 if new != old {
1345 self.notify(DiscoveryUpdate::EnrForkId(record, new))
1346 }
1347 }
1348 (Some(new), None) => self.notify(DiscoveryUpdate::EnrForkId(record, new)),
1349 _ => {}
1350 }
1351 }
1352 }
1353 }
1354
1355 fn on_enr_request(
1357 &self,
1358 msg: EnrRequest,
1359 remote_addr: SocketAddr,
1360 id: PeerId,
1361 request_hash: B256,
1362 ) {
1363 if !self.config.enable_eip868 || self.is_expired(msg.expire) {
1364 return
1365 }
1366
1367 if self.has_bond(id, remote_addr.ip()) {
1368 self.send_packet(
1369 Message::EnrResponse(EnrResponse {
1370 request_hash,
1371 enr: self.local_eip_868_enr.clone(),
1372 }),
1373 remote_addr,
1374 );
1375 }
1376 }
1377
/// Handles an incoming `Neighbours` response to one of our `FindNode` requests.
///
/// Expired and unsolicited responses are ignored, as are responses that would push the number
/// of records received for the request above [`MAX_NODES_PER_BUCKET`]. The received records
/// (except banned ones) are added to the lookup context, and the lookup continues with up to
/// [`ALPHA`] nodes selected from the context.
fn on_neighbours(&mut self, msg: Neighbours, remote_addr: SocketAddr, node_id: PeerId) {
    if self.is_expired(msg.expire) {
        return
    }
    // look up the request this response belongs to and account for the received records
    let ctx = match self.pending_find_nodes.entry(node_id) {
        Entry::Occupied(mut entry) => {
            {
                let request = entry.get_mut();
                request.answered = true;
                let total = request.response_count + msg.nodes.len();

                if total <= MAX_NODES_PER_BUCKET {
                    request.response_count = total;
                } else {
                    // more records than a single request may yield: ignore the packet; the
                    // request stays pending and remains marked as answered
                    trace!(target: "discv4", total, from=?remote_addr, "Received neighbors packet entries exceeds max nodes per bucket");
                    return
                }
            };

            if entry.get().response_count == MAX_NODES_PER_BUCKET {
                // the request is complete once a full bucket worth of records arrived
                let ctx = entry.remove().lookup_context;
                ctx.mark_responded(node_id);
                ctx
            } else {
                // more packets may follow, keep the request pending
                entry.get().lookup_context.clone()
            }
        }
        Entry::Vacant(_) => {
            trace!(target: "discv4", from=?remote_addr, "Received unsolicited Neighbours");
            return
        }
    };

    trace!(target: "discv4",
        target=format!("{:#?}", node_id),
        peers_count=msg.nodes.len(),
        peers=format!("[{:#}]", msg.nodes.iter()
            .map(|node_rec| node_rec.id
        ).format(", ")),
        "Received peers from Neighbours packet"
    );

    // feed the received records into the lookup, skipping banned ones
    for node in msg.nodes.into_iter().map(NodeRecord::into_ipv4_mapped) {
        if self.config.ban_list.is_banned(&node.id, &node.address) {
            trace!(target: "discv4", peer_id=?node.id, ip=?node.address, "ignoring banned record");
            continue
        }

        ctx.add_node(node);
    }

    // select up to ALPHA nodes from the context that have no pending `FindNode` request
    let closest =
        ctx.filter_closest(ALPHA, |node| !self.pending_find_nodes.contains_key(&node.id));

    for closest in closest {
        let key = kad_key(closest.id);
        match self.kbuckets.entry(&key) {
            BucketEntry::Absent(entry) => {
                // unknown node: mark it as queried, insert it and ping it first
                ctx.mark_queried(closest.id);
                let node = NodeEntry::new(closest);
                match entry.insert(
                    node,
                    NodeStatus {
                        direction: ConnectionDirection::Outgoing,
                        state: ConnectionState::Disconnected,
                    },
                ) {
                    BucketInsertResult::Inserted | BucketInsertResult::Pending { .. } => {
                        self.try_ping(closest, PingReason::Lookup(closest, ctx.clone()))
                    }
                    BucketInsertResult::Full => {
                        self.notify(DiscoveryUpdate::DiscoveredAtCapacity(closest))
                    }
                    _ => {}
                }
            }
            BucketEntry::SelfEntry => {
                // the local node is never queried
            }
            BucketEntry::Present(entry, _) => {
                // known nodes are only contacted if they have an endpoint proof; nodes with
                // too many `FindNode` failures are pinged instead of queried
                if entry.value().has_endpoint_proof {
                    if entry
                        .value()
                        .exceeds_find_node_failures(self.config.max_find_node_failures)
                    {
                        self.try_ping(closest, PingReason::Lookup(closest, ctx.clone()))
                    } else {
                        self.find_node(&closest, ctx.clone());
                    }
                }
            }
            BucketEntry::Pending(mut entry, _) => {
                // same handling as for present entries
                if entry.value().has_endpoint_proof {
                    if entry
                        .value()
                        .exceeds_find_node_failures(self.config.max_find_node_failures)
                    {
                        self.try_ping(closest, PingReason::Lookup(closest, ctx.clone()))
                    } else {
                        self.find_node(&closest, ctx.clone());
                    }
                }
            }
        }
    }
}
1504
1505 fn respond_closest(&mut self, target: PeerId, to: SocketAddr) {
1507 let key = kad_key(target);
1508 let expire = self.send_neighbours_expiration();
1509
1510 let closest_nodes =
1512 self.kbuckets.closest_values(&key).take(MAX_NODES_PER_BUCKET).collect::<Vec<_>>();
1513
1514 for nodes in closest_nodes.chunks(SAFE_MAX_DATAGRAM_NEIGHBOUR_RECORDS) {
1515 let nodes = nodes.iter().map(|node| node.value.record).collect::<Vec<NodeRecord>>();
1516 trace!(target: "discv4", len = nodes.len(), to=?to,"Sent neighbours packet");
1517 let msg = Message::Neighbours(Neighbours { nodes, expire });
1518 self.send_packet(msg, to);
1519 }
1520 }
1521
1522 fn evict_expired_requests(&mut self, now: Instant) {
1523 self.pending_enr_requests.retain(|_node_id, enr_request| {
1524 now.duration_since(enr_request.sent_at) < self.config.enr_expiration
1525 });
1526
1527 let mut failed_pings = Vec::new();
1528 self.pending_pings.retain(|node_id, ping_request| {
1529 if now.duration_since(ping_request.sent_at) > self.config.ping_expiration {
1530 failed_pings.push(*node_id);
1531 return false
1532 }
1533 true
1534 });
1535
1536 if !failed_pings.is_empty() {
1537 trace!(target: "discv4", num=%failed_pings.len(), "evicting nodes due to failed pong");
1539 for node_id in failed_pings {
1540 self.remove_node(node_id);
1541 }
1542 }
1543
1544 let mut failed_lookups = Vec::new();
1545 self.pending_lookup.retain(|node_id, (lookup_sent_at, _)| {
1546 if now.duration_since(*lookup_sent_at) > self.config.request_timeout {
1547 failed_lookups.push(*node_id);
1548 return false
1549 }
1550 true
1551 });
1552
1553 if !failed_lookups.is_empty() {
1554 trace!(target: "discv4", num=%failed_lookups.len(), "evicting nodes due to failed lookup");
1556 for node_id in failed_lookups {
1557 self.remove_node(node_id);
1558 }
1559 }
1560
1561 self.evict_failed_find_nodes(now);
1562 }
1563
1564 fn evict_failed_find_nodes(&mut self, now: Instant) {
1566 let mut failed_find_nodes = Vec::new();
1567 self.pending_find_nodes.retain(|node_id, find_node_request| {
1568 if now.duration_since(find_node_request.sent_at) > self.config.neighbours_expiration {
1569 if !find_node_request.answered {
1570 failed_find_nodes.push(*node_id);
1573 }
1574 return false
1575 }
1576 true
1577 });
1578
1579 if failed_find_nodes.is_empty() {
1580 return
1581 }
1582
1583 trace!(target: "discv4", num=%failed_find_nodes.len(), "processing failed find nodes");
1584
1585 for node_id in failed_find_nodes {
1586 let key = kad_key(node_id);
1587 let failures = match self.kbuckets.entry(&key) {
1588 kbucket::Entry::Present(mut entry, _) => {
1589 entry.value_mut().inc_failed_request();
1590 entry.value().find_node_failures
1591 }
1592 kbucket::Entry::Pending(mut entry, _) => {
1593 entry.value().inc_failed_request();
1594 entry.value().find_node_failures
1595 }
1596 _ => continue,
1597 };
1598
1599 if failures > self.config.max_find_node_failures {
1603 self.soft_remove_node(node_id);
1604 }
1605 }
1606 }
1607
1608 fn re_ping_oldest(&mut self) {
1613 let mut nodes = self
1614 .kbuckets
1615 .iter_ref()
1616 .filter(|entry| entry.node.value.is_expired())
1617 .map(|n| n.node.value)
1618 .collect::<Vec<_>>();
1619 nodes.sort_by(|a, b| a.last_seen.cmp(&b.last_seen));
1620 let to_ping = nodes.into_iter().map(|n| n.record).take(MAX_NODES_PING).collect::<Vec<_>>();
1621 for node in to_ping {
1622 self.try_ping(node, PingReason::RePing)
1623 }
1624 }
1625
1626 fn is_expired(&self, expiration: u64) -> bool {
1628 self.ensure_not_expired(expiration).is_err()
1629 }
1630
1631 fn ensure_not_expired(&self, timestamp: u64) -> Result<(), ()> {
1641 let _ = i64::try_from(timestamp).map_err(drop)?;
1643
1644 let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_secs();
1645 if self.config.enforce_expiration_timestamps && timestamp < now {
1646 trace!(target: "discv4", "Expired packet");
1647 return Err(())
1648 }
1649 Ok(())
1650 }
1651
1652 fn ping_buffered(&mut self) {
1654 while self.pending_pings.len() < MAX_NODES_PING {
1655 match self.queued_pings.pop_front() {
1656 Some((next, reason)) => self.try_ping(next, reason),
1657 None => break,
1658 }
1659 }
1660 }
1661
1662 fn ping_expiration(&self) -> u64 {
1663 (SystemTime::now().duration_since(UNIX_EPOCH).unwrap() + self.config.ping_expiration)
1664 .as_secs()
1665 }
1666
1667 fn find_node_expiration(&self) -> u64 {
1668 (SystemTime::now().duration_since(UNIX_EPOCH).unwrap() + self.config.request_timeout)
1669 .as_secs()
1670 }
1671
1672 fn enr_request_expiration(&self) -> u64 {
1673 (SystemTime::now().duration_since(UNIX_EPOCH).unwrap() + self.config.enr_expiration)
1674 .as_secs()
1675 }
1676
1677 fn send_neighbours_expiration(&self) -> u64 {
1678 (SystemTime::now().duration_since(UNIX_EPOCH).unwrap() + self.config.neighbours_expiration)
1679 .as_secs()
1680 }
1681
/// Drives the service: advances the timers, handles commands and incoming datagrams, and
/// returns the next [`Discv4Event`] as soon as one is queued.
///
/// Returns `Poll::Pending` only at the end of a full loop iteration in which no event was
/// queued.
pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<Discv4Event> {
    loop {
        // hand out events buffered by an earlier iteration before doing any more work
        if let Some(event) = self.queued_events.pop_front() {
            return Poll::Ready(event)
        }

        // periodic lookups, alternating between targets chosen by the rotator
        if self.config.enable_lookup {
            while self.lookup_interval.poll_tick(cx).is_ready() {
                let target = self.lookup_rotator.next(&self.local_node_record.id);
                self.lookup_with(target, None);
            }
        }

        // periodically re-ping the entries that were not seen for the longest time
        while self.ping_interval.poll_tick(cx).is_ready() {
            self.re_ping_oldest();
        }

        // the external IP resolver is optional; apply a newly resolved address if there is one
        if let Some(Poll::Ready(Some(ip))) =
            self.resolve_external_ip_interval.as_mut().map(|r| r.poll_tick(cx))
        {
            self.set_external_ip_addr(ip);
        }

        // drain all commands that are currently available
        while let Poll::Ready(Some(cmd)) = self.commands_rx.poll_recv(cx) {
            match cmd {
                Discv4Command::Add(enr) => {
                    self.add_node(enr);
                }
                Discv4Command::Lookup { node_id, tx } => {
                    // without an explicit target we look up our own id
                    let node_id = node_id.unwrap_or(self.local_node_record.id);
                    self.lookup_with(node_id, tx);
                }
                Discv4Command::SetLookupInterval(duration) => {
                    self.set_lookup_interval(duration);
                }
                Discv4Command::Updates(tx) => {
                    let rx = self.update_stream();
                    // the requester may already be gone, nothing to do then
                    let _ = tx.send(rx);
                }
                Discv4Command::BanPeer(node_id) => self.ban_node(node_id),
                Discv4Command::Remove(node_id) => {
                    self.remove_node(node_id);
                }
                Discv4Command::Ban(node_id, ip) => {
                    self.ban_node(node_id);
                    self.ban_ip(ip);
                }
                Discv4Command::BanIp(ip) => {
                    self.ban_ip(ip);
                }
                Discv4Command::SetEIP868RLPPair { key, rlp } => {
                    debug!(target: "discv4", key=%String::from_utf8_lossy(&key), "Update EIP-868 extension pair");

                    // a failure to update the local ENR is deliberately ignored here
                    let _ = self.local_eip_868_enr.insert_raw_rlp(key, rlp, &self.secret_key);
                }
                Discv4Command::SetTcpPort(port) => {
                    debug!(target: "discv4", %port, "Update tcp port");
                    self.local_node_record.tcp_port = port;
                    // keep the ENR entry matching the address family of the local record
                    if self.local_node_record.address.is_ipv4() {
                        let _ = self.local_eip_868_enr.set_tcp4(port, &self.secret_key);
                    } else {
                        let _ = self.local_eip_868_enr.set_tcp6(port, &self.secret_key);
                    }
                }

                Discv4Command::Terminated => {
                    // queued like any other event; the `Stream` impl turns it into `None`
                    self.queued_events.push_back(Discv4Event::Terminated);
                }
            }
        }

        // limits how many datagrams are processed in one loop iteration
        let mut udp_message_budget = UDP_MESSAGE_POLL_LOOP_BUDGET;

        // process incoming datagrams
        while let Poll::Ready(Some(event)) = self.ingress.poll_recv(cx) {
            match event {
                IngressEvent::RecvError(err) => {
                    debug!(target: "discv4", %err, "failed to read datagram");
                }
                IngressEvent::BadPacket(from, err, data) => {
                    trace!(target: "discv4", ?from, %err, packet=?hex::encode(&data), "bad packet");
                }
                IngressEvent::Packet(remote_addr, Packet { msg, node_id, hash }) => {
                    trace!(target: "discv4", r#type=?msg.msg_type(), from=?remote_addr,"received packet");
                    // dispatch to the matching handler and note which message kind was handled
                    let event = match msg {
                        Message::Ping(ping) => {
                            self.on_ping(ping, remote_addr, node_id, hash);
                            Discv4Event::Ping
                        }
                        Message::Pong(pong) => {
                            self.on_pong(pong, remote_addr, node_id);
                            Discv4Event::Pong
                        }
                        Message::FindNode(msg) => {
                            self.on_find_node(msg, remote_addr, node_id);
                            Discv4Event::FindNode
                        }
                        Message::Neighbours(msg) => {
                            self.on_neighbours(msg, remote_addr, node_id);
                            Discv4Event::Neighbours
                        }
                        Message::EnrRequest(msg) => {
                            self.on_enr_request(msg, remote_addr, node_id, hash);
                            Discv4Event::EnrRequest
                        }
                        Message::EnrResponse(msg) => {
                            self.on_enr_response(msg, remote_addr, node_id);
                            Discv4Event::EnrResponse
                        }
                    };

                    self.queued_events.push_back(event);
                }
            }

            udp_message_budget -= 1;
            if udp_message_budget < 0 {
                trace!(target: "discv4", budget=UDP_MESSAGE_POLL_LOOP_BUDGET, "exhausted message poll budget");
                if self.queued_events.is_empty() {
                    // nothing was queued, so this call ends in `Pending` although the ingress
                    // channel may still hold messages: wake ourselves to get polled again
                    cx.waker().wake_by_ref();
                }
                break
            }
        }

        // send pings that had to be queued earlier
        self.ping_buffered();

        // periodically drop requests that timed out
        while self.evict_expired_requests_interval.poll_tick(cx).is_ready() {
            self.evict_expired_requests(Instant::now());
        }

        // periodically drop received pongs older than `EXPIRE_DURATION`
        while self.expire_interval.poll_tick(cx).is_ready() {
            self.received_pongs.evict_expired(Instant::now(), EXPIRE_DURATION);
        }

        // nothing to report: everything above has been polled, so we can park;
        // otherwise loop once more to pop the queued event at the top
        if self.queued_events.is_empty() {
            return Poll::Pending
        }
    }
}
1838}
1839
1840impl Stream for Discv4Service {
1842 type Item = Discv4Event;
1843
1844 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1845 match ready!(self.get_mut().poll(cx)) {
1847 Discv4Event::Terminated => Poll::Ready(None),
1849 ev => Poll::Ready(Some(ev)),
1851 }
1852 }
1853}
1854
1855impl fmt::Debug for Discv4Service {
1856 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1857 f.debug_struct("Discv4Service")
1858 .field("local_address", &self.local_address)
1859 .field("local_peer_id", &self.local_peer_id())
1860 .field("local_node_record", &self.local_node_record)
1861 .field("queued_pings", &self.queued_pings)
1862 .field("pending_lookup", &self.pending_lookup)
1863 .field("pending_find_nodes", &self.pending_find_nodes)
1864 .field("lookup_interval", &self.lookup_interval)
1865 .finish_non_exhaustive()
1866 }
1867}
1868
/// The events produced by `Discv4Service::poll`.
///
/// Each message variant reports that an incoming message of that kind was handed to its
/// handler.
#[derive(Debug, Eq, PartialEq)]
pub enum Discv4Event {
    /// A `Ping` message was handled.
    Ping,
    /// A `Pong` message was handled.
    Pong,
    /// A `FindNode` message was handled.
    FindNode,
    /// A `Neighbours` message was handled.
    Neighbours,
    /// An `EnrRequest` message was handled.
    EnrRequest,
    /// An `EnrResponse` message was handled.
    EnrResponse,
    /// The service received a terminate command; the `Stream` impl ends on this event.
    Terminated,
}
1889
1890pub(crate) async fn send_loop(udp: Arc<UdpSocket>, rx: EgressReceiver) {
1892 let mut stream = ReceiverStream::new(rx);
1893 while let Some((payload, to)) = stream.next().await {
1894 match udp.send_to(&payload, to).await {
1895 Ok(size) => {
1896 trace!(target: "discv4", ?to, ?size,"sent payload");
1897 }
1898 Err(err) => {
1899 debug!(target: "discv4", ?to, %err,"Failed to send datagram.");
1900 }
1901 }
1902 }
1903}
1904
/// Upper bound for the per-IP packet counter kept by `receive_loop`; packets from an IP whose
/// counter exceeds this value are dropped. `receive_loop` also derives its counter decay
/// (half of this value, applied every that many seconds) from it.
const MAX_INCOMING_PACKETS_PER_MINUTE_BY_IP: usize = 60usize;
1907
/// Reads datagrams from the UDP socket forever, decodes them and forwards the result to the
/// service through `tx` as [`IngressEvent`]s.
///
/// Packets are dropped without being forwarded if the sender IP exceeded
/// `MAX_INCOMING_PACKETS_PER_MINUTE_BY_IP`, if they were signed by `local_id`, or if their
/// hash is reported as already seen by the receive cache.
pub(crate) async fn receive_loop(udp: Arc<UdpSocket>, tx: IngressSender, local_id: PeerId) {
    // forwards an event to the service; a send failure is only logged
    let send = |event: IngressEvent| async {
        let _ = tx.send(event).await.map_err(|err| {
            debug!(
                target: "discv4",
                %err,
                "failed send incoming packet",
            )
        });
    };

    let mut cache = ReceiveCache::default();

    // the counters decay by `tick` every `tick` seconds, i.e. half the limit per period
    let tick = MAX_INCOMING_PACKETS_PER_MINUTE_BY_IP / 2;
    let mut interval = tokio::time::interval(Duration::from_secs(tick as u64));

    let mut buf = [0; MAX_PACKET_SIZE];
    loop {
        let res = udp.recv_from(&mut buf).await;
        match res {
            Err(err) => {
                debug!(target: "discv4", %err, "Failed to read datagram.");
                send(IngressEvent::RecvError(err)).await;
            }
            Ok((read, remote_addr)) => {
                // rate limit by sender IP before spending any work on decoding
                if cache.inc_ip(remote_addr.ip()) > MAX_INCOMING_PACKETS_PER_MINUTE_BY_IP {
                    trace!(target: "discv4", ?remote_addr, "Too many incoming packets from IP.");
                    continue
                }

                let packet = &buf[..read];
                match Message::decode(packet) {
                    Ok(packet) => {
                        // ignore packets that carry our own node id
                        if packet.node_id == local_id {
                            debug!(target: "discv4", ?remote_addr, "Received own packet.");
                            continue
                        }

                        // ignore packets whose hash the cache reports as already seen
                        if cache.contains_packet(packet.hash) {
                            debug!(target: "discv4", ?remote_addr, "Received duplicate packet.");
                            continue
                        }

                        send(IngressEvent::Packet(remote_addr, packet)).await;
                    }
                    Err(err) => {
                        trace!(target: "discv4", %err,"Failed to decode packet");
                        send(IngressEvent::BadPacket(remote_addr, err, packet.to_vec())).await
                    }
                }
            }
        }

        // Non-blocking check whether the decay interval elapsed: `Pending` is mapped to
        // `false` so this never waits.
        // NOTE(review): the `continue` paths above skip this check, so counters only decay
        // after an iteration that reaches this point — confirm this is intended.
        if poll_fn(|cx| match interval.poll_tick(cx) {
            Poll::Ready(_) => Poll::Ready(true),
            Poll::Pending => Poll::Ready(false),
        })
        .await
        {
            cache.tick_ips(tick);
        }
    }
}
1980
/// Bookkeeping used by `receive_loop` to filter incoming packets.
struct ReceiveCache {
    /// Number of packets counted per sender IP.
    ///
    /// Incremented by `inc_ip` and decreased periodically by `tick_ips`.
    ip_messages: HashMap<IpAddr, usize>,
    /// Hashes of recently received packets, used by `contains_packet`.
    unique_packets: schnellru::LruMap<B256, ()>,
}
1994
1995impl ReceiveCache {
1996 fn tick_ips(&mut self, tick: usize) {
2000 self.ip_messages.retain(|_, count| {
2001 if let Some(reset) = count.checked_sub(tick) {
2002 *count = reset;
2003 true
2004 } else {
2005 false
2006 }
2007 });
2008 }
2009
2010 fn inc_ip(&mut self, ip: IpAddr) -> usize {
2012 let ctn = self.ip_messages.entry(ip).or_default();
2013 *ctn = ctn.saturating_add(1);
2014 *ctn
2015 }
2016
2017 fn contains_packet(&mut self, hash: B256) -> bool {
2019 !self.unique_packets.insert(hash, ())
2020 }
2021}
2022
2023impl Default for ReceiveCache {
2024 fn default() -> Self {
2025 Self {
2026 ip_messages: Default::default(),
2027 unique_packets: schnellru::LruMap::new(schnellru::ByLength::new(32)),
2028 }
2029 }
2030}
2031
/// The commands handled by `Discv4Service::poll`.
enum Discv4Command {
    /// Add the given record via `add_node`.
    Add(NodeRecord),
    /// Update the TCP port of the local node record and of the local ENR.
    SetTcpPort(u16),
    /// Insert a raw RLP key/value pair into the local ENR.
    SetEIP868RLPPair { key: Vec<u8>, rlp: Bytes },
    /// Ban both the peer id and the IP address.
    Ban(PeerId, IpAddr),
    /// Ban the peer id.
    BanPeer(PeerId),
    /// Ban the IP address.
    BanIp(IpAddr),
    /// Remove the peer via `remove_node`.
    Remove(PeerId),
    /// Start a lookup for `node_id`, or for the local node id if it is `None`; `tx` is passed
    /// on to the lookup.
    Lookup { node_id: Option<PeerId>, tx: Option<NodeRecordSender> },
    /// Change the lookup interval.
    SetLookupInterval(Duration),
    /// Request a new stream of [`DiscoveryUpdate`]s, delivered through the sender.
    Updates(OneshotSender<ReceiverStream<DiscoveryUpdate>>),
    /// Make the service emit [`Discv4Event::Terminated`].
    Terminated,
}
2046
/// What `receive_loop` reports to the service for a read attempt on the UDP socket.
#[derive(Debug)]
pub(crate) enum IngressEvent {
    /// Reading from the socket failed.
    RecvError(io::Error),
    /// A datagram from the given address could not be decoded; carries the error and the
    /// raw bytes.
    BadPacket(SocketAddr, DecodePacketError, Vec<u8>),
    /// A successfully decoded packet from the given address.
    Packet(SocketAddr, Packet),
}
2057
/// State kept for a ping that is still pending.
#[derive(Debug)]
struct PingRequest {
    /// When the ping was sent; compared against `config.ping_expiration` on eviction.
    sent_at: Instant,
    /// The node the ping was sent to.
    node: NodeRecord,
    /// NOTE(review): presumably the hash of the sent ping that the pong has to echo — confirm.
    echo_hash: B256,
    /// Why the ping was sent.
    reason: PingReason,
}
2070
2071#[derive(Debug)]
2075struct LookupTargetRotator {
2076 interval: usize,
2077 counter: usize,
2078}
2079
2080impl LookupTargetRotator {
2083 const fn local_only() -> Self {
2085 Self { interval: 1, counter: 0 }
2086 }
2087}
2088
2089impl Default for LookupTargetRotator {
2090 fn default() -> Self {
2091 Self {
2092 interval: 4,
2094 counter: 3,
2095 }
2096 }
2097}
2098
2099impl LookupTargetRotator {
2100 fn next(&mut self, local: &PeerId) -> PeerId {
2102 self.counter += 1;
2103 self.counter %= self.interval;
2104 if self.counter == 0 {
2105 return *local
2106 }
2107 PeerId::random()
2108 }
2109}
2110
/// Cheaply cloneable handle to the state of one lookup.
///
/// All clones point to the same [`LookupContextInner`], so changes made through one clone
/// are visible through every other clone.
#[derive(Clone, Debug)]
struct LookupContext {
    /// The shared, reference-counted lookup state.
    inner: Rc<LookupContextInner>,
}
2119
2120impl LookupContext {
2121 fn new(
2123 target: discv5::Key<NodeKey>,
2124 nearest_nodes: impl IntoIterator<Item = (Distance, NodeRecord)>,
2125 listener: Option<NodeRecordSender>,
2126 ) -> Self {
2127 let closest_nodes = nearest_nodes
2128 .into_iter()
2129 .map(|(distance, record)| {
2130 (distance, QueryNode { record, queried: false, responded: false })
2131 })
2132 .collect();
2133
2134 let inner = Rc::new(LookupContextInner {
2135 target,
2136 closest_nodes: RefCell::new(closest_nodes),
2137 listener,
2138 });
2139 Self { inner }
2140 }
2141
2142 fn target(&self) -> PeerId {
2144 self.inner.target.preimage().0
2145 }
2146
2147 fn closest(&self, num: usize) -> Vec<NodeRecord> {
2148 self.inner
2149 .closest_nodes
2150 .borrow()
2151 .iter()
2152 .filter(|(_, node)| !node.queried)
2153 .map(|(_, n)| n.record)
2154 .take(num)
2155 .collect()
2156 }
2157
2158 fn filter_closest<P>(&self, num: usize, filter: P) -> Vec<NodeRecord>
2160 where
2161 P: FnMut(&NodeRecord) -> bool,
2162 {
2163 self.inner
2164 .closest_nodes
2165 .borrow()
2166 .iter()
2167 .filter(|(_, node)| !node.queried)
2168 .map(|(_, n)| n.record)
2169 .filter(filter)
2170 .take(num)
2171 .collect()
2172 }
2173
2174 fn add_node(&self, record: NodeRecord) {
2176 let distance = self.inner.target.distance(&kad_key(record.id));
2177 let mut closest = self.inner.closest_nodes.borrow_mut();
2178 if let btree_map::Entry::Vacant(entry) = closest.entry(distance) {
2179 entry.insert(QueryNode { record, queried: false, responded: false });
2180 }
2181 }
2182
2183 fn set_queried(&self, id: PeerId, val: bool) {
2184 if let Some((_, node)) =
2185 self.inner.closest_nodes.borrow_mut().iter_mut().find(|(_, node)| node.record.id == id)
2186 {
2187 node.queried = val;
2188 }
2189 }
2190
2191 fn mark_queried(&self, id: PeerId) {
2193 self.set_queried(id, true)
2194 }
2195
2196 fn unmark_queried(&self, id: PeerId) {
2198 self.set_queried(id, false)
2199 }
2200
2201 fn mark_responded(&self, id: PeerId) {
2203 if let Some((_, node)) =
2204 self.inner.closest_nodes.borrow_mut().iter_mut().find(|(_, node)| node.record.id == id)
2205 {
2206 node.responded = true;
2207 }
2208 }
2209}
2210
// SAFETY: NOTE(review) — `LookupContext` wraps an `Rc` around a `RefCell`, neither of which is
// `Send`, so this impl is only sound if every clone of a context is owned by a single owner
// (presumably the `Discv4Service`) and all clones always move between threads together.
// That invariant is not enforced by the type system; confirm it holds before adding new
// places that store or hand out a `LookupContext`.
unsafe impl Send for LookupContext {}
/// The state shared by all clones of a [`LookupContext`].
#[derive(Debug)]
struct LookupContextInner {
    /// The key the lookup is searching for.
    target: discv5::Key<NodeKey>,
    /// The nodes known to this lookup, ordered by their distance to the target.
    closest_nodes: RefCell<BTreeMap<Distance, QueryNode>>,
    /// Receives the records of all nodes that responded once this value is dropped.
    listener: Option<NodeRecordSender>,
}
2230
2231impl Drop for LookupContextInner {
2232 fn drop(&mut self) {
2233 if let Some(tx) = self.listener.take() {
2234 let nodes = self
2237 .closest_nodes
2238 .take()
2239 .into_values()
2240 .filter(|node| node.responded)
2241 .map(|node| node.record)
2242 .collect();
2243 let _ = tx.send(nodes);
2244 }
2245 }
2246}
2247
/// A node tracked by a lookup together with its query state.
#[derive(Debug, Clone, Copy)]
struct QueryNode {
    /// The record of the node.
    record: NodeRecord,
    /// Whether the node was flagged as queried; such nodes are skipped when the lookup picks
    /// its next candidates.
    queried: bool,
    /// Whether the node was flagged as having responded; only such nodes are reported to the
    /// lookup listener.
    responded: bool,
}
2255
2256#[derive(Debug)]
2257struct FindNodeRequest {
2258 sent_at: Instant,
2260 response_count: usize,
2262 answered: bool,
2264 lookup_context: LookupContext,
2266}
2267
2268impl FindNodeRequest {
2271 fn new(resp: LookupContext) -> Self {
2272 Self { sent_at: Instant::now(), response_count: 0, answered: false, lookup_context: resp }
2273 }
2274}
2275
/// State kept for an ENR request that is still pending.
#[derive(Debug)]
struct EnrRequestState {
    /// When the request was sent; compared against `config.enr_expiration` on eviction.
    sent_at: Instant,
    /// NOTE(review): presumably the hash of the sent request that the response has to
    /// echo — confirm.
    echo_hash: B256,
}
2283
/// The value stored in the routing table for a node.
#[derive(Debug, Clone, Eq, PartialEq)]
struct NodeEntry {
    /// The record of the node.
    record: NodeRecord,
    /// When the entry was created or last updated through `update_now`.
    last_seen: Instant,
    /// The last ENR sequence number recorded for the node, if any.
    last_enr_seq: Option<u64>,
    /// The fork id recorded for the node, if any.
    fork_id: Option<ForkId>,
    /// Number of failed find-node requests; reset when an endpoint proof is established.
    find_node_failures: u8,
    /// Whether an endpoint proof has been established for the node.
    has_endpoint_proof: bool,
}
2300
2301impl NodeEntry {
2304 fn new(record: NodeRecord) -> Self {
2306 Self {
2307 record,
2308 last_seen: Instant::now(),
2309 last_enr_seq: None,
2310 fork_id: None,
2311 find_node_failures: 0,
2312 has_endpoint_proof: false,
2313 }
2314 }
2315
2316 #[cfg(test)]
2317 fn new_proven(record: NodeRecord) -> Self {
2318 let mut node = Self::new(record);
2319 node.has_endpoint_proof = true;
2320 node
2321 }
2322
2323 fn establish_proof(&mut self) {
2325 self.has_endpoint_proof = true;
2326 self.find_node_failures = 0;
2327 }
2328
2329 const fn exceeds_find_node_failures(&self, max_failures: u8) -> bool {
2331 self.find_node_failures >= max_failures
2332 }
2333
2334 fn update_with_enr(&mut self, last_enr_seq: Option<u64>) -> Option<u64> {
2336 self.update_now(|s| std::mem::replace(&mut s.last_enr_seq, last_enr_seq))
2337 }
2338
2339 fn inc_failed_request(&mut self) {
2341 self.find_node_failures += 1;
2342 }
2343
2344 fn update_with_fork_id(&mut self, fork_id: Option<ForkId>) -> Option<ForkId> {
2346 self.update_now(|s| std::mem::replace(&mut s.fork_id, fork_id))
2347 }
2348
2349 fn update_now<F, R>(&mut self, f: F) -> R
2351 where
2352 F: FnOnce(&mut Self) -> R,
2353 {
2354 self.last_seen = Instant::now();
2355 f(self)
2356 }
2357}
2358
2359impl NodeEntry {
2362 fn is_expired(&self) -> bool {
2364 self.last_seen.elapsed() > (ENDPOINT_PROOF_EXPIRATION / 2)
2365 }
2366}
2367
/// Why a ping was sent.
#[derive(Debug)]
enum PingReason {
    /// NOTE(review): presumably the first ping for a node that was just added — confirm.
    InitialInsert,
    /// NOTE(review): presumably a ping sent to establish an endpoint proof with the node —
    /// confirm.
    EstablishBond,
    /// The periodic re-ping of table entries that were not seen recently.
    RePing,
    /// The ping is part of a lookup; carries the pinged node and the lookup's context.
    Lookup(NodeRecord, LookupContext),
}
2380
/// A change in the set of discovered nodes that the service reports on its update streams.
#[derive(Debug, Clone)]
pub enum DiscoveryUpdate {
    /// A node was added.
    Added(NodeRecord),
    /// A node was discovered, but the bucket it belongs to was full.
    DiscoveredAtCapacity(NodeRecord),
    /// A fork id was received for the node.
    EnrForkId(NodeRecord, ForkId),
    /// The node with the given id was removed.
    Removed(PeerId),
    /// Several updates delivered at once.
    Batch(Vec<DiscoveryUpdate>),
}
2395
2396#[cfg(test)]
2397mod tests {
2398 use super::*;
2399 use crate::test_utils::{create_discv4, create_discv4_with_config, rng_endpoint, rng_record};
2400 use alloy_primitives::hex;
2401 use alloy_rlp::{Decodable, Encodable};
2402 use rand::{thread_rng, Rng};
2403 use reth_ethereum_forks::{EnrForkIdEntry, ForkHash};
2404 use reth_network_peers::mainnet_nodes;
2405 use std::future::poll_fn;
2406
2407 #[tokio::test]
2408 async fn test_configured_enr_forkid_entry() {
2409 let fork: ForkId = ForkId { hash: ForkHash([220, 233, 108, 45]), next: 0u64 };
2410 let mut disc_conf = Discv4Config::default();
2411 disc_conf.add_eip868_pair("eth", EnrForkIdEntry::from(fork));
2412 let (_discv4, service) = create_discv4_with_config(disc_conf).await;
2413 let eth = service.local_eip_868_enr.get_raw_rlp(b"eth").unwrap();
2414 let fork_entry_id = EnrForkIdEntry::decode(&mut ð[..]).unwrap();
2415
2416 let raw: [u8; 8] = [0xc7, 0xc6, 0x84, 0xdc, 0xe9, 0x6c, 0x2d, 0x80];
2417 let decoded = EnrForkIdEntry::decode(&mut &raw[..]).unwrap();
2418 let expected = EnrForkIdEntry {
2419 fork_id: ForkId { hash: ForkHash([0xdc, 0xe9, 0x6c, 0x2d]), next: 0 },
2420 };
2421 assert_eq!(expected, fork_entry_id);
2422 assert_eq!(expected, decoded);
2423 }
2424
/// Decoding the known-good RLP bytes yields the expected fork id entry.
#[test]
fn test_enr_forkid_entry_decode() {
    let expected = EnrForkIdEntry {
        fork_id: ForkId { hash: ForkHash([0xdc, 0xe9, 0x6c, 0x2d]), next: 0 },
    };

    let encoded: [u8; 8] = [0xc7, 0xc6, 0x84, 0xdc, 0xe9, 0x6c, 0x2d, 0x80];
    let mut remaining = encoded.as_slice();
    let decoded = EnrForkIdEntry::decode(&mut remaining).unwrap();

    assert_eq!(expected, decoded);
}
2434
/// Encoding a fork id entry yields the known-good RLP bytes.
#[test]
fn test_enr_forkid_entry_encode() {
    let expected: [u8; 8] = [0xc7, 0xc6, 0x84, 0xdc, 0xe9, 0x6c, 0x2d, 0x80];
    let entry = EnrForkIdEntry {
        fork_id: ForkId { hash: ForkHash([0xdc, 0xe9, 0x6c, 0x2d]), next: 0 },
    };

    let mut buf = Vec::with_capacity(expected.len());
    entry.encode(&mut buf);

    assert_eq!(expected.as_slice(), buf.as_slice());
}
2445
/// A `local_only` rotator returns the local id on every call.
#[test]
fn test_local_rotator() {
    let local = PeerId::random();
    let mut rotator = LookupTargetRotator::local_only();
    for _ in 0..2 {
        assert_eq!(rotator.next(&local), local);
    }
}
2453
/// The default rotator returns the local id first, then three other ids, then the local
/// id again.
#[test]
fn test_rotator() {
    let local = PeerId::random();
    let mut rotator = LookupTargetRotator::default();

    assert_eq!(rotator.next(&local), local);
    for _ in 0..3 {
        assert_ne!(rotator.next(&local), local);
    }
    assert_eq!(rotator.next(&local), local);
}
2464
/// Pings are sent right away until `MAX_NODES_PING` of them are pending; pings for nodes
/// added after that are queued instead.
#[tokio::test]
async fn test_pending_ping() {
    let (_, mut service) = create_discv4().await;

    let local_addr = service.local_addr();

    // fill `pending_pings` up to its limit; nodes that could not be added are skipped
    let mut num_inserted = 0;
    loop {
        let node = NodeRecord::new(local_addr, PeerId::random());
        if service.add_node(node) {
            num_inserted += 1;
            assert!(service.pending_pings.contains_key(&node.id));
            assert_eq!(service.pending_pings.len(), num_inserted);
            if num_inserted == MAX_NODES_PING {
                break
            }
        }
    }

    // `pending_pings` is full now, so further pings must end up in `queued_pings`
    num_inserted = 0;
    for _ in 0..MAX_NODES_PING {
        let node = NodeRecord::new(local_addr, PeerId::random());
        if service.add_node(node) {
            num_inserted += 1;
            assert!(!service.pending_pings.contains_key(&node.id));
            assert_eq!(service.pending_pings.len(), MAX_NODES_PING);
            assert_eq!(service.queued_pings.len(), num_inserted);
        }
    }
}
2496
/// Manual test that bootstraps from the mainnet nodes and prints the discovered peers.
///
/// Ignored by default: it talks to the real network and only ends when the update stream
/// does.
#[tokio::test(flavor = "multi_thread")]
#[ignore]
async fn test_mainnet_lookup() {
    reth_tracing::init_test_tracing();
    let fork_id = ForkId { hash: ForkHash(hex!("743f3d89")), next: 16191202 };

    let all_nodes = mainnet_nodes();
    let config = Discv4Config::builder()
        .add_boot_nodes(all_nodes)
        .lookup_interval(Duration::from_secs(1))
        .add_eip868_pair("eth", fork_id)
        .build();
    let (_discv4, mut service) = create_discv4_with_config(config).await;

    // subscribe before the service is moved into its own task
    let mut updates = service.update_stream();

    let _handle = service.spawn();

    // mirror the reported additions and removals to track the current number of peers
    let mut table = HashMap::new();
    while let Some(update) = updates.next().await {
        match update {
            DiscoveryUpdate::EnrForkId(record, fork_id) => {
                println!("{record:?}, {fork_id:?}");
            }
            DiscoveryUpdate::Added(record) => {
                table.insert(record.id, record);
            }
            DiscoveryUpdate::Removed(id) => {
                table.remove(&id);
            }
            _ => {}
        }
        println!("total peers {}", table.len());
    }
}
2533
/// A ping received from an IPv4-mapped IPv6 address must be stored under the plain IPv4
/// address.
#[tokio::test]
async fn test_mapped_ipv4() {
    reth_tracing::init_test_tracing();
    let mut rng = thread_rng();
    let config = Discv4Config::builder().build();
    let (_discv4, mut service) = create_discv4_with_config(config).await;

    // the sender address is the IPv4-mapped IPv6 form of `v4`
    let v4: Ipv4Addr = "0.0.0.0".parse().unwrap();
    let v6 = v4.to_ipv6_mapped();
    let addr: SocketAddr = (v6, DEFAULT_DISCOVERY_PORT).into();

    let ping = Ping {
        from: rng_endpoint(&mut rng),
        to: rng_endpoint(&mut rng),
        expire: service.ping_expiration(),
        enr_sq: Some(rng.gen()),
    };

    let id = PeerId::random_with(&mut rng);
    service.on_ping(ping, addr, id, rng.gen());

    // the sender must be in the table now, with its address converted back to IPv4
    let key = kad_key(id);
    match service.kbuckets.entry(&key) {
        kbucket::Entry::Present(entry, _) => {
            let node_addr = entry.value().record.address;
            assert!(node_addr.is_ipv4());
            assert_eq!(node_addr, IpAddr::from(v4));
        }
        _ => unreachable!(),
    };
}
2565
/// A ping whose expiration timestamp lies in the past must not add the sender to the table.
#[tokio::test]
async fn test_respect_ping_expiration() {
    reth_tracing::init_test_tracing();
    let mut rng = thread_rng();
    let config = Discv4Config::builder().build();
    let (_discv4, mut service) = create_discv4_with_config(config).await;

    let v4: Ipv4Addr = "0.0.0.0".parse().unwrap();
    let v6 = v4.to_ipv6_mapped();
    let addr: SocketAddr = (v6, DEFAULT_DISCOVERY_PORT).into();

    let ping = Ping {
        from: rng_endpoint(&mut rng),
        to: rng_endpoint(&mut rng),
        // already expired: one second before the current time
        expire: SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs() - 1,
        enr_sq: Some(rng.gen()),
    };

    let id = PeerId::random_with(&mut rng);
    service.on_ping(ping, addr, id, rng.gen());

    // the sender must not have been added
    let key = kad_key(id);
    match service.kbuckets.entry(&key) {
        kbucket::Entry::Absent(_) => {}
        _ => unreachable!(),
    };
}
2593
/// With a single proven node in the table, a self lookup results in exactly one pending
/// find-node request, and polling the service does not add another one.
#[tokio::test]
async fn test_single_lookups() {
    reth_tracing::init_test_tracing();

    let config = Discv4Config::builder().build();
    let (_discv4, mut service) = create_discv4_with_config(config.clone()).await;

    let id = PeerId::random();
    let key = kad_key(id);
    let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), id);

    // insert the node directly, with an endpoint proof already in place
    let _ = service.kbuckets.insert_or_update(
        &key,
        NodeEntry::new_proven(record),
        NodeStatus {
            direction: ConnectionDirection::Incoming,
            state: ConnectionState::Connected,
        },
    );

    service.lookup_self();
    assert_eq!(service.pending_find_nodes.len(), 1);

    // poll the service once; the number of pending requests must not change
    poll_fn(|cx| {
        let _ = service.poll(cx);
        assert_eq!(service.pending_find_nodes.len(), 1);

        Poll::Ready(())
    })
    .await;
}
2625
/// A node learned from a `Neighbours` response is pinged first and becomes the target of a
/// second find-node request once the ping exchange with it has been processed.
#[tokio::test]
async fn test_on_neighbours_recursive_lookup() {
    reth_tracing::init_test_tracing();

    let config = Discv4Config::builder().build();
    let (_discv4, mut service) = create_discv4_with_config(config.clone()).await;
    let (_discv4, mut service2) = create_discv4_with_config(config).await;

    let id = PeerId::random();
    let key = kad_key(id);
    let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), id);

    // insert a proven node directly so the self lookup has somebody to query
    let _ = service.kbuckets.insert_or_update(
        &key,
        NodeEntry::new_proven(record),
        NodeStatus {
            direction: ConnectionDirection::Incoming,
            state: ConnectionState::Connected,
        },
    );
    service.lookup_self();
    assert_eq!(service.pending_find_nodes.len(), 1);

    // polling must not start any additional request
    poll_fn(|cx| {
        let _ = service.poll(cx);
        assert_eq!(service.pending_find_nodes.len(), 1);

        Poll::Ready(())
    })
    .await;

    // answer the pending request on behalf of the queried node, advertising `service2`
    let expiry = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_secs() +
        10000000000000;
    let msg = Neighbours { nodes: vec![service2.local_node_record], expire: expiry };
    service.on_neighbours(msg, record.tcp_addr(), id);
    // `service2` receives the ping sent to the newly learned node
    let event = poll_fn(|cx| service2.poll(cx)).await;
    assert_eq!(event, Discv4Event::Ping);
    // no find-node request is sent to the new node before the ping exchange is done
    assert_eq!(service.pending_find_nodes.len(), 1);
    // `service` receives the pong from `service2`
    let event = poll_fn(|cx| service.poll(cx)).await;
    assert_eq!(event, Discv4Event::Pong);
    // `service` also receives a ping from `service2`
    let event = poll_fn(|cx| service.poll(cx)).await;
    assert_eq!(event, Discv4Event::Ping);
    // by now the lookup has moved on to the new node
    assert_eq!(service.pending_find_nodes.len(), 2);
}
2682
/// The closest nodes returned by the table never include the local node.
#[tokio::test]
async fn test_no_local_in_closest() {
    reth_tracing::init_test_tracing();

    let config = Discv4Config::builder().build();
    let (_discv4, mut service) = create_discv4_with_config(config).await;

    let target_key = kad_key(PeerId::random());

    let id = PeerId::random();
    let key = kad_key(id);
    let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), id);

    // a single remote node is inserted directly into the table
    let _ = service.kbuckets.insert_or_update(
        &key,
        NodeEntry::new(record),
        NodeStatus {
            direction: ConnectionDirection::Incoming,
            state: ConnectionState::Connected,
        },
    );

    let closest = service
        .kbuckets
        .closest_values(&target_key)
        .map(|n| n.value.record)
        .take(MAX_NODES_PER_BUCKET)
        .collect::<Vec<_>>();

    // only the inserted node is returned, not the local one
    assert_eq!(closest.len(), 1);
    assert!(!closest.iter().any(|r| r.id == *service.local_peer_id()));
}
2715
/// A lookup for a random target creates a context with that target which is seeded with the
/// node from the table; adding the same node again does not duplicate it.
#[tokio::test]
async fn test_random_lookup() {
    reth_tracing::init_test_tracing();

    let config = Discv4Config::builder().build();
    let (_discv4, mut service) = create_discv4_with_config(config).await;

    let target = PeerId::random();

    let id = PeerId::random();
    let key = kad_key(id);
    let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), id);

    let _ = service.kbuckets.insert_or_update(
        &key,
        NodeEntry::new_proven(record),
        NodeStatus {
            direction: ConnectionDirection::Incoming,
            state: ConnectionState::Connected,
        },
    );

    service.lookup(target);
    assert_eq!(service.pending_find_nodes.len(), 1);

    // grab the context of the one pending request
    let ctx = service.pending_find_nodes.values().next().unwrap().lookup_context.clone();

    assert_eq!(ctx.target(), target);
    assert_eq!(ctx.inner.closest_nodes.borrow().len(), 1);

    // the record is already known to the context, so nothing is added
    ctx.add_node(record);
    assert_eq!(ctx.inner.closest_nodes.borrow().len(), 1);
}
2749
/// A node with too many find-node failures is pinged instead of queried during a lookup,
/// and a pong from it clears the failure counter.
#[tokio::test]
async fn test_reping_on_find_node_failures() {
    reth_tracing::init_test_tracing();

    let config = Discv4Config::builder().build();
    let (_discv4, mut service) = create_discv4_with_config(config).await;

    let target = PeerId::random();

    let id = PeerId::random();
    let key = kad_key(id);
    let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), id);

    // a proven node that already has the maximum number of recorded failures
    let mut entry = NodeEntry::new_proven(record);
    entry.find_node_failures = u8::MAX;
    let _ = service.kbuckets.insert_or_update(
        &key,
        entry,
        NodeStatus {
            direction: ConnectionDirection::Incoming,
            state: ConnectionState::Connected,
        },
    );

    // the lookup must ping the node rather than send it a find-node request
    service.lookup(target);
    assert_eq!(service.pending_find_nodes.len(), 0);
    assert_eq!(service.pending_pings.len(), 1);

    service.update_on_pong(record, None);

    service
        .on_entry(record.id, |entry| {
            // the pong cleared the failures and the endpoint proof is still in place
            assert_eq!(entry.find_node_failures, 0);
            assert!(entry.has_endpoint_proof);
        })
        .unwrap();
}
2788
2789 #[tokio::test]
2790 async fn test_service_commands() {
2791 reth_tracing::init_test_tracing();
2792
2793 let config = Discv4Config::builder().build();
2794 let (discv4, mut service) = create_discv4_with_config(config).await;
2795
2796 service.lookup_self();
2797
2798 let _handle = service.spawn();
2799 discv4.send_lookup_self();
2800 let _ = discv4.lookup_self().await;
2801 }
2802
2803 #[tokio::test]
2804 async fn test_requests_timeout() {
2805 reth_tracing::init_test_tracing();
2806 let fork_id = ForkId { hash: ForkHash(hex!("743f3d89")), next: 16191202 };
2807
2808 let config = Discv4Config::builder()
2809 .request_timeout(Duration::from_millis(200))
2810 .ping_expiration(Duration::from_millis(200))
2811 .lookup_neighbours_expiration(Duration::from_millis(200))
2812 .add_eip868_pair("eth", fork_id)
2813 .build();
2814 let (_disv4, mut service) = create_discv4_with_config(config).await;
2815
2816 let id = PeerId::random();
2817 let key = kad_key(id);
2818 let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), id);
2819
2820 let _ = service.kbuckets.insert_or_update(
2821 &key,
2822 NodeEntry::new_proven(record),
2823 NodeStatus {
2824 direction: ConnectionDirection::Incoming,
2825 state: ConnectionState::Connected,
2826 },
2827 );
2828
2829 service.lookup_self();
2830 assert_eq!(service.pending_find_nodes.len(), 1);
2831
2832 let ctx = service.pending_find_nodes.values().next().unwrap().lookup_context.clone();
2833
2834 service.pending_lookup.insert(record.id, (Instant::now(), ctx));
2835
2836 assert_eq!(service.pending_lookup.len(), 1);
2837
2838 let ping = Ping {
2839 from: service.local_node_record.into(),
2840 to: record.into(),
2841 expire: service.ping_expiration(),
2842 enr_sq: service.enr_seq(),
2843 };
2844 let echo_hash = service.send_packet(Message::Ping(ping), record.udp_addr());
2845 let ping_request = PingRequest {
2846 sent_at: Instant::now(),
2847 node: record,
2848 echo_hash,
2849 reason: PingReason::InitialInsert,
2850 };
2851 service.pending_pings.insert(record.id, ping_request);
2852
2853 assert_eq!(service.pending_pings.len(), 1);
2854
2855 tokio::time::sleep(Duration::from_secs(1)).await;
2856
2857 poll_fn(|cx| {
2858 let _ = service.poll(cx);
2859
2860 assert_eq!(service.pending_find_nodes.len(), 0);
2861 assert_eq!(service.pending_lookup.len(), 0);
2862 assert_eq!(service.pending_pings.len(), 0);
2863
2864 Poll::Ready(())
2865 })
2866 .await;
2867 }
2868
2869 #[tokio::test(flavor = "multi_thread")]
2871 async fn test_check_wrong_to() {
2872 reth_tracing::init_test_tracing();
2873
2874 let config = Discv4Config::builder().external_ip_resolver(None).build();
2875 let (_discv4, mut service_1) = create_discv4_with_config(config.clone()).await;
2876 let (_discv4, mut service_2) = create_discv4_with_config(config).await;
2877
2878 let mut ping = Ping {
2880 from: service_1.local_node_record.into(),
2881 to: service_2.local_node_record.into(),
2882 expire: service_1.ping_expiration(),
2883 enr_sq: service_1.enr_seq(),
2884 };
2885 ping.to.address = "192.0.2.0".parse().unwrap();
2886
2887 let echo_hash = service_1.send_packet(Message::Ping(ping), service_2.local_addr());
2888 let ping_request = PingRequest {
2889 sent_at: Instant::now(),
2890 node: service_2.local_node_record,
2891 echo_hash,
2892 reason: PingReason::InitialInsert,
2893 };
2894 service_1.pending_pings.insert(*service_2.local_peer_id(), ping_request);
2895
2896 let event = poll_fn(|cx| service_2.poll(cx)).await;
2898 assert_eq!(event, Discv4Event::Ping);
2899
2900 let event = poll_fn(|cx| service_1.poll(cx)).await;
2902 assert_eq!(event, Discv4Event::Pong);
2903 let event = poll_fn(|cx| service_1.poll(cx)).await;
2905 assert_eq!(event, Discv4Event::Ping);
2906 }
2907
2908 #[tokio::test(flavor = "multi_thread")]
2909 async fn test_check_ping_pong() {
2910 reth_tracing::init_test_tracing();
2911
2912 let config = Discv4Config::builder().external_ip_resolver(None).build();
2913 let (_discv4, mut service_1) = create_discv4_with_config(config.clone()).await;
2914 let (_discv4, mut service_2) = create_discv4_with_config(config).await;
2915
2916 service_1.add_node(service_2.local_node_record);
2918
2919 let event = poll_fn(|cx| service_2.poll(cx)).await;
2921 assert_eq!(event, Discv4Event::Ping);
2922
2923 let key1 = kad_key(*service_1.local_peer_id());
2925 match service_2.kbuckets.entry(&key1) {
2926 kbucket::Entry::Present(_entry, status) => {
2927 assert!(!status.is_connected());
2928 }
2929 _ => unreachable!(),
2930 }
2931
2932 let event = poll_fn(|cx| service_1.poll(cx)).await;
2934 assert_eq!(event, Discv4Event::Pong);
2935
2936 let key2 = kad_key(*service_2.local_peer_id());
2938 match service_1.kbuckets.entry(&key2) {
2939 kbucket::Entry::Present(_entry, status) => {
2940 assert!(status.is_connected());
2941 }
2942 _ => unreachable!(),
2943 }
2944
2945 let event = poll_fn(|cx| service_1.poll(cx)).await;
2947 assert_eq!(event, Discv4Event::Ping);
2948
2949 let event = poll_fn(|cx| service_2.poll(cx)).await;
2951
2952 match event {
2953 Discv4Event::EnrRequest => {
2954 let event = poll_fn(|cx| service_2.poll(cx)).await;
2956 match event {
2957 Discv4Event::EnrRequest => {
2958 let event = poll_fn(|cx| service_2.poll(cx)).await;
2959 assert_eq!(event, Discv4Event::Pong);
2960 }
2961 Discv4Event::Pong => {}
2962 _ => {
2963 unreachable!()
2964 }
2965 }
2966 }
2967 Discv4Event::Pong => {}
2968 ev => unreachable!("{ev:?}"),
2969 }
2970
2971 match service_2.kbuckets.entry(&key1) {
2973 kbucket::Entry::Present(_entry, status) => {
2974 assert!(status.is_connected());
2975 }
2976 ev => unreachable!("{ev:?}"),
2977 }
2978 }
2979
/// Inserts a random record into a fresh `KBucketsTable` through its absent entry and checks
/// that a second lookup of the same key yields a present entry.
#[test]
fn test_insert() {
    let mut rng = rand::thread_rng();

    let local_node_record = rng_record(&mut rng);
    let mut kbuckets = KBucketsTable::<NodeKey, NodeEntry>::new(
        NodeKey::from(&local_node_record).into(),
        Duration::from_secs(60),
        MAX_NODES_PER_BUCKET,
        None,
        None,
    );

    let new_record = rng_record(&mut rng);
    let key = kad_key(new_record.id);

    // A key that was never inserted must resolve to an absent entry.
    let kbucket::Entry::Absent(vacant) = kbuckets.entry(&key) else { unreachable!() };
    let disconnected = NodeStatus {
        direction: ConnectionDirection::Outgoing,
        state: ConnectionState::Disconnected,
    };
    let _ = vacant.insert(NodeEntry::new(new_record), disconnected);

    // Looking the key up again must now find the node.
    if !matches!(kbuckets.entry(&key), kbucket::Entry::Present(_, _)) {
        unreachable!()
    }
}
3015
3016 #[tokio::test]
3017 async fn test_bootnode_not_in_update_stream() {
3018 reth_tracing::init_test_tracing();
3019 let (_, service_1) = create_discv4().await;
3020 let peerid_1 = *service_1.local_peer_id();
3021
3022 let config = Discv4Config::builder().add_boot_node(service_1.local_node_record).build();
3023 service_1.spawn();
3024
3025 let (_, mut service_2) = create_discv4_with_config(config).await;
3026
3027 let mut updates = service_2.update_stream();
3028
3029 service_2.spawn();
3030
3031 let mut bootnode_appeared = false;
3033 let timeout = tokio::time::sleep(Duration::from_secs(1));
3034 tokio::pin!(timeout);
3035
3036 loop {
3037 tokio::select! {
3038 Some(update) = updates.next() => {
3039 if let DiscoveryUpdate::Added(record) = update {
3040 if record.id == peerid_1 {
3041 bootnode_appeared = true;
3042 break;
3043 }
3044 }
3045 }
3046 _ = &mut timeout => break,
3047 }
3048 }
3049
3050 assert!(bootnode_appeared, "Bootnode should appear in update stream");
3052 }
3053}