1#![doc(
20 html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
21 html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
22 issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/"
23)]
24#![cfg_attr(not(test), warn(unused_crate_dependencies))]
25#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
26
27use crate::{
28 error::{DecodePacketError, Discv4Error},
29 proto::{FindNode, Message, Neighbours, Packet, Ping, Pong},
30};
31use alloy_primitives::{bytes::Bytes, hex, B256};
32use discv5::{
33 kbucket,
34 kbucket::{
35 BucketInsertResult, Distance, Entry as BucketEntry, InsertResult, KBucketsTable,
36 NodeStatus, MAX_NODES_PER_BUCKET,
37 },
38 ConnectionDirection, ConnectionState,
39};
40use enr::Enr;
41use itertools::Itertools;
42use parking_lot::Mutex;
43use proto::{EnrRequest, EnrResponse};
44use reth_ethereum_forks::ForkId;
45use reth_network_peers::{pk2id, PeerId};
46use secp256k1::SecretKey;
47use std::{
48 cell::RefCell,
49 collections::{btree_map, hash_map::Entry, BTreeMap, HashMap, VecDeque},
50 fmt,
51 future::poll_fn,
52 io,
53 net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4},
54 pin::Pin,
55 rc::Rc,
56 sync::Arc,
57 task::{ready, Context, Poll},
58 time::{Duration, Instant, SystemTime, UNIX_EPOCH},
59};
60use tokio::{
61 net::UdpSocket,
62 sync::{mpsc, mpsc::error::TrySendError, oneshot, oneshot::Sender as OneshotSender},
63 task::{JoinHandle, JoinSet},
64 time::Interval,
65};
66use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt};
67use tracing::{debug, trace};
68
69pub mod error;
70pub mod proto;
71
72mod config;
73pub use config::{Discv4Config, Discv4ConfigBuilder};
74
75mod node;
76use node::{kad_key, NodeKey};
77
78mod table;
79
80pub use reth_network_peers::NodeRecord;
82
83#[cfg(any(test, feature = "test-utils"))]
84pub mod test_utils;
85
86use crate::table::PongTable;
87use reth_net_nat::ResolveNatInterval;
88pub use reth_net_nat::{external_ip, NatResolver};
90
91pub const DEFAULT_DISCOVERY_ADDR: IpAddr = IpAddr::V4(Ipv4Addr::UNSPECIFIED);
95
96pub const DEFAULT_DISCOVERY_PORT: u16 = 30303;
100
101pub const DEFAULT_DISCOVERY_ADDRESS: SocketAddr =
105 SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, DEFAULT_DISCOVERY_PORT));
106
107const MAX_PACKET_SIZE: usize = 1280;
109
110const MIN_PACKET_SIZE: usize = 32 + 65 + 1;
112
113const ALPHA: usize = 3;
115
116const MAX_NODES_PING: usize = 2 * MAX_NODES_PER_BUCKET;
121
122const MAX_QUEUED_PINGS: usize = 2 * MAX_NODES_PER_BUCKET;
130
131const SAFE_MAX_DATAGRAM_NEIGHBOUR_RECORDS: usize = (MAX_PACKET_SIZE - 109) / 91;
137
138const ENDPOINT_PROOF_EXPIRATION: Duration = Duration::from_secs(24 * 60 * 60);
142
143const EXPIRE_DURATION: Duration = Duration::from_secs(60 * 60);
145
146const UDP_MESSAGE_POLL_LOOP_BUDGET: i32 = 4;
151
152type EgressSender = mpsc::Sender<(Bytes, SocketAddr)>;
153type EgressReceiver = mpsc::Receiver<(Bytes, SocketAddr)>;
154
155pub(crate) type IngressSender = mpsc::Sender<IngressEvent>;
156pub(crate) type IngressReceiver = mpsc::Receiver<IngressEvent>;
157
158type NodeRecordSender = OneshotSender<Vec<NodeRecord>>;
159
160#[derive(Debug, Clone)]
167pub struct Discv4 {
168 local_addr: SocketAddr,
170 to_service: mpsc::UnboundedSender<Discv4Command>,
172 node_record: Arc<Mutex<NodeRecord>>,
176}
177
178impl Discv4 {
179 pub async fn spawn(
183 local_address: SocketAddr,
184 local_enr: NodeRecord,
185 secret_key: SecretKey,
186 config: Discv4Config,
187 ) -> io::Result<Self> {
188 let (discv4, service) = Self::bind(local_address, local_enr, secret_key, config).await?;
189
190 service.spawn();
191
192 Ok(discv4)
193 }
194
195 #[cfg(feature = "test-utils")]
199 pub fn noop() -> Self {
200 let (to_service, _rx) = mpsc::unbounded_channel();
201 let local_addr =
202 (IpAddr::from(std::net::Ipv4Addr::UNSPECIFIED), DEFAULT_DISCOVERY_PORT).into();
203 Self {
204 local_addr,
205 to_service,
206 node_record: Arc::new(Mutex::new(NodeRecord::new(
207 "127.0.0.1:3030".parse().unwrap(),
208 PeerId::random(),
209 ))),
210 }
211 }
212
213 pub async fn bind(
245 local_address: SocketAddr,
246 mut local_node_record: NodeRecord,
247 secret_key: SecretKey,
248 config: Discv4Config,
249 ) -> io::Result<(Self, Discv4Service)> {
250 let socket = UdpSocket::bind(local_address).await?;
251 let local_addr = socket.local_addr()?;
252 local_node_record.udp_port = local_addr.port();
253 trace!(target: "discv4", ?local_addr,"opened UDP socket");
254
255 let service = Discv4Service::new(socket, local_addr, local_node_record, secret_key, config);
256 let discv4 = service.handle();
257 Ok((discv4, service))
258 }
259
260 pub const fn local_addr(&self) -> SocketAddr {
262 self.local_addr
263 }
264
265 pub fn node_record(&self) -> NodeRecord {
269 *self.node_record.lock()
270 }
271
272 pub fn external_ip(&self) -> IpAddr {
274 self.node_record.lock().address
275 }
276
277 pub fn set_lookup_interval(&self, duration: Duration) {
279 self.send_to_service(Discv4Command::SetLookupInterval(duration))
280 }
281
282 pub async fn lookup_self(&self) -> Result<Vec<NodeRecord>, Discv4Error> {
297 self.lookup_node(None).await
298 }
299
300 pub async fn lookup(&self, node_id: PeerId) -> Result<Vec<NodeRecord>, Discv4Error> {
304 self.lookup_node(Some(node_id)).await
305 }
306
307 pub async fn lookup_random(&self) -> Result<Vec<NodeRecord>, Discv4Error> {
309 let target = PeerId::random();
310 self.lookup_node(Some(target)).await
311 }
312
313 pub fn send_lookup(&self, node_id: PeerId) {
315 let cmd = Discv4Command::Lookup { node_id: Some(node_id), tx: None };
316 self.send_to_service(cmd);
317 }
318
319 async fn lookup_node(&self, node_id: Option<PeerId>) -> Result<Vec<NodeRecord>, Discv4Error> {
320 let (tx, rx) = oneshot::channel();
321 let cmd = Discv4Command::Lookup { node_id, tx: Some(tx) };
322 self.to_service.send(cmd)?;
323 Ok(rx.await?)
324 }
325
326 pub fn send_lookup_self(&self) {
328 let cmd = Discv4Command::Lookup { node_id: None, tx: None };
329 self.send_to_service(cmd);
330 }
331
332 pub fn remove_peer(&self, node_id: PeerId) {
334 let cmd = Discv4Command::Remove(node_id);
335 self.send_to_service(cmd);
336 }
337
338 pub fn add_node(&self, node_record: NodeRecord) {
340 let cmd = Discv4Command::Add(node_record);
341 self.send_to_service(cmd);
342 }
343
344 pub fn ban(&self, node_id: PeerId, ip: IpAddr) {
348 let cmd = Discv4Command::Ban(node_id, ip);
349 self.send_to_service(cmd);
350 }
351
352 pub fn ban_ip(&self, ip: IpAddr) {
356 let cmd = Discv4Command::BanIp(ip);
357 self.send_to_service(cmd);
358 }
359
360 pub fn ban_node(&self, node_id: PeerId) {
364 let cmd = Discv4Command::BanPeer(node_id);
365 self.send_to_service(cmd);
366 }
367
368 pub fn set_tcp_port(&self, port: u16) {
372 let cmd = Discv4Command::SetTcpPort(port);
373 self.send_to_service(cmd);
374 }
375
376 pub fn set_eip868_rlp_pair(&self, key: Vec<u8>, rlp: Bytes) {
382 let cmd = Discv4Command::SetEIP868RLPPair { key, rlp };
383 self.send_to_service(cmd);
384 }
385
386 pub fn set_eip868_rlp(&self, key: Vec<u8>, value: impl alloy_rlp::Encodable) {
390 self.set_eip868_rlp_pair(key, Bytes::from(alloy_rlp::encode(&value)))
391 }
392
393 #[inline]
394 fn send_to_service(&self, cmd: Discv4Command) {
395 let _ = self.to_service.send(cmd).map_err(|err| {
396 debug!(
397 target: "discv4",
398 %err,
399 "channel capacity reached, dropping command",
400 )
401 });
402 }
403
404 pub async fn update_stream(&self) -> Result<ReceiverStream<DiscoveryUpdate>, Discv4Error> {
406 let (tx, rx) = oneshot::channel();
407 let cmd = Discv4Command::Updates(tx);
408 self.to_service.send(cmd)?;
409 Ok(rx.await?)
410 }
411
412 pub fn terminate(&self) {
414 self.send_to_service(Discv4Command::Terminated);
415 }
416}
417
418#[must_use = "Stream does nothing unless polled"]
432pub struct Discv4Service {
433 local_address: SocketAddr,
435 local_eip_868_enr: Enr<SecretKey>,
437 local_node_record: NodeRecord,
439 shared_node_record: Arc<Mutex<NodeRecord>>,
441 secret_key: SecretKey,
443 _socket: Arc<UdpSocket>,
445 _tasks: JoinSet<()>,
449 kbuckets: KBucketsTable<NodeKey, NodeEntry>,
451 ingress: IngressReceiver,
455 egress: EgressSender,
459 queued_pings: VecDeque<(NodeRecord, PingReason)>,
466 pending_pings: HashMap<PeerId, PingRequest>,
468 pending_lookup: HashMap<PeerId, (Instant, LookupContext)>,
473 pending_find_nodes: HashMap<PeerId, FindNodeRequest>,
475 pending_enr_requests: HashMap<PeerId, EnrRequestState>,
477 to_service: mpsc::UnboundedSender<Discv4Command>,
479 commands_rx: mpsc::UnboundedReceiver<Discv4Command>,
481 update_listeners: Vec<mpsc::Sender<DiscoveryUpdate>>,
483 lookup_interval: Interval,
485 lookup_rotator: LookupTargetRotator,
487 evict_expired_requests_interval: Interval,
489 ping_interval: Interval,
491 resolve_external_ip_interval: Option<ResolveNatInterval>,
493 config: Discv4Config,
495 queued_events: VecDeque<Discv4Event>,
497 received_pongs: PongTable,
499 expire_interval: Interval,
501}
502
503impl Discv4Service {
504 pub(crate) fn new(
506 socket: UdpSocket,
507 local_address: SocketAddr,
508 local_node_record: NodeRecord,
509 secret_key: SecretKey,
510 config: Discv4Config,
511 ) -> Self {
512 let socket = Arc::new(socket);
513 let (ingress_tx, ingress_rx) = mpsc::channel(config.udp_ingress_message_buffer);
514 let (egress_tx, egress_rx) = mpsc::channel(config.udp_egress_message_buffer);
515 let mut tasks = JoinSet::<()>::new();
516
517 let udp = Arc::clone(&socket);
518 tasks.spawn(receive_loop(udp, ingress_tx, local_node_record.id));
519
520 let udp = Arc::clone(&socket);
521 tasks.spawn(send_loop(udp, egress_rx));
522
523 let kbuckets = KBucketsTable::new(
524 NodeKey::from(&local_node_record).into(),
525 Duration::from_secs(60),
526 MAX_NODES_PER_BUCKET,
527 None,
528 None,
529 );
530
531 let self_lookup_interval = tokio::time::interval(config.lookup_interval);
532
533 let ping_interval = tokio::time::interval_at(
536 tokio::time::Instant::now() + config.ping_interval,
537 config.ping_interval,
538 );
539
540 let evict_expired_requests_interval = tokio::time::interval_at(
541 tokio::time::Instant::now() + config.request_timeout,
542 config.request_timeout,
543 );
544
545 let lookup_rotator = if config.enable_dht_random_walk {
546 LookupTargetRotator::default()
547 } else {
548 LookupTargetRotator::local_only()
549 };
550
551 let local_eip_868_enr = {
553 let mut builder = Enr::builder();
554 builder.ip(local_node_record.address);
555 if local_node_record.address.is_ipv4() {
556 builder.udp4(local_node_record.udp_port);
557 builder.tcp4(local_node_record.tcp_port);
558 } else {
559 builder.udp6(local_node_record.udp_port);
560 builder.tcp6(local_node_record.tcp_port);
561 }
562
563 for (key, val) in &config.additional_eip868_rlp_pairs {
564 builder.add_value_rlp(key, val.clone());
565 }
566 builder.build(&secret_key).expect("v4 is set")
567 };
568
569 let (to_service, commands_rx) = mpsc::unbounded_channel();
570
571 let shared_node_record = Arc::new(Mutex::new(local_node_record));
572
573 Self {
574 local_address,
575 local_eip_868_enr,
576 local_node_record,
577 shared_node_record,
578 _socket: socket,
579 kbuckets,
580 secret_key,
581 _tasks: tasks,
582 ingress: ingress_rx,
583 egress: egress_tx,
584 queued_pings: VecDeque::with_capacity(MAX_QUEUED_PINGS),
585 pending_pings: Default::default(),
586 pending_lookup: Default::default(),
587 pending_find_nodes: Default::default(),
588 pending_enr_requests: Default::default(),
589 commands_rx,
590 to_service,
591 update_listeners: Vec::with_capacity(1),
592 lookup_interval: self_lookup_interval,
593 ping_interval,
594 evict_expired_requests_interval,
595 lookup_rotator,
596 resolve_external_ip_interval: config.resolve_external_ip_interval(),
597 config,
598 queued_events: Default::default(),
599 received_pongs: Default::default(),
600 expire_interval: tokio::time::interval(EXPIRE_DURATION),
601 }
602 }
603
604 pub fn handle(&self) -> Discv4 {
606 Discv4 {
607 local_addr: self.local_address,
608 to_service: self.to_service.clone(),
609 node_record: self.shared_node_record.clone(),
610 }
611 }
612
613 fn enr_seq(&self) -> Option<u64> {
615 self.config.enable_eip868.then(|| self.local_eip_868_enr.seq())
616 }
617
618 pub fn set_lookup_interval(&mut self, duration: Duration) {
620 self.lookup_interval = tokio::time::interval(duration);
621 }
622
623 pub fn set_external_ip_addr(&mut self, external_ip: IpAddr) {
626 if self.local_node_record.address != external_ip {
627 debug!(target: "discv4", ?external_ip, "Updating external ip");
628 self.local_node_record.address = external_ip;
629 let _ = self.local_eip_868_enr.set_ip(external_ip, &self.secret_key);
630 let mut lock = self.shared_node_record.lock();
631 *lock = self.local_node_record;
632 debug!(target: "discv4", enr=?self.local_eip_868_enr, "Updated local ENR");
633 }
634 }
635
636 pub const fn local_peer_id(&self) -> &PeerId {
638 &self.local_node_record.id
639 }
640
641 pub const fn local_addr(&self) -> SocketAddr {
643 self.local_address
644 }
645
646 pub const fn local_enr(&self) -> NodeRecord {
650 self.local_node_record
651 }
652
653 #[cfg(test)]
655 pub const fn local_enr_mut(&mut self) -> &mut NodeRecord {
656 &mut self.local_node_record
657 }
658
659 pub fn contains_node(&self, id: PeerId) -> bool {
661 let key = kad_key(id);
662 self.kbuckets.get_index(&key).is_some()
663 }
664
665 pub fn bootstrap(&mut self) {
680 for record in self.config.bootstrap_nodes.clone() {
681 debug!(target: "discv4", ?record, "pinging boot node");
682 let key = kad_key(record.id);
683 let entry = NodeEntry::new(record);
684
685 match self.kbuckets.insert_or_update(
687 &key,
688 entry,
689 NodeStatus {
690 state: ConnectionState::Disconnected,
691 direction: ConnectionDirection::Outgoing,
692 },
693 ) {
694 InsertResult::Failed(_) => {}
695 _ => {
696 self.try_ping(record, PingReason::InitialInsert);
697 }
698 }
699 }
700 }
701
702 pub fn spawn(mut self) -> JoinHandle<()> {
706 tokio::task::spawn(async move {
707 self.bootstrap();
708
709 while let Some(event) = self.next().await {
710 trace!(target: "discv4", ?event, "processed");
711 }
712 trace!(target: "discv4", "service terminated");
713 })
714 }
715
716 pub fn update_stream(&mut self) -> ReceiverStream<DiscoveryUpdate> {
718 let (tx, rx) = mpsc::channel(512);
719 self.update_listeners.push(tx);
720 ReceiverStream::new(rx)
721 }
722
723 pub fn lookup_self(&mut self) {
725 self.lookup(self.local_node_record.id)
726 }
727
728 pub fn lookup(&mut self, target: PeerId) {
738 self.lookup_with(target, None)
739 }
740
741 fn lookup_with(&mut self, target: PeerId, tx: Option<NodeRecordSender>) {
751 trace!(target: "discv4", ?target, "Starting lookup");
752 let target_key = kad_key(target);
753
754 let ctx = LookupContext::new(
757 target_key.clone(),
758 self.kbuckets
759 .closest_values(&target_key)
760 .filter(|node| {
761 node.value.has_endpoint_proof &&
762 !self.pending_find_nodes.contains_key(&node.key.preimage().0)
763 })
764 .take(MAX_NODES_PER_BUCKET)
765 .map(|n| (target_key.distance(&n.key), n.value.record)),
766 tx,
767 );
768
769 let closest = ctx.closest(ALPHA);
771
772 if closest.is_empty() && self.pending_find_nodes.is_empty() {
773 self.bootstrap();
778 return
779 }
780
781 trace!(target: "discv4", ?target, num = closest.len(), "Start lookup closest nodes");
782
783 for node in closest {
784 self.find_node_checked(&node, ctx.clone());
788 }
789 }
790
791 fn find_node(&mut self, node: &NodeRecord, ctx: LookupContext) {
795 trace!(target: "discv4", ?node, lookup=?ctx.target(), "Sending FindNode");
796 ctx.mark_queried(node.id);
797 let id = ctx.target();
798 let msg = Message::FindNode(FindNode { id, expire: self.find_node_expiration() });
799 self.send_packet(msg, node.udp_addr());
800 self.pending_find_nodes.insert(node.id, FindNodeRequest::new(ctx));
801 }
802
803 fn find_node_checked(&mut self, node: &NodeRecord, ctx: LookupContext) {
808 let max_failures = self.config.max_find_node_failures;
809 let needs_ping = self
810 .on_entry(node.id, |entry| entry.exceeds_find_node_failures(max_failures))
811 .unwrap_or(true);
812 if needs_ping {
813 self.try_ping(*node, PingReason::Lookup(*node, ctx))
814 } else {
815 self.find_node(node, ctx)
816 }
817 }
818
819 fn notify(&mut self, update: DiscoveryUpdate) {
823 self.update_listeners.retain_mut(|listener| match listener.try_send(update.clone()) {
824 Ok(()) => true,
825 Err(err) => match err {
826 TrySendError::Full(_) => true,
827 TrySendError::Closed(_) => false,
828 },
829 });
830 }
831
832 pub fn ban_ip(&mut self, ip: IpAddr) {
834 self.config.ban_list.ban_ip(ip);
835 }
836
837 pub fn ban_node(&mut self, node_id: PeerId) {
839 self.remove_node(node_id);
840 self.config.ban_list.ban_peer(node_id);
841 }
842
843 pub fn ban_ip_until(&mut self, ip: IpAddr, until: Instant) {
845 self.config.ban_list.ban_ip_until(ip, until);
846 }
847
848 pub fn ban_node_until(&mut self, node_id: PeerId, until: Instant) {
850 self.remove_node(node_id);
851 self.config.ban_list.ban_peer_until(node_id, until);
852 }
853
854 pub fn remove_node(&mut self, node_id: PeerId) -> bool {
859 let key = kad_key(node_id);
860 self.remove_key(node_id, key)
861 }
862
863 pub fn soft_remove_node(&mut self, node_id: PeerId) -> bool {
868 let key = kad_key(node_id);
869 let Some(bucket) = self.kbuckets.get_bucket(&key) else { return false };
870 if bucket.num_entries() < MAX_NODES_PER_BUCKET / 2 {
871 return false
873 }
874 self.remove_key(node_id, key)
875 }
876
877 fn remove_key(&mut self, node_id: PeerId, key: discv5::Key<NodeKey>) -> bool {
878 let removed = self.kbuckets.remove(&key);
879 if removed {
880 trace!(target: "discv4", ?node_id, "removed node");
881 self.notify(DiscoveryUpdate::Removed(node_id));
882 }
883 removed
884 }
885
886 pub fn num_connected(&self) -> usize {
888 self.kbuckets.buckets_iter().fold(0, |count, bucket| count + bucket.num_connected())
889 }
890
891 fn has_bond(&self, remote_id: PeerId, remote_ip: IpAddr) -> bool {
893 if let Some(timestamp) = self.received_pongs.last_pong(remote_id, remote_ip) {
894 if timestamp.elapsed() < self.config.bond_expiration {
895 return true
896 }
897 }
898 false
899 }
900
901 fn on_entry<F, R>(&mut self, peer_id: PeerId, f: F) -> Option<R>
903 where
904 F: FnOnce(&NodeEntry) -> R,
905 {
906 let key = kad_key(peer_id);
907 match self.kbuckets.entry(&key) {
908 BucketEntry::Present(entry, _) => Some(f(entry.value())),
909 BucketEntry::Pending(mut entry, _) => Some(f(entry.value())),
910 _ => None,
911 }
912 }
913
914 fn update_on_reping(&mut self, record: NodeRecord, mut last_enr_seq: Option<u64>) {
921 if record.id == self.local_node_record.id {
922 return
923 }
924
925 if !self.config.enable_eip868 {
927 last_enr_seq = None;
928 }
929
930 let key = kad_key(record.id);
931 let old_enr = match self.kbuckets.entry(&key) {
932 kbucket::Entry::Present(mut entry, _) => {
933 entry.value_mut().update_with_enr(last_enr_seq)
934 }
935 kbucket::Entry::Pending(mut entry, _) => entry.value().update_with_enr(last_enr_seq),
936 _ => return,
937 };
938
939 match (last_enr_seq, old_enr) {
941 (Some(new), Some(old)) => {
942 if new > old {
943 self.send_enr_request(record);
944 }
945 }
946 (Some(_), None) => {
947 self.send_enr_request(record);
949 }
950 _ => {}
951 };
952 }
953
954 fn update_on_pong(&mut self, record: NodeRecord, mut last_enr_seq: Option<u64>) {
956 if record.id == *self.local_peer_id() {
957 return
958 }
959
960 if !self.config.enable_eip868 {
962 last_enr_seq = None;
963 }
964
965 let has_enr_seq = last_enr_seq.is_some();
968
969 let key = kad_key(record.id);
970 match self.kbuckets.entry(&key) {
971 kbucket::Entry::Present(mut entry, old_status) => {
972 entry.value_mut().establish_proof();
974 entry.value_mut().update_with_enr(last_enr_seq);
975
976 if !old_status.is_connected() {
977 let _ = entry.update(ConnectionState::Connected, Some(old_status.direction));
978 trace!(target: "discv4", ?record, "added after successful endpoint proof");
979 self.notify(DiscoveryUpdate::Added(record));
980
981 if has_enr_seq {
982 self.send_enr_request(record);
984 }
985 }
986 }
987 kbucket::Entry::Pending(mut entry, mut status) => {
988 entry.value().establish_proof();
990 entry.value().update_with_enr(last_enr_seq);
991
992 if !status.is_connected() {
993 status.state = ConnectionState::Connected;
994 let _ = entry.update(status);
995 trace!(target: "discv4", ?record, "added after successful endpoint proof");
996 self.notify(DiscoveryUpdate::Added(record));
997
998 if has_enr_seq {
999 self.send_enr_request(record);
1001 }
1002 }
1003 }
1004 _ => {}
1005 };
1006 }
1007
1008 pub fn add_all_nodes(&mut self, records: impl IntoIterator<Item = NodeRecord>) {
1012 for record in records {
1013 self.add_node(record);
1014 }
1015 }
1016
1017 pub fn add_node(&mut self, record: NodeRecord) -> bool {
1023 let key = kad_key(record.id);
1024 match self.kbuckets.entry(&key) {
1025 kbucket::Entry::Absent(entry) => {
1026 let node = NodeEntry::new(record);
1027 match entry.insert(
1028 node,
1029 NodeStatus {
1030 direction: ConnectionDirection::Outgoing,
1031 state: ConnectionState::Disconnected,
1032 },
1033 ) {
1034 BucketInsertResult::Inserted | BucketInsertResult::Pending { .. } => {
1035 trace!(target: "discv4", ?record, "inserted new record");
1036 }
1037 _ => return false,
1038 }
1039 }
1040 _ => return false,
1041 }
1042
1043 self.try_ping(record, PingReason::InitialInsert);
1045 true
1046 }
1047
1048 pub(crate) fn send_packet(&self, msg: Message, to: SocketAddr) -> B256 {
1050 let (payload, hash) = msg.encode(&self.secret_key);
1051 trace!(target: "discv4", r#type=?msg.msg_type(), ?to, ?hash, "sending packet");
1052 let _ = self.egress.try_send((payload, to)).map_err(|err| {
1053 debug!(
1054 target: "discv4",
1055 %err,
1056 "dropped outgoing packet",
1057 );
1058 });
1059 hash
1060 }
1061
1062 fn on_ping(&mut self, ping: Ping, remote_addr: SocketAddr, remote_id: PeerId, hash: B256) {
1064 if self.is_expired(ping.expire) {
1065 return
1067 }
1068
1069 let record = NodeRecord {
1071 address: remote_addr.ip(),
1072 udp_port: remote_addr.port(),
1073 tcp_port: ping.from.tcp_port,
1074 id: remote_id,
1075 }
1076 .into_ipv4_mapped();
1077
1078 let key = kad_key(record.id);
1079
1080 let mut is_new_insert = false;
1087 let mut needs_bond = false;
1088 let mut is_proven = false;
1089
1090 let old_enr = match self.kbuckets.entry(&key) {
1091 kbucket::Entry::Present(mut entry, _) => {
1092 if entry.value().is_expired() {
1093 needs_bond = true;
1096 } else {
1097 is_proven = entry.value().has_endpoint_proof;
1098 }
1099 entry.value_mut().update_with_enr(ping.enr_sq)
1100 }
1101 kbucket::Entry::Pending(mut entry, _) => {
1102 if entry.value().is_expired() {
1103 needs_bond = true;
1106 } else {
1107 is_proven = entry.value().has_endpoint_proof;
1108 }
1109 entry.value().update_with_enr(ping.enr_sq)
1110 }
1111 kbucket::Entry::Absent(entry) => {
1112 let mut node = NodeEntry::new(record);
1113 node.last_enr_seq = ping.enr_sq;
1114
1115 match entry.insert(
1116 node,
1117 NodeStatus {
1118 direction: ConnectionDirection::Incoming,
1119 state: ConnectionState::Disconnected,
1121 },
1122 ) {
1123 BucketInsertResult::Inserted | BucketInsertResult::Pending { .. } => {
1124 is_new_insert = true;
1126 }
1127 BucketInsertResult::Full => {
1128 trace!(target: "discv4", ?record, "discovered new record but bucket is full");
1132 self.notify(DiscoveryUpdate::DiscoveredAtCapacity(record));
1133 needs_bond = true;
1134 }
1135 BucketInsertResult::TooManyIncoming | BucketInsertResult::NodeExists => {
1136 needs_bond = true;
1137 }
1139 BucketInsertResult::FailedFilter => return,
1140 }
1141
1142 None
1143 }
1144 kbucket::Entry::SelfEntry => return,
1145 };
1146
1147 let pong = Message::Pong(Pong {
1150 to: record.into(),
1152 echo: hash,
1153 expire: ping.expire,
1154 enr_sq: self.enr_seq(),
1155 });
1156 self.send_packet(pong, remote_addr);
1157
1158 if is_new_insert {
1160 self.try_ping(record, PingReason::InitialInsert);
1161 } else if needs_bond {
1162 self.try_ping(record, PingReason::EstablishBond);
1163 } else if is_proven {
1164 if let Some((_, ctx)) = self.pending_lookup.remove(&record.id) {
1168 if self.pending_find_nodes.contains_key(&record.id) {
1169 ctx.unmark_queried(record.id);
1172 } else {
1173 self.find_node(&record, ctx);
1176 }
1177 }
1178 } else {
1179 match (ping.enr_sq, old_enr) {
1181 (Some(new), Some(old)) => {
1182 if new > old {
1183 self.send_enr_request(record);
1184 }
1185 }
1186 (Some(_), None) => {
1187 self.send_enr_request(record);
1188 }
1189 _ => {}
1190 };
1191 }
1192 }
1193
1194 fn try_ping(&mut self, node: NodeRecord, reason: PingReason) {
1196 if node.id == *self.local_peer_id() {
1197 return
1199 }
1200
1201 if self.pending_pings.contains_key(&node.id) ||
1202 self.pending_find_nodes.contains_key(&node.id)
1203 {
1204 return
1205 }
1206
1207 if self.queued_pings.iter().any(|(n, _)| n.id == node.id) {
1208 return
1209 }
1210
1211 if self.pending_pings.len() < MAX_NODES_PING {
1212 self.send_ping(node, reason);
1213 } else if self.queued_pings.len() < MAX_QUEUED_PINGS {
1214 self.queued_pings.push_back((node, reason));
1215 }
1216 }
1217
1218 pub(crate) fn send_ping(&mut self, node: NodeRecord, reason: PingReason) -> B256 {
1222 let remote_addr = node.udp_addr();
1223 let id = node.id;
1224 let ping = Ping {
1225 from: self.local_node_record.into(),
1226 to: node.into(),
1227 expire: self.ping_expiration(),
1228 enr_sq: self.enr_seq(),
1229 };
1230 trace!(target: "discv4", ?ping, "sending ping");
1231 let echo_hash = self.send_packet(Message::Ping(ping), remote_addr);
1232
1233 self.pending_pings
1234 .insert(id, PingRequest { sent_at: Instant::now(), node, echo_hash, reason });
1235 echo_hash
1236 }
1237
1238 pub(crate) fn send_enr_request(&mut self, node: NodeRecord) {
1242 if !self.config.enable_eip868 {
1243 return
1244 }
1245 let remote_addr = node.udp_addr();
1246 let enr_request = EnrRequest { expire: self.enr_request_expiration() };
1247
1248 trace!(target: "discv4", ?enr_request, "sending enr request");
1249 let echo_hash = self.send_packet(Message::EnrRequest(enr_request), remote_addr);
1250
1251 self.pending_enr_requests
1252 .insert(node.id, EnrRequestState { sent_at: Instant::now(), echo_hash });
1253 }
1254
1255 fn on_pong(&mut self, pong: Pong, remote_addr: SocketAddr, remote_id: PeerId) {
1257 if self.is_expired(pong.expire) {
1258 return
1259 }
1260
1261 let PingRequest { node, reason, .. } = match self.pending_pings.entry(remote_id) {
1262 Entry::Occupied(entry) => {
1263 {
1264 let request = entry.get();
1265 if request.echo_hash != pong.echo {
1266 trace!(target: "discv4", from=?remote_addr, expected=?request.echo_hash, echo_hash=?pong.echo,"Got unexpected Pong");
1267 return
1268 }
1269 }
1270 entry.remove()
1271 }
1272 Entry::Vacant(_) => return,
1273 };
1274
1275 self.received_pongs.on_pong(remote_id, remote_addr.ip());
1277
1278 match reason {
1279 PingReason::InitialInsert => {
1280 self.update_on_pong(node, pong.enr_sq);
1281 }
1282 PingReason::EstablishBond => {
1283 self.update_on_pong(node, pong.enr_sq);
1285 }
1286 PingReason::RePing => {
1287 self.update_on_reping(node, pong.enr_sq);
1288 }
1289 PingReason::Lookup(node, ctx) => {
1290 self.update_on_pong(node, pong.enr_sq);
1291 self.pending_lookup.insert(node.id, (Instant::now(), ctx));
1296 }
1297 }
1298 }
1299
1300 fn on_find_node(&mut self, msg: FindNode, remote_addr: SocketAddr, node_id: PeerId) {
1302 if self.is_expired(msg.expire) {
1303 return
1305 }
1306 if node_id == *self.local_peer_id() {
1307 return
1309 }
1310
1311 if self.has_bond(node_id, remote_addr.ip()) {
1312 self.respond_closest(msg.id, remote_addr)
1313 }
1314 }
1315
1316 fn on_enr_response(&mut self, msg: EnrResponse, remote_addr: SocketAddr, id: PeerId) {
1318 trace!(target: "discv4", ?remote_addr, ?msg, "received ENR response");
1319 if let Some(resp) = self.pending_enr_requests.remove(&id) {
1320 let enr_id = pk2id(&msg.enr.public_key());
1322 if id != enr_id {
1323 return
1324 }
1325
1326 if resp.echo_hash == msg.request_hash {
1327 let key = kad_key(id);
1328 let fork_id = msg.eth_fork_id();
1329 let (record, old_fork_id) = match self.kbuckets.entry(&key) {
1330 kbucket::Entry::Present(mut entry, _) => {
1331 let id = entry.value_mut().update_with_fork_id(fork_id);
1332 (entry.value().record, id)
1333 }
1334 kbucket::Entry::Pending(mut entry, _) => {
1335 let id = entry.value().update_with_fork_id(fork_id);
1336 (entry.value().record, id)
1337 }
1338 _ => return,
1339 };
1340 match (fork_id, old_fork_id) {
1341 (Some(new), Some(old)) => {
1342 if new != old {
1343 self.notify(DiscoveryUpdate::EnrForkId(record, new))
1344 }
1345 }
1346 (Some(new), None) => self.notify(DiscoveryUpdate::EnrForkId(record, new)),
1347 _ => {}
1348 }
1349 }
1350 }
1351 }
1352
1353 fn on_enr_request(
1355 &self,
1356 msg: EnrRequest,
1357 remote_addr: SocketAddr,
1358 id: PeerId,
1359 request_hash: B256,
1360 ) {
1361 if !self.config.enable_eip868 || self.is_expired(msg.expire) {
1362 return
1363 }
1364
1365 if self.has_bond(id, remote_addr.ip()) {
1366 self.send_packet(
1367 Message::EnrResponse(EnrResponse {
1368 request_hash,
1369 enr: self.local_eip_868_enr.clone(),
1370 }),
1371 remote_addr,
1372 );
1373 }
1374 }
1375
1376 fn on_neighbours(&mut self, msg: Neighbours, remote_addr: SocketAddr, node_id: PeerId) {
1379 if self.is_expired(msg.expire) {
1380 return
1382 }
1383 let ctx = match self.pending_find_nodes.entry(node_id) {
1385 Entry::Occupied(mut entry) => {
1386 {
1387 let request = entry.get_mut();
1388 request.answered = true;
1390 let total = request.response_count + msg.nodes.len();
1391
1392 if total <= MAX_NODES_PER_BUCKET {
1394 request.response_count = total;
1395 } else {
1396 trace!(target: "discv4", total, from=?remote_addr, "Received neighbors packet entries exceeds max nodes per bucket");
1397 return
1398 }
1399 };
1400
1401 if entry.get().response_count == MAX_NODES_PER_BUCKET {
1402 let ctx = entry.remove().lookup_context;
1404 ctx.mark_responded(node_id);
1405 ctx
1406 } else {
1407 entry.get().lookup_context.clone()
1408 }
1409 }
1410 Entry::Vacant(_) => {
1411 trace!(target: "discv4", from=?remote_addr, "Received unsolicited Neighbours");
1413 return
1414 }
1415 };
1416
1417 trace!(target: "discv4",
1419 target=format!("{:#?}", node_id),
1420 peers_count=msg.nodes.len(),
1421 peers=format!("[{:#}]", msg.nodes.iter()
1422 .map(|node_rec| node_rec.id
1423 ).format(", ")),
1424 "Received peers from Neighbours packet"
1425 );
1426
1427 for node in msg.nodes.into_iter().map(NodeRecord::into_ipv4_mapped) {
1430 if self.config.ban_list.is_banned(&node.id, &node.address) {
1432 trace!(target: "discv4", peer_id=?node.id, ip=?node.address, "ignoring banned record");
1433 continue
1434 }
1435
1436 ctx.add_node(node);
1437 }
1438
1439 let closest =
1441 ctx.filter_closest(ALPHA, |node| !self.pending_find_nodes.contains_key(&node.id));
1442
1443 for closest in closest {
1444 let key = kad_key(closest.id);
1445 match self.kbuckets.entry(&key) {
1446 BucketEntry::Absent(entry) => {
1447 ctx.mark_queried(closest.id);
1453 let node = NodeEntry::new(closest);
1454 match entry.insert(
1455 node,
1456 NodeStatus {
1457 direction: ConnectionDirection::Outgoing,
1458 state: ConnectionState::Disconnected,
1459 },
1460 ) {
1461 BucketInsertResult::Inserted | BucketInsertResult::Pending { .. } => {
1462 self.try_ping(closest, PingReason::Lookup(closest, ctx.clone()))
1464 }
1465 BucketInsertResult::Full => {
1466 self.notify(DiscoveryUpdate::DiscoveredAtCapacity(closest))
1468 }
1469 _ => {}
1470 }
1471 }
1472 BucketEntry::SelfEntry => {
1473 }
1475 BucketEntry::Present(entry, _) => {
1476 if entry.value().has_endpoint_proof {
1477 if entry
1478 .value()
1479 .exceeds_find_node_failures(self.config.max_find_node_failures)
1480 {
1481 self.try_ping(closest, PingReason::Lookup(closest, ctx.clone()))
1482 } else {
1483 self.find_node(&closest, ctx.clone());
1484 }
1485 }
1486 }
1487 BucketEntry::Pending(mut entry, _) => {
1488 if entry.value().has_endpoint_proof {
1489 if entry
1490 .value()
1491 .exceeds_find_node_failures(self.config.max_find_node_failures)
1492 {
1493 self.try_ping(closest, PingReason::Lookup(closest, ctx.clone()))
1494 } else {
1495 self.find_node(&closest, ctx.clone());
1496 }
1497 }
1498 }
1499 }
1500 }
1501 }
1502
1503 fn respond_closest(&mut self, target: PeerId, to: SocketAddr) {
1505 let key = kad_key(target);
1506 let expire = self.send_neighbours_expiration();
1507
1508 let closest_nodes =
1510 self.kbuckets.closest_values(&key).take(MAX_NODES_PER_BUCKET).collect::<Vec<_>>();
1511
1512 for nodes in closest_nodes.chunks(SAFE_MAX_DATAGRAM_NEIGHBOUR_RECORDS) {
1513 let nodes = nodes.iter().map(|node| node.value.record).collect::<Vec<NodeRecord>>();
1514 trace!(target: "discv4", len = nodes.len(), to=?to,"Sent neighbours packet");
1515 let msg = Message::Neighbours(Neighbours { nodes, expire });
1516 self.send_packet(msg, to);
1517 }
1518 }
1519
1520 fn evict_expired_requests(&mut self, now: Instant) {
1521 self.pending_enr_requests.retain(|_node_id, enr_request| {
1522 now.duration_since(enr_request.sent_at) < self.config.enr_expiration
1523 });
1524
1525 let mut failed_pings = Vec::new();
1526 self.pending_pings.retain(|node_id, ping_request| {
1527 if now.duration_since(ping_request.sent_at) > self.config.ping_expiration {
1528 failed_pings.push(*node_id);
1529 return false
1530 }
1531 true
1532 });
1533
1534 if !failed_pings.is_empty() {
1535 trace!(target: "discv4", num=%failed_pings.len(), "evicting nodes due to failed pong");
1537 for node_id in failed_pings {
1538 self.remove_node(node_id);
1539 }
1540 }
1541
1542 let mut failed_lookups = Vec::new();
1543 self.pending_lookup.retain(|node_id, (lookup_sent_at, _)| {
1544 if now.duration_since(*lookup_sent_at) > self.config.request_timeout {
1545 failed_lookups.push(*node_id);
1546 return false
1547 }
1548 true
1549 });
1550
1551 if !failed_lookups.is_empty() {
1552 trace!(target: "discv4", num=%failed_lookups.len(), "evicting nodes due to failed lookup");
1554 for node_id in failed_lookups {
1555 self.remove_node(node_id);
1556 }
1557 }
1558
1559 self.evict_failed_find_nodes(now);
1560 }
1561
1562 fn evict_failed_find_nodes(&mut self, now: Instant) {
1564 let mut failed_find_nodes = Vec::new();
1565 self.pending_find_nodes.retain(|node_id, find_node_request| {
1566 if now.duration_since(find_node_request.sent_at) > self.config.neighbours_expiration {
1567 if !find_node_request.answered {
1568 failed_find_nodes.push(*node_id);
1571 }
1572 return false
1573 }
1574 true
1575 });
1576
1577 if failed_find_nodes.is_empty() {
1578 return
1579 }
1580
1581 trace!(target: "discv4", num=%failed_find_nodes.len(), "processing failed find nodes");
1582
1583 for node_id in failed_find_nodes {
1584 let key = kad_key(node_id);
1585 let failures = match self.kbuckets.entry(&key) {
1586 kbucket::Entry::Present(mut entry, _) => {
1587 entry.value_mut().inc_failed_request();
1588 entry.value().find_node_failures
1589 }
1590 kbucket::Entry::Pending(mut entry, _) => {
1591 entry.value().inc_failed_request();
1592 entry.value().find_node_failures
1593 }
1594 _ => continue,
1595 };
1596
1597 if failures > self.config.max_find_node_failures {
1601 self.soft_remove_node(node_id);
1602 }
1603 }
1604 }
1605
1606 fn re_ping_oldest(&mut self) {
1611 let mut nodes = self
1612 .kbuckets
1613 .iter_ref()
1614 .filter(|entry| entry.node.value.is_expired())
1615 .map(|n| n.node.value)
1616 .collect::<Vec<_>>();
1617 nodes.sort_by(|a, b| a.last_seen.cmp(&b.last_seen));
1618 let to_ping = nodes.into_iter().map(|n| n.record).take(MAX_NODES_PING).collect::<Vec<_>>();
1619 for node in to_ping {
1620 self.try_ping(node, PingReason::RePing)
1621 }
1622 }
1623
1624 fn is_expired(&self, expiration: u64) -> bool {
1626 self.ensure_not_expired(expiration).is_err()
1627 }
1628
1629 fn ensure_not_expired(&self, timestamp: u64) -> Result<(), ()> {
1639 let _ = i64::try_from(timestamp).map_err(drop)?;
1641
1642 let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_secs();
1643 if self.config.enforce_expiration_timestamps && timestamp < now {
1644 trace!(target: "discv4", "Expired packet");
1645 return Err(())
1646 }
1647 Ok(())
1648 }
1649
1650 fn ping_buffered(&mut self) {
1652 while self.pending_pings.len() < MAX_NODES_PING {
1653 match self.queued_pings.pop_front() {
1654 Some((next, reason)) => self.try_ping(next, reason),
1655 None => break,
1656 }
1657 }
1658 }
1659
1660 fn ping_expiration(&self) -> u64 {
1661 (SystemTime::now().duration_since(UNIX_EPOCH).unwrap() + self.config.ping_expiration)
1662 .as_secs()
1663 }
1664
1665 fn find_node_expiration(&self) -> u64 {
1666 (SystemTime::now().duration_since(UNIX_EPOCH).unwrap() + self.config.request_timeout)
1667 .as_secs()
1668 }
1669
1670 fn enr_request_expiration(&self) -> u64 {
1671 (SystemTime::now().duration_since(UNIX_EPOCH).unwrap() + self.config.enr_expiration)
1672 .as_secs()
1673 }
1674
1675 fn send_neighbours_expiration(&self) -> u64 {
1676 (SystemTime::now().duration_since(UNIX_EPOCH).unwrap() + self.config.neighbours_expiration)
1677 .as_secs()
1678 }
1679
1680 pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<Discv4Event> {
1686 loop {
1687 if let Some(event) = self.queued_events.pop_front() {
1689 return Poll::Ready(event)
1690 }
1691
1692 if self.config.enable_lookup {
1694 while self.lookup_interval.poll_tick(cx).is_ready() {
1695 let target = self.lookup_rotator.next(&self.local_node_record.id);
1696 self.lookup_with(target, None);
1697 }
1698 }
1699
1700 while self.ping_interval.poll_tick(cx).is_ready() {
1702 self.re_ping_oldest();
1703 }
1704
1705 if let Some(Poll::Ready(Some(ip))) =
1706 self.resolve_external_ip_interval.as_mut().map(|r| r.poll_tick(cx))
1707 {
1708 self.set_external_ip_addr(ip);
1709 }
1710
1711 while let Poll::Ready(Some(cmd)) = self.commands_rx.poll_recv(cx) {
1713 match cmd {
1714 Discv4Command::Add(enr) => {
1715 self.add_node(enr);
1716 }
1717 Discv4Command::Lookup { node_id, tx } => {
1718 let node_id = node_id.unwrap_or(self.local_node_record.id);
1719 self.lookup_with(node_id, tx);
1720 }
1721 Discv4Command::SetLookupInterval(duration) => {
1722 self.set_lookup_interval(duration);
1723 }
1724 Discv4Command::Updates(tx) => {
1725 let rx = self.update_stream();
1726 let _ = tx.send(rx);
1727 }
1728 Discv4Command::BanPeer(node_id) => self.ban_node(node_id),
1729 Discv4Command::Remove(node_id) => {
1730 self.remove_node(node_id);
1731 }
1732 Discv4Command::Ban(node_id, ip) => {
1733 self.ban_node(node_id);
1734 self.ban_ip(ip);
1735 }
1736 Discv4Command::BanIp(ip) => {
1737 self.ban_ip(ip);
1738 }
1739 Discv4Command::SetEIP868RLPPair { key, rlp } => {
1740 debug!(target: "discv4", key=%String::from_utf8_lossy(&key), "Update EIP-868 extension pair");
1741
1742 let _ = self.local_eip_868_enr.insert_raw_rlp(key, rlp, &self.secret_key);
1743 }
1744 Discv4Command::SetTcpPort(port) => {
1745 debug!(target: "discv4", %port, "Update tcp port");
1746 self.local_node_record.tcp_port = port;
1747 if self.local_node_record.address.is_ipv4() {
1748 let _ = self.local_eip_868_enr.set_tcp4(port, &self.secret_key);
1749 } else {
1750 let _ = self.local_eip_868_enr.set_tcp6(port, &self.secret_key);
1751 }
1752 }
1753
1754 Discv4Command::Terminated => {
1755 self.queued_events.push_back(Discv4Event::Terminated);
1757 }
1758 }
1759 }
1760
1761 let mut udp_message_budget = UDP_MESSAGE_POLL_LOOP_BUDGET;
1763
1764 while let Poll::Ready(Some(event)) = self.ingress.poll_recv(cx) {
1766 match event {
1767 IngressEvent::RecvError(err) => {
1768 debug!(target: "discv4", %err, "failed to read datagram");
1769 }
1770 IngressEvent::BadPacket(from, err, data) => {
1771 trace!(target: "discv4", ?from, %err, packet=?hex::encode(&data), "bad packet");
1772 }
1773 IngressEvent::Packet(remote_addr, Packet { msg, node_id, hash }) => {
1774 trace!(target: "discv4", r#type=?msg.msg_type(), from=?remote_addr,"received packet");
1775 let event = match msg {
1776 Message::Ping(ping) => {
1777 self.on_ping(ping, remote_addr, node_id, hash);
1778 Discv4Event::Ping
1779 }
1780 Message::Pong(pong) => {
1781 self.on_pong(pong, remote_addr, node_id);
1782 Discv4Event::Pong
1783 }
1784 Message::FindNode(msg) => {
1785 self.on_find_node(msg, remote_addr, node_id);
1786 Discv4Event::FindNode
1787 }
1788 Message::Neighbours(msg) => {
1789 self.on_neighbours(msg, remote_addr, node_id);
1790 Discv4Event::Neighbours
1791 }
1792 Message::EnrRequest(msg) => {
1793 self.on_enr_request(msg, remote_addr, node_id, hash);
1794 Discv4Event::EnrRequest
1795 }
1796 Message::EnrResponse(msg) => {
1797 self.on_enr_response(msg, remote_addr, node_id);
1798 Discv4Event::EnrResponse
1799 }
1800 };
1801
1802 self.queued_events.push_back(event);
1803 }
1804 }
1805
1806 udp_message_budget -= 1;
1807 if udp_message_budget < 0 {
1808 trace!(target: "discv4", budget=UDP_MESSAGE_POLL_LOOP_BUDGET, "exhausted message poll budget");
1809 if self.queued_events.is_empty() {
1810 cx.waker().wake_by_ref();
1813 }
1814 break
1815 }
1816 }
1817
1818 self.ping_buffered();
1820
1821 while self.evict_expired_requests_interval.poll_tick(cx).is_ready() {
1823 self.evict_expired_requests(Instant::now());
1824 }
1825
1826 while self.expire_interval.poll_tick(cx).is_ready() {
1828 self.received_pongs.evict_expired(Instant::now(), EXPIRE_DURATION);
1829 }
1830
1831 if self.queued_events.is_empty() {
1832 return Poll::Pending
1833 }
1834 }
1835 }
1836}
1837
1838impl Stream for Discv4Service {
1840 type Item = Discv4Event;
1841
1842 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1843 match ready!(self.get_mut().poll(cx)) {
1845 Discv4Event::Terminated => Poll::Ready(None),
1847 ev => Poll::Ready(Some(ev)),
1849 }
1850 }
1851}
1852
1853impl fmt::Debug for Discv4Service {
1854 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1855 f.debug_struct("Discv4Service")
1856 .field("local_address", &self.local_address)
1857 .field("local_peer_id", &self.local_peer_id())
1858 .field("local_node_record", &self.local_node_record)
1859 .field("queued_pings", &self.queued_pings)
1860 .field("pending_lookup", &self.pending_lookup)
1861 .field("pending_find_nodes", &self.pending_find_nodes)
1862 .field("lookup_interval", &self.lookup_interval)
1863 .finish_non_exhaustive()
1864 }
1865}
1866
1867#[derive(Debug, Eq, PartialEq)]
1871pub enum Discv4Event {
1872 Ping,
1874 Pong,
1876 FindNode,
1878 Neighbours,
1880 EnrRequest,
1882 EnrResponse,
1884 Terminated,
1886}
1887
1888pub(crate) async fn send_loop(udp: Arc<UdpSocket>, rx: EgressReceiver) {
1890 let mut stream = ReceiverStream::new(rx);
1891 while let Some((payload, to)) = stream.next().await {
1892 match udp.send_to(&payload, to).await {
1893 Ok(size) => {
1894 trace!(target: "discv4", ?to, ?size,"sent payload");
1895 }
1896 Err(err) => {
1897 debug!(target: "discv4", ?to, %err,"Failed to send datagram.");
1898 }
1899 }
1900 }
1901}
1902
1903const MAX_INCOMING_PACKETS_PER_MINUTE_BY_IP: usize = 60usize;
1905
1906pub(crate) async fn receive_loop(udp: Arc<UdpSocket>, tx: IngressSender, local_id: PeerId) {
1911 let send = |event: IngressEvent| async {
1912 let _ = tx.send(event).await.map_err(|err| {
1913 debug!(
1914 target: "discv4",
1915 %err,
1916 "failed send incoming packet",
1917 )
1918 });
1919 };
1920
1921 let mut cache = ReceiveCache::default();
1922
1923 let tick = MAX_INCOMING_PACKETS_PER_MINUTE_BY_IP / 2;
1925 let mut interval = tokio::time::interval(Duration::from_secs(tick as u64));
1926
1927 let mut buf = [0; MAX_PACKET_SIZE];
1928 loop {
1929 let res = udp.recv_from(&mut buf).await;
1930 match res {
1931 Err(err) => {
1932 debug!(target: "discv4", %err, "Failed to read datagram.");
1933 send(IngressEvent::RecvError(err)).await;
1934 }
1935 Ok((read, remote_addr)) => {
1936 if cache.inc_ip(remote_addr.ip()) > MAX_INCOMING_PACKETS_PER_MINUTE_BY_IP {
1938 trace!(target: "discv4", ?remote_addr, "Too many incoming packets from IP.");
1939 continue
1940 }
1941
1942 let packet = &buf[..read];
1943 match Message::decode(packet) {
1944 Ok(packet) => {
1945 if packet.node_id == local_id {
1946 debug!(target: "discv4", ?remote_addr, "Received own packet.");
1948 continue
1949 }
1950
1951 if cache.contains_packet(packet.hash) {
1953 debug!(target: "discv4", ?remote_addr, "Received duplicate packet.");
1954 continue
1955 }
1956
1957 send(IngressEvent::Packet(remote_addr, packet)).await;
1958 }
1959 Err(err) => {
1960 trace!(target: "discv4", %err,"Failed to decode packet");
1961 send(IngressEvent::BadPacket(remote_addr, err, packet.to_vec())).await
1962 }
1963 }
1964 }
1965 }
1966
1967 if poll_fn(|cx| match interval.poll_tick(cx) {
1969 Poll::Ready(_) => Poll::Ready(true),
1970 Poll::Pending => Poll::Ready(false),
1971 })
1972 .await
1973 {
1974 cache.tick_ips(tick);
1975 }
1976 }
1977}
1978
1979struct ReceiveCache {
1983 ip_messages: HashMap<IpAddr, usize>,
1989 unique_packets: schnellru::LruMap<B256, ()>,
1991}
1992
1993impl ReceiveCache {
1994 fn tick_ips(&mut self, tick: usize) {
1998 self.ip_messages.retain(|_, count| {
1999 if let Some(reset) = count.checked_sub(tick) {
2000 *count = reset;
2001 true
2002 } else {
2003 false
2004 }
2005 });
2006 }
2007
2008 fn inc_ip(&mut self, ip: IpAddr) -> usize {
2010 let ctn = self.ip_messages.entry(ip).or_default();
2011 *ctn = ctn.saturating_add(1);
2012 *ctn
2013 }
2014
2015 fn contains_packet(&mut self, hash: B256) -> bool {
2017 !self.unique_packets.insert(hash, ())
2018 }
2019}
2020
2021impl Default for ReceiveCache {
2022 fn default() -> Self {
2023 Self {
2024 ip_messages: Default::default(),
2025 unique_packets: schnellru::LruMap::new(schnellru::ByLength::new(32)),
2026 }
2027 }
2028}
2029
2030enum Discv4Command {
2032 Add(NodeRecord),
2033 SetTcpPort(u16),
2034 SetEIP868RLPPair { key: Vec<u8>, rlp: Bytes },
2035 Ban(PeerId, IpAddr),
2036 BanPeer(PeerId),
2037 BanIp(IpAddr),
2038 Remove(PeerId),
2039 Lookup { node_id: Option<PeerId>, tx: Option<NodeRecordSender> },
2040 SetLookupInterval(Duration),
2041 Updates(OneshotSender<ReceiverStream<DiscoveryUpdate>>),
2042 Terminated,
2043}
2044
2045#[derive(Debug)]
2047pub(crate) enum IngressEvent {
2048 RecvError(io::Error),
2050 BadPacket(SocketAddr, DecodePacketError, Vec<u8>),
2052 Packet(SocketAddr, Packet),
2054}
2055
2056#[derive(Debug)]
2058struct PingRequest {
2059 sent_at: Instant,
2061 node: NodeRecord,
2063 echo_hash: B256,
2065 reason: PingReason,
2067}
2068
2069#[derive(Debug)]
2073struct LookupTargetRotator {
2074 interval: usize,
2075 counter: usize,
2076}
2077
2078impl LookupTargetRotator {
2081 const fn local_only() -> Self {
2083 Self { interval: 1, counter: 0 }
2084 }
2085}
2086
2087impl Default for LookupTargetRotator {
2088 fn default() -> Self {
2089 Self {
2090 interval: 4,
2092 counter: 3,
2093 }
2094 }
2095}
2096
2097impl LookupTargetRotator {
2098 fn next(&mut self, local: &PeerId) -> PeerId {
2100 self.counter += 1;
2101 self.counter %= self.interval;
2102 if self.counter == 0 {
2103 return *local
2104 }
2105 PeerId::random()
2106 }
2107}
2108
2109#[derive(Clone, Debug)]
2114struct LookupContext {
2115 inner: Rc<LookupContextInner>,
2116}
2117
2118impl LookupContext {
2119 fn new(
2121 target: discv5::Key<NodeKey>,
2122 nearest_nodes: impl IntoIterator<Item = (Distance, NodeRecord)>,
2123 listener: Option<NodeRecordSender>,
2124 ) -> Self {
2125 let closest_nodes = nearest_nodes
2126 .into_iter()
2127 .map(|(distance, record)| {
2128 (distance, QueryNode { record, queried: false, responded: false })
2129 })
2130 .collect();
2131
2132 let inner = Rc::new(LookupContextInner {
2133 target,
2134 closest_nodes: RefCell::new(closest_nodes),
2135 listener,
2136 });
2137 Self { inner }
2138 }
2139
2140 fn target(&self) -> PeerId {
2142 self.inner.target.preimage().0
2143 }
2144
2145 fn closest(&self, num: usize) -> Vec<NodeRecord> {
2146 self.inner
2147 .closest_nodes
2148 .borrow()
2149 .iter()
2150 .filter(|(_, node)| !node.queried)
2151 .map(|(_, n)| n.record)
2152 .take(num)
2153 .collect()
2154 }
2155
2156 fn filter_closest<P>(&self, num: usize, filter: P) -> Vec<NodeRecord>
2158 where
2159 P: FnMut(&NodeRecord) -> bool,
2160 {
2161 self.inner
2162 .closest_nodes
2163 .borrow()
2164 .iter()
2165 .filter(|(_, node)| !node.queried)
2166 .map(|(_, n)| n.record)
2167 .filter(filter)
2168 .take(num)
2169 .collect()
2170 }
2171
2172 fn add_node(&self, record: NodeRecord) {
2174 let distance = self.inner.target.distance(&kad_key(record.id));
2175 let mut closest = self.inner.closest_nodes.borrow_mut();
2176 if let btree_map::Entry::Vacant(entry) = closest.entry(distance) {
2177 entry.insert(QueryNode { record, queried: false, responded: false });
2178 }
2179 }
2180
2181 fn set_queried(&self, id: PeerId, val: bool) {
2182 if let Some((_, node)) =
2183 self.inner.closest_nodes.borrow_mut().iter_mut().find(|(_, node)| node.record.id == id)
2184 {
2185 node.queried = val;
2186 }
2187 }
2188
2189 fn mark_queried(&self, id: PeerId) {
2191 self.set_queried(id, true)
2192 }
2193
2194 fn unmark_queried(&self, id: PeerId) {
2196 self.set_queried(id, false)
2197 }
2198
2199 fn mark_responded(&self, id: PeerId) {
2201 if let Some((_, node)) =
2202 self.inner.closest_nodes.borrow_mut().iter_mut().find(|(_, node)| node.record.id == id)
2203 {
2204 node.responded = true;
2205 }
2206 }
2207}
2208
2209unsafe impl Send for LookupContext {}
2216#[derive(Debug)]
2217struct LookupContextInner {
2218 target: discv5::Key<NodeKey>,
2220 closest_nodes: RefCell<BTreeMap<Distance, QueryNode>>,
2222 listener: Option<NodeRecordSender>,
2227}
2228
2229impl Drop for LookupContextInner {
2230 fn drop(&mut self) {
2231 if let Some(tx) = self.listener.take() {
2232 let nodes = self
2235 .closest_nodes
2236 .take()
2237 .into_values()
2238 .filter(|node| node.responded)
2239 .map(|node| node.record)
2240 .collect();
2241 let _ = tx.send(nodes);
2242 }
2243 }
2244}
2245
2246#[derive(Debug, Clone, Copy)]
2248struct QueryNode {
2249 record: NodeRecord,
2250 queried: bool,
2251 responded: bool,
2252}
2253
2254#[derive(Debug)]
2255struct FindNodeRequest {
2256 sent_at: Instant,
2258 response_count: usize,
2260 answered: bool,
2262 lookup_context: LookupContext,
2264}
2265
2266impl FindNodeRequest {
2269 fn new(resp: LookupContext) -> Self {
2270 Self { sent_at: Instant::now(), response_count: 0, answered: false, lookup_context: resp }
2271 }
2272}
2273
2274#[derive(Debug)]
2275struct EnrRequestState {
2276 sent_at: Instant,
2278 echo_hash: B256,
2280}
2281
2282#[derive(Debug, Clone, Eq, PartialEq)]
2284struct NodeEntry {
2285 record: NodeRecord,
2287 last_seen: Instant,
2289 last_enr_seq: Option<u64>,
2291 fork_id: Option<ForkId>,
2293 find_node_failures: u8,
2295 has_endpoint_proof: bool,
2297}
2298
2299impl NodeEntry {
2302 fn new(record: NodeRecord) -> Self {
2304 Self {
2305 record,
2306 last_seen: Instant::now(),
2307 last_enr_seq: None,
2308 fork_id: None,
2309 find_node_failures: 0,
2310 has_endpoint_proof: false,
2311 }
2312 }
2313
2314 #[cfg(test)]
2315 fn new_proven(record: NodeRecord) -> Self {
2316 let mut node = Self::new(record);
2317 node.has_endpoint_proof = true;
2318 node
2319 }
2320
2321 const fn establish_proof(&mut self) {
2323 self.has_endpoint_proof = true;
2324 self.find_node_failures = 0;
2325 }
2326
2327 const fn exceeds_find_node_failures(&self, max_failures: u8) -> bool {
2329 self.find_node_failures >= max_failures
2330 }
2331
2332 fn update_with_enr(&mut self, last_enr_seq: Option<u64>) -> Option<u64> {
2334 self.update_now(|s| std::mem::replace(&mut s.last_enr_seq, last_enr_seq))
2335 }
2336
2337 const fn inc_failed_request(&mut self) {
2339 self.find_node_failures += 1;
2340 }
2341
2342 fn update_with_fork_id(&mut self, fork_id: Option<ForkId>) -> Option<ForkId> {
2344 self.update_now(|s| std::mem::replace(&mut s.fork_id, fork_id))
2345 }
2346
2347 fn update_now<F, R>(&mut self, f: F) -> R
2349 where
2350 F: FnOnce(&mut Self) -> R,
2351 {
2352 self.last_seen = Instant::now();
2353 f(self)
2354 }
2355}
2356
2357impl NodeEntry {
2360 fn is_expired(&self) -> bool {
2362 self.last_seen.elapsed() > (ENDPOINT_PROOF_EXPIRATION / 2)
2363 }
2364}
2365
2366#[derive(Debug)]
2368enum PingReason {
2369 InitialInsert,
2371 EstablishBond,
2373 RePing,
2375 Lookup(NodeRecord, LookupContext),
2377}
2378
2379#[derive(Debug, Clone)]
2381pub enum DiscoveryUpdate {
2382 Added(NodeRecord),
2384 DiscoveredAtCapacity(NodeRecord),
2386 EnrForkId(NodeRecord, ForkId),
2388 Removed(PeerId),
2390 Batch(Vec<DiscoveryUpdate>),
2392}
2393
2394#[cfg(test)]
2395mod tests {
2396 use super::*;
2397 use crate::test_utils::{create_discv4, create_discv4_with_config, rng_endpoint, rng_record};
2398 use alloy_primitives::hex;
2399 use alloy_rlp::{Decodable, Encodable};
2400 use rand_08::Rng;
2401 use reth_ethereum_forks::{EnrForkIdEntry, ForkHash};
2402 use reth_network_peers::mainnet_nodes;
2403 use std::future::poll_fn;
2404
2405 #[tokio::test]
2406 async fn test_configured_enr_forkid_entry() {
2407 let fork: ForkId = ForkId { hash: ForkHash([220, 233, 108, 45]), next: 0u64 };
2408 let mut disc_conf = Discv4Config::default();
2409 disc_conf.add_eip868_pair("eth", EnrForkIdEntry::from(fork));
2410 let (_discv4, service) = create_discv4_with_config(disc_conf).await;
2411 let eth = service.local_eip_868_enr.get_raw_rlp(b"eth").unwrap();
2412 let fork_entry_id = EnrForkIdEntry::decode(&mut ð[..]).unwrap();
2413
2414 let raw: [u8; 8] = [0xc7, 0xc6, 0x84, 0xdc, 0xe9, 0x6c, 0x2d, 0x80];
2415 let decoded = EnrForkIdEntry::decode(&mut &raw[..]).unwrap();
2416 let expected = EnrForkIdEntry {
2417 fork_id: ForkId { hash: ForkHash([0xdc, 0xe9, 0x6c, 0x2d]), next: 0 },
2418 };
2419 assert_eq!(expected, fork_entry_id);
2420 assert_eq!(expected, decoded);
2421 }
2422
2423 #[test]
2424 fn test_enr_forkid_entry_decode() {
2425 let raw: [u8; 8] = [0xc7, 0xc6, 0x84, 0xdc, 0xe9, 0x6c, 0x2d, 0x80];
2426 let decoded = EnrForkIdEntry::decode(&mut &raw[..]).unwrap();
2427 let expected = EnrForkIdEntry {
2428 fork_id: ForkId { hash: ForkHash([0xdc, 0xe9, 0x6c, 0x2d]), next: 0 },
2429 };
2430 assert_eq!(expected, decoded);
2431 }
2432
2433 #[test]
2434 fn test_enr_forkid_entry_encode() {
2435 let original = EnrForkIdEntry {
2436 fork_id: ForkId { hash: ForkHash([0xdc, 0xe9, 0x6c, 0x2d]), next: 0 },
2437 };
2438 let expected: [u8; 8] = [0xc7, 0xc6, 0x84, 0xdc, 0xe9, 0x6c, 0x2d, 0x80];
2439 let mut encoded = Vec::with_capacity(expected.len());
2440 original.encode(&mut encoded);
2441 assert_eq!(&expected[..], encoded.as_slice());
2442 }
2443
2444 #[test]
2445 fn test_local_rotator() {
2446 let id = PeerId::random();
2447 let mut rotator = LookupTargetRotator::local_only();
2448 assert_eq!(rotator.next(&id), id);
2449 assert_eq!(rotator.next(&id), id);
2450 }
2451
2452 #[test]
2453 fn test_rotator() {
2454 let id = PeerId::random();
2455 let mut rotator = LookupTargetRotator::default();
2456 assert_eq!(rotator.next(&id), id);
2457 assert_ne!(rotator.next(&id), id);
2458 assert_ne!(rotator.next(&id), id);
2459 assert_ne!(rotator.next(&id), id);
2460 assert_eq!(rotator.next(&id), id);
2461 }
2462
2463 #[tokio::test]
2464 async fn test_pending_ping() {
2465 let (_, mut service) = create_discv4().await;
2466
2467 let local_addr = service.local_addr();
2468
2469 let mut num_inserted = 0;
2470 loop {
2471 let node = NodeRecord::new(local_addr, PeerId::random());
2472 if service.add_node(node) {
2473 num_inserted += 1;
2474 assert!(service.pending_pings.contains_key(&node.id));
2475 assert_eq!(service.pending_pings.len(), num_inserted);
2476 if num_inserted == MAX_NODES_PING {
2477 break
2478 }
2479 }
2480 }
2481
2482 num_inserted = 0;
2484 for _ in 0..MAX_NODES_PING {
2485 let node = NodeRecord::new(local_addr, PeerId::random());
2486 if service.add_node(node) {
2487 num_inserted += 1;
2488 assert!(!service.pending_pings.contains_key(&node.id));
2489 assert_eq!(service.pending_pings.len(), MAX_NODES_PING);
2490 assert_eq!(service.queued_pings.len(), num_inserted);
2491 }
2492 }
2493 }
2494
2495 #[tokio::test(flavor = "multi_thread")]
2497 #[ignore]
2498 async fn test_mainnet_lookup() {
2499 reth_tracing::init_test_tracing();
2500 let fork_id = ForkId { hash: ForkHash(hex!("743f3d89")), next: 16191202 };
2501
2502 let all_nodes = mainnet_nodes();
2503 let config = Discv4Config::builder()
2504 .add_boot_nodes(all_nodes)
2505 .lookup_interval(Duration::from_secs(1))
2506 .add_eip868_pair("eth", fork_id)
2507 .build();
2508 let (_discv4, mut service) = create_discv4_with_config(config).await;
2509
2510 let mut updates = service.update_stream();
2511
2512 let _handle = service.spawn();
2513
2514 let mut table = HashMap::new();
2515 while let Some(update) = updates.next().await {
2516 match update {
2517 DiscoveryUpdate::EnrForkId(record, fork_id) => {
2518 println!("{record:?}, {fork_id:?}");
2519 }
2520 DiscoveryUpdate::Added(record) => {
2521 table.insert(record.id, record);
2522 }
2523 DiscoveryUpdate::Removed(id) => {
2524 table.remove(&id);
2525 }
2526 _ => {}
2527 }
2528 println!("total peers {}", table.len());
2529 }
2530 }
2531
2532 #[tokio::test]
2533 async fn test_mapped_ipv4() {
2534 reth_tracing::init_test_tracing();
2535 let mut rng = rand_08::thread_rng();
2536 let config = Discv4Config::builder().build();
2537 let (_discv4, mut service) = create_discv4_with_config(config).await;
2538
2539 let v4: Ipv4Addr = "0.0.0.0".parse().unwrap();
2540 let v6 = v4.to_ipv6_mapped();
2541 let addr: SocketAddr = (v6, DEFAULT_DISCOVERY_PORT).into();
2542
2543 let ping = Ping {
2544 from: rng_endpoint(&mut rng),
2545 to: rng_endpoint(&mut rng),
2546 expire: service.ping_expiration(),
2547 enr_sq: Some(rng.r#gen()),
2548 };
2549
2550 let id = PeerId::random();
2551 service.on_ping(ping, addr, id, B256::random());
2552
2553 let key = kad_key(id);
2554 match service.kbuckets.entry(&key) {
2555 kbucket::Entry::Present(entry, _) => {
2556 let node_addr = entry.value().record.address;
2557 assert!(node_addr.is_ipv4());
2558 assert_eq!(node_addr, IpAddr::from(v4));
2559 }
2560 _ => unreachable!(),
2561 };
2562 }
2563
2564 #[tokio::test]
2565 async fn test_respect_ping_expiration() {
2566 reth_tracing::init_test_tracing();
2567 let mut rng = rand_08::thread_rng();
2568 let config = Discv4Config::builder().build();
2569 let (_discv4, mut service) = create_discv4_with_config(config).await;
2570
2571 let v4: Ipv4Addr = "0.0.0.0".parse().unwrap();
2572 let v6 = v4.to_ipv6_mapped();
2573 let addr: SocketAddr = (v6, DEFAULT_DISCOVERY_PORT).into();
2574
2575 let ping = Ping {
2576 from: rng_endpoint(&mut rng),
2577 to: rng_endpoint(&mut rng),
2578 expire: SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs() - 1,
2579 enr_sq: Some(rng.r#gen()),
2580 };
2581
2582 let id = PeerId::random();
2583 service.on_ping(ping, addr, id, B256::random());
2584
2585 let key = kad_key(id);
2586 match service.kbuckets.entry(&key) {
2587 kbucket::Entry::Absent(_) => {}
2588 _ => unreachable!(),
2589 };
2590 }
2591
2592 #[tokio::test]
2593 async fn test_single_lookups() {
2594 reth_tracing::init_test_tracing();
2595
2596 let config = Discv4Config::builder().build();
2597 let (_discv4, mut service) = create_discv4_with_config(config.clone()).await;
2598
2599 let id = PeerId::random();
2600 let key = kad_key(id);
2601 let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), id);
2602
2603 let _ = service.kbuckets.insert_or_update(
2604 &key,
2605 NodeEntry::new_proven(record),
2606 NodeStatus {
2607 direction: ConnectionDirection::Incoming,
2608 state: ConnectionState::Connected,
2609 },
2610 );
2611
2612 service.lookup_self();
2613 assert_eq!(service.pending_find_nodes.len(), 1);
2614
2615 poll_fn(|cx| {
2616 let _ = service.poll(cx);
2617 assert_eq!(service.pending_find_nodes.len(), 1);
2618
2619 Poll::Ready(())
2620 })
2621 .await;
2622 }
2623
2624 #[tokio::test]
2625 async fn test_on_neighbours_recursive_lookup() {
2626 reth_tracing::init_test_tracing();
2627
2628 let config = Discv4Config::builder().build();
2629 let (_discv4, mut service) = create_discv4_with_config(config.clone()).await;
2630 let (_discv4, mut service2) = create_discv4_with_config(config).await;
2631
2632 let id = PeerId::random();
2633 let key = kad_key(id);
2634 let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), id);
2635
2636 let _ = service.kbuckets.insert_or_update(
2637 &key,
2638 NodeEntry::new_proven(record),
2639 NodeStatus {
2640 direction: ConnectionDirection::Incoming,
2641 state: ConnectionState::Connected,
2642 },
2643 );
2644 service.lookup_self();
2647 assert_eq!(service.pending_find_nodes.len(), 1);
2648
2649 poll_fn(|cx| {
2650 let _ = service.poll(cx);
2651 assert_eq!(service.pending_find_nodes.len(), 1);
2652
2653 Poll::Ready(())
2654 })
2655 .await;
2656
2657 let expiry = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_secs() +
2658 10000000000000;
2659 let msg = Neighbours { nodes: vec![service2.local_node_record], expire: expiry };
2660 service.on_neighbours(msg, record.tcp_addr(), id);
2661 let event = poll_fn(|cx| service2.poll(cx)).await;
2663 assert_eq!(event, Discv4Event::Ping);
2664 assert_eq!(service.pending_find_nodes.len(), 1);
2667 let event = poll_fn(|cx| service.poll(cx)).await;
2669 assert_eq!(event, Discv4Event::Pong);
2670 let event = poll_fn(|cx| service.poll(cx)).await;
2675 assert_eq!(event, Discv4Event::Ping);
2676 assert_eq!(service.pending_find_nodes.len(), 2);
2679 }
2680
2681 #[tokio::test]
2682 async fn test_no_local_in_closest() {
2683 reth_tracing::init_test_tracing();
2684
2685 let config = Discv4Config::builder().build();
2686 let (_discv4, mut service) = create_discv4_with_config(config).await;
2687
2688 let target_key = kad_key(PeerId::random());
2689
2690 let id = PeerId::random();
2691 let key = kad_key(id);
2692 let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), id);
2693
2694 let _ = service.kbuckets.insert_or_update(
2695 &key,
2696 NodeEntry::new(record),
2697 NodeStatus {
2698 direction: ConnectionDirection::Incoming,
2699 state: ConnectionState::Connected,
2700 },
2701 );
2702
2703 let closest = service
2704 .kbuckets
2705 .closest_values(&target_key)
2706 .map(|n| n.value.record)
2707 .take(MAX_NODES_PER_BUCKET)
2708 .collect::<Vec<_>>();
2709
2710 assert_eq!(closest.len(), 1);
2711 assert!(!closest.iter().any(|r| r.id == *service.local_peer_id()));
2712 }
2713
2714 #[tokio::test]
2715 async fn test_random_lookup() {
2716 reth_tracing::init_test_tracing();
2717
2718 let config = Discv4Config::builder().build();
2719 let (_discv4, mut service) = create_discv4_with_config(config).await;
2720
2721 let target = PeerId::random();
2722
2723 let id = PeerId::random();
2724 let key = kad_key(id);
2725 let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), id);
2726
2727 let _ = service.kbuckets.insert_or_update(
2728 &key,
2729 NodeEntry::new_proven(record),
2730 NodeStatus {
2731 direction: ConnectionDirection::Incoming,
2732 state: ConnectionState::Connected,
2733 },
2734 );
2735
2736 service.lookup(target);
2737 assert_eq!(service.pending_find_nodes.len(), 1);
2738
2739 let ctx = service.pending_find_nodes.values().next().unwrap().lookup_context.clone();
2740
2741 assert_eq!(ctx.target(), target);
2742 assert_eq!(ctx.inner.closest_nodes.borrow().len(), 1);
2743
2744 ctx.add_node(record);
2745 assert_eq!(ctx.inner.closest_nodes.borrow().len(), 1);
2746 }
2747
2748 #[tokio::test]
2749 async fn test_reping_on_find_node_failures() {
2750 reth_tracing::init_test_tracing();
2751
2752 let config = Discv4Config::builder().build();
2753 let (_discv4, mut service) = create_discv4_with_config(config).await;
2754
2755 let target = PeerId::random();
2756
2757 let id = PeerId::random();
2758 let key = kad_key(id);
2759 let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), id);
2760
2761 let mut entry = NodeEntry::new_proven(record);
2762 entry.find_node_failures = u8::MAX;
2763 let _ = service.kbuckets.insert_or_update(
2764 &key,
2765 entry,
2766 NodeStatus {
2767 direction: ConnectionDirection::Incoming,
2768 state: ConnectionState::Connected,
2769 },
2770 );
2771
2772 service.lookup(target);
2773 assert_eq!(service.pending_find_nodes.len(), 0);
2774 assert_eq!(service.pending_pings.len(), 1);
2775
2776 service.update_on_pong(record, None);
2777
2778 service
2779 .on_entry(record.id, |entry| {
2780 assert_eq!(entry.find_node_failures, 0);
2782 assert!(entry.has_endpoint_proof);
2783 })
2784 .unwrap();
2785 }
2786
2787 #[tokio::test]
2788 async fn test_service_commands() {
2789 reth_tracing::init_test_tracing();
2790
2791 let config = Discv4Config::builder().build();
2792 let (discv4, mut service) = create_discv4_with_config(config).await;
2793
2794 service.lookup_self();
2795
2796 let _handle = service.spawn();
2797 discv4.send_lookup_self();
2798 let _ = discv4.lookup_self().await;
2799 }
2800
2801 #[tokio::test]
2802 async fn test_requests_timeout() {
2803 reth_tracing::init_test_tracing();
2804 let fork_id = ForkId { hash: ForkHash(hex!("743f3d89")), next: 16191202 };
2805
2806 let config = Discv4Config::builder()
2807 .request_timeout(Duration::from_millis(200))
2808 .ping_expiration(Duration::from_millis(200))
2809 .lookup_neighbours_expiration(Duration::from_millis(200))
2810 .add_eip868_pair("eth", fork_id)
2811 .build();
2812 let (_disv4, mut service) = create_discv4_with_config(config).await;
2813
2814 let id = PeerId::random();
2815 let key = kad_key(id);
2816 let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), id);
2817
2818 let _ = service.kbuckets.insert_or_update(
2819 &key,
2820 NodeEntry::new_proven(record),
2821 NodeStatus {
2822 direction: ConnectionDirection::Incoming,
2823 state: ConnectionState::Connected,
2824 },
2825 );
2826
2827 service.lookup_self();
2828 assert_eq!(service.pending_find_nodes.len(), 1);
2829
2830 let ctx = service.pending_find_nodes.values().next().unwrap().lookup_context.clone();
2831
2832 service.pending_lookup.insert(record.id, (Instant::now(), ctx));
2833
2834 assert_eq!(service.pending_lookup.len(), 1);
2835
2836 let ping = Ping {
2837 from: service.local_node_record.into(),
2838 to: record.into(),
2839 expire: service.ping_expiration(),
2840 enr_sq: service.enr_seq(),
2841 };
2842 let echo_hash = service.send_packet(Message::Ping(ping), record.udp_addr());
2843 let ping_request = PingRequest {
2844 sent_at: Instant::now(),
2845 node: record,
2846 echo_hash,
2847 reason: PingReason::InitialInsert,
2848 };
2849 service.pending_pings.insert(record.id, ping_request);
2850
2851 assert_eq!(service.pending_pings.len(), 1);
2852
2853 tokio::time::sleep(Duration::from_secs(1)).await;
2854
2855 poll_fn(|cx| {
2856 let _ = service.poll(cx);
2857
2858 assert_eq!(service.pending_find_nodes.len(), 0);
2859 assert_eq!(service.pending_lookup.len(), 0);
2860 assert_eq!(service.pending_pings.len(), 0);
2861
2862 Poll::Ready(())
2863 })
2864 .await;
2865 }
2866
2867 #[tokio::test(flavor = "multi_thread")]
2869 async fn test_check_wrong_to() {
2870 reth_tracing::init_test_tracing();
2871
2872 let config = Discv4Config::builder().external_ip_resolver(None).build();
2873 let (_discv4, mut service_1) = create_discv4_with_config(config.clone()).await;
2874 let (_discv4, mut service_2) = create_discv4_with_config(config).await;
2875
2876 let mut ping = Ping {
2878 from: service_1.local_node_record.into(),
2879 to: service_2.local_node_record.into(),
2880 expire: service_1.ping_expiration(),
2881 enr_sq: service_1.enr_seq(),
2882 };
2883 ping.to.address = "192.0.2.0".parse().unwrap();
2884
2885 let echo_hash = service_1.send_packet(Message::Ping(ping), service_2.local_addr());
2886 let ping_request = PingRequest {
2887 sent_at: Instant::now(),
2888 node: service_2.local_node_record,
2889 echo_hash,
2890 reason: PingReason::InitialInsert,
2891 };
2892 service_1.pending_pings.insert(*service_2.local_peer_id(), ping_request);
2893
2894 let event = poll_fn(|cx| service_2.poll(cx)).await;
2896 assert_eq!(event, Discv4Event::Ping);
2897
2898 let event = poll_fn(|cx| service_1.poll(cx)).await;
2900 assert_eq!(event, Discv4Event::Pong);
2901 let event = poll_fn(|cx| service_1.poll(cx)).await;
2903 assert_eq!(event, Discv4Event::Ping);
2904 }
2905
2906 #[tokio::test(flavor = "multi_thread")]
2907 async fn test_check_ping_pong() {
2908 reth_tracing::init_test_tracing();
2909
2910 let config = Discv4Config::builder().external_ip_resolver(None).build();
2911 let (_discv4, mut service_1) = create_discv4_with_config(config.clone()).await;
2912 let (_discv4, mut service_2) = create_discv4_with_config(config).await;
2913
2914 service_1.add_node(service_2.local_node_record);
2916
2917 let event = poll_fn(|cx| service_2.poll(cx)).await;
2919 assert_eq!(event, Discv4Event::Ping);
2920
2921 let key1 = kad_key(*service_1.local_peer_id());
2923 match service_2.kbuckets.entry(&key1) {
2924 kbucket::Entry::Present(_entry, status) => {
2925 assert!(!status.is_connected());
2926 }
2927 _ => unreachable!(),
2928 }
2929
2930 let event = poll_fn(|cx| service_1.poll(cx)).await;
2932 assert_eq!(event, Discv4Event::Pong);
2933
2934 let key2 = kad_key(*service_2.local_peer_id());
2936 match service_1.kbuckets.entry(&key2) {
2937 kbucket::Entry::Present(_entry, status) => {
2938 assert!(status.is_connected());
2939 }
2940 _ => unreachable!(),
2941 }
2942
2943 let event = poll_fn(|cx| service_1.poll(cx)).await;
2945 assert_eq!(event, Discv4Event::Ping);
2946
2947 let event = poll_fn(|cx| service_2.poll(cx)).await;
2949
2950 match event {
2951 Discv4Event::EnrRequest => {
2952 let event = poll_fn(|cx| service_2.poll(cx)).await;
2954 match event {
2955 Discv4Event::EnrRequest => {
2956 let event = poll_fn(|cx| service_2.poll(cx)).await;
2957 assert_eq!(event, Discv4Event::Pong);
2958 }
2959 Discv4Event::Pong => {}
2960 _ => {
2961 unreachable!()
2962 }
2963 }
2964 }
2965 Discv4Event::Pong => {}
2966 ev => unreachable!("{ev:?}"),
2967 }
2968
2969 match service_2.kbuckets.entry(&key1) {
2971 kbucket::Entry::Present(_entry, status) => {
2972 assert!(status.is_connected());
2973 }
2974 ev => unreachable!("{ev:?}"),
2975 }
2976 }
2977
2978 #[test]
2979 fn test_insert() {
2980 let local_node_record = rng_record(&mut rand_08::thread_rng());
2981 let mut kbuckets: KBucketsTable<NodeKey, NodeEntry> = KBucketsTable::new(
2982 NodeKey::from(&local_node_record).into(),
2983 Duration::from_secs(60),
2984 MAX_NODES_PER_BUCKET,
2985 None,
2986 None,
2987 );
2988
2989 let new_record = rng_record(&mut rand_08::thread_rng());
2990 let key = kad_key(new_record.id);
2991 match kbuckets.entry(&key) {
2992 kbucket::Entry::Absent(entry) => {
2993 let node = NodeEntry::new(new_record);
2994 let _ = entry.insert(
2995 node,
2996 NodeStatus {
2997 direction: ConnectionDirection::Outgoing,
2998 state: ConnectionState::Disconnected,
2999 },
3000 );
3001 }
3002 _ => {
3003 unreachable!()
3004 }
3005 };
3006 match kbuckets.entry(&key) {
3007 kbucket::Entry::Present(_, _) => {}
3008 _ => {
3009 unreachable!()
3010 }
3011 }
3012 }
3013
3014 #[tokio::test]
3015 async fn test_bootnode_not_in_update_stream() {
3016 reth_tracing::init_test_tracing();
3017 let (_, service_1) = create_discv4().await;
3018 let peerid_1 = *service_1.local_peer_id();
3019
3020 let config = Discv4Config::builder().add_boot_node(service_1.local_node_record).build();
3021 service_1.spawn();
3022
3023 let (_, mut service_2) = create_discv4_with_config(config).await;
3024
3025 let mut updates = service_2.update_stream();
3026
3027 service_2.spawn();
3028
3029 let mut bootnode_appeared = false;
3031 let timeout = tokio::time::sleep(Duration::from_secs(1));
3032 tokio::pin!(timeout);
3033
3034 loop {
3035 tokio::select! {
3036 Some(update) = updates.next() => {
3037 if let DiscoveryUpdate::Added(record) = update {
3038 if record.id == peerid_1 {
3039 bootnode_appeared = true;
3040 break;
3041 }
3042 }
3043 }
3044 _ = &mut timeout => break,
3045 }
3046 }
3047
3048 assert!(bootnode_appeared, "Bootnode should appear in update stream");
3050 }
3051}