1#![doc(
9 html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
10 html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
11 issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/"
12)]
13#![cfg_attr(not(test), warn(unused_crate_dependencies))]
14#![cfg_attr(docsrs, feature(doc_cfg))]
15
16pub use crate::resolver::{DnsResolver, MapResolver, Resolver};
17use crate::{
18 query::{QueryOutcome, QueryPool, ResolveEntryResult, ResolveRootResult},
19 sync::{ResolveKind, SyncAction},
20 tree::{DnsEntry, LinkEntry},
21};
22pub use config::DnsDiscoveryConfig;
23use enr::Enr;
24pub use error::ParseDnsEntryError;
25use reth_ethereum_forks::{EnrForkIdEntry, ForkId};
26use reth_network_peers::{pk2id, NodeRecord};
27use schnellru::{ByLength, LruMap};
28use secp256k1::SecretKey;
29use std::{
30 collections::{hash_map::Entry, HashMap, HashSet, VecDeque},
31 net::IpAddr,
32 pin::Pin,
33 sync::Arc,
34 task::{ready, Context, Poll},
35 time::{Duration, Instant},
36};
37use sync::SyncTree;
38use tokio::{
39 sync::{
40 mpsc,
41 mpsc::{error::TrySendError, UnboundedSender},
42 oneshot,
43 },
44 task::JoinHandle,
45};
46use tokio_stream::{
47 wrappers::{ReceiverStream, UnboundedReceiverStream},
48 Stream, StreamExt,
49};
50use tracing::{debug, trace};
51
52mod config;
53mod error;
54mod query;
55pub mod resolver;
56mod sync;
57pub mod tree;
58
59#[derive(Clone, Debug)]
61pub struct DnsDiscoveryHandle {
62 to_service: UnboundedSender<DnsDiscoveryCommand>,
64}
65
66impl DnsDiscoveryHandle {
69 pub fn sync_tree(&self, link: &str) -> Result<(), ParseDnsEntryError> {
71 self.sync_tree_with_link(link.parse()?);
72 Ok(())
73 }
74
75 pub fn sync_tree_with_link(&self, link: LinkEntry) {
77 let _ = self.to_service.send(DnsDiscoveryCommand::SyncTree(link));
78 }
79
80 pub async fn node_record_stream(
82 &self,
83 ) -> Result<ReceiverStream<DnsNodeRecordUpdate>, oneshot::error::RecvError> {
84 let (tx, rx) = oneshot::channel();
85 let cmd = DnsDiscoveryCommand::NodeRecordUpdates(tx);
86 let _ = self.to_service.send(cmd);
87 rx.await
88 }
89}
90
91#[must_use = "Service does nothing unless polled"]
93#[expect(missing_debug_implementations)]
94pub struct DnsDiscoveryService<R: Resolver = DnsResolver> {
95 command_tx: UnboundedSender<DnsDiscoveryCommand>,
97 command_rx: UnboundedReceiverStream<DnsDiscoveryCommand>,
99 node_record_listeners: Vec<mpsc::Sender<DnsNodeRecordUpdate>>,
101 trees: HashMap<LinkEntry, SyncTree>,
103 queries: QueryPool<R, SecretKey>,
105 dns_record_cache: LruMap<String, DnsEntry<SecretKey>>,
107 queued_events: VecDeque<DnsDiscoveryEvent>,
109 recheck_interval: Duration,
111 bootstrap_dns_networks: HashSet<LinkEntry>,
113}
114
115impl<R: Resolver> DnsDiscoveryService<R> {
118 pub fn new(resolver: Arc<R>, config: DnsDiscoveryConfig) -> Self {
131 let DnsDiscoveryConfig {
132 lookup_timeout,
133 max_requests_per_sec,
134 recheck_interval,
135 dns_record_cache_limit,
136 bootstrap_dns_networks,
137 } = config;
138 let queries = QueryPool::new(resolver, max_requests_per_sec, lookup_timeout);
139 let (command_tx, command_rx) = mpsc::unbounded_channel();
140 Self {
141 command_tx,
142 command_rx: UnboundedReceiverStream::new(command_rx),
143 node_record_listeners: Default::default(),
144 trees: Default::default(),
145 queries,
146 dns_record_cache: LruMap::new(ByLength::new(dns_record_cache_limit.get())),
147 queued_events: Default::default(),
148 recheck_interval,
149 bootstrap_dns_networks: bootstrap_dns_networks.unwrap_or_default(),
150 }
151 }
152
153 pub fn spawn(mut self) -> JoinHandle<()> {
157 tokio::task::spawn(async move {
158 self.bootstrap();
159
160 while let Some(event) = self.next().await {
161 trace!(target: "disc::dns", ?event, "processed");
162 }
163 })
164 }
165
166 pub fn bootstrap(&mut self) {
168 for link in self.bootstrap_dns_networks.clone() {
169 self.sync_tree_with_link(link);
170 }
171 }
172
173 pub fn new_pair(resolver: Arc<R>, config: DnsDiscoveryConfig) -> (Self, DnsDiscoveryHandle) {
176 let service = Self::new(resolver, config);
177 let handle = service.handle();
178 (service, handle)
179 }
180
181 pub fn handle(&self) -> DnsDiscoveryHandle {
183 DnsDiscoveryHandle { to_service: self.command_tx.clone() }
184 }
185
186 pub fn node_record_stream(&mut self) -> ReceiverStream<DnsNodeRecordUpdate> {
188 let (tx, rx) = mpsc::channel(256);
189 self.node_record_listeners.push(tx);
190 ReceiverStream::new(rx)
191 }
192
193 fn notify(&mut self, record: DnsNodeRecordUpdate) {
197 self.node_record_listeners.retain_mut(|listener| match listener.try_send(record.clone()) {
198 Ok(()) => true,
199 Err(err) => match err {
200 TrySendError::Full(_) => true,
201 TrySendError::Closed(_) => false,
202 },
203 });
204 }
205
206 pub fn sync_tree(&mut self, link: &str) -> Result<(), ParseDnsEntryError> {
208 self.sync_tree_with_link(link.parse()?);
209 Ok(())
210 }
211
212 pub fn sync_tree_with_link(&mut self, link: LinkEntry) {
214 self.queries.resolve_root(link);
215 }
216
217 fn resolve_entry(&mut self, link: LinkEntry<SecretKey>, hash: String, kind: ResolveKind) {
219 if let Some(entry) = self.dns_record_cache.get(&hash).cloned() {
220 let cached = ResolveEntryResult { entry: Some(Ok(entry)), link, hash, kind };
222 self.on_resolved_entry(cached);
223 return
224 }
225 self.queries.resolve_entry(link, hash, kind)
226 }
227
228 fn on_resolved_root(&mut self, resp: ResolveRootResult<SecretKey>) {
229 match resp {
230 Ok((root, link)) => match self.trees.entry(link.clone()) {
231 Entry::Occupied(mut entry) => {
232 entry.get_mut().update_root(root);
233 }
234 Entry::Vacant(entry) => {
235 entry.insert(SyncTree::new(root, link));
236 }
237 },
238 Err((err, link)) => {
239 debug!(target: "disc::dns",%err, ?link, "Failed to lookup root")
240 }
241 }
242 }
243
244 fn on_resolved_enr(&mut self, enr: Enr<SecretKey>) {
245 if let Some(record) = convert_enr_node_record(&enr) {
246 self.notify(record);
247 }
248 self.queued_events.push_back(DnsDiscoveryEvent::Enr(enr))
249 }
250
251 fn on_resolved_entry(&mut self, resp: ResolveEntryResult<SecretKey>) {
252 let ResolveEntryResult { entry, link, hash, kind } = resp;
253
254 match entry {
255 Some(Err(err)) => {
256 debug!(target: "disc::dns",%err, domain=%link.domain, ?hash, "Failed to lookup entry")
257 }
258 None => {
259 trace!(target: "disc::dns",domain=%link.domain, ?hash, "No dns entry")
260 }
261 Some(Ok(entry)) => {
262 self.dns_record_cache.insert(hash.clone(), entry.clone());
264
265 match entry {
266 DnsEntry::Root(root) => {
267 debug!(target: "disc::dns",%root, domain=%link.domain, ?hash, "resolved unexpected root entry");
268 }
269 DnsEntry::Link(link_entry) => {
270 if kind.is_link() {
271 self.sync_tree_with_link(link_entry)
272 } else {
273 debug!(target: "disc::dns",%link_entry, domain=%link.domain, ?hash, "resolved unexpected Link entry");
274 }
275 }
276 DnsEntry::Branch(branch_entry) => {
277 if let Some(tree) = self.trees.get_mut(&link) {
278 tree.extend_children(kind, branch_entry.children)
279 }
280 }
281 DnsEntry::Node(entry) => {
282 if kind.is_link() {
283 debug!(target: "disc::dns",domain=%link.domain, ?hash, "resolved unexpected enr entry");
284 } else {
285 self.on_resolved_enr(entry.enr)
286 }
287 }
288 }
289 }
290 }
291 }
292
293 pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<DnsDiscoveryEvent> {
295 loop {
296 if let Some(event) = self.queued_events.pop_front() {
298 return Poll::Ready(event)
299 }
300
301 while let Poll::Ready(Some(cmd)) = Pin::new(&mut self.command_rx).poll_next(cx) {
303 match cmd {
304 DnsDiscoveryCommand::SyncTree(link) => {
305 self.sync_tree_with_link(link);
306 }
307 DnsDiscoveryCommand::NodeRecordUpdates(tx) => {
308 let _ = tx.send(self.node_record_stream());
309 }
310 }
311 }
312
313 while let Poll::Ready(outcome) = self.queries.poll(cx) {
314 match outcome {
316 QueryOutcome::Root(resp) => self.on_resolved_root(resp),
317 QueryOutcome::Entry(resp) => self.on_resolved_entry(resp),
318 }
319 }
320
321 let mut progress = false;
322 let now = Instant::now();
323 let mut pending_resolves = Vec::new();
324 let mut pending_updates = Vec::new();
325 for tree in self.trees.values_mut() {
326 while let Some(action) = tree.poll(now, self.recheck_interval) {
327 progress = true;
328 match action {
329 SyncAction::UpdateRoot => {
330 pending_updates.push(tree.link().clone());
331 }
332 SyncAction::Enr(hash) => {
333 pending_resolves.push((tree.link().clone(), hash, ResolveKind::Enr));
334 }
335 SyncAction::Link(hash) => {
336 pending_resolves.push((tree.link().clone(), hash, ResolveKind::Link));
337 }
338 }
339 }
340 }
341
342 for (domain, hash, kind) in pending_resolves {
343 self.resolve_entry(domain, hash, kind)
344 }
345
346 for link in pending_updates {
347 self.sync_tree_with_link(link)
348 }
349
350 if !progress && self.queued_events.is_empty() {
351 return Poll::Pending
352 }
353 }
354 }
355}
356
357impl<R: Resolver> Stream for DnsDiscoveryService<R> {
359 type Item = DnsDiscoveryEvent;
360
361 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
362 Poll::Ready(Some(ready!(self.get_mut().poll(cx))))
363 }
364}
365
366#[derive(Debug, Clone, Eq, PartialEq)]
368pub struct DnsNodeRecordUpdate {
369 pub node_record: NodeRecord,
371 pub fork_id: Option<ForkId>,
373 pub enr: Enr<SecretKey>,
375}
376
377enum DnsDiscoveryCommand {
379 SyncTree(LinkEntry),
381 NodeRecordUpdates(oneshot::Sender<ReceiverStream<DnsNodeRecordUpdate>>),
382}
383
384#[derive(Debug, Clone)]
386pub enum DnsDiscoveryEvent {
387 Enr(Enr<SecretKey>),
389}
390
391fn convert_enr_node_record(enr: &Enr<SecretKey>) -> Option<DnsNodeRecordUpdate> {
393 let node_record = NodeRecord {
394 address: enr.ip4().map(IpAddr::from).or_else(|| enr.ip6().map(IpAddr::from))?,
395 tcp_port: enr.tcp4().or_else(|| enr.tcp6())?,
396 udp_port: enr.udp4().or_else(|| enr.udp6())?,
397 id: pk2id(&enr.public_key()),
398 }
399 .into_ipv4_mapped();
400
401 let fork_id =
402 enr.get_decodable::<EnrForkIdEntry>(b"eth").transpose().ok().flatten().map(Into::into);
403
404 Some(DnsNodeRecordUpdate { node_record, fork_id, enr: enr.clone() })
405}
406
#[cfg(test)]
mod tests {
    use super::*;
    use crate::tree::TreeRootEntry;
    use alloy_chains::Chain;
    use alloy_primitives::keccak256;
    use alloy_rlp::{Decodable, Encodable};
    use data_encoding::BASE32_NOPAD;
    use enr::EnrKey;
    use reth_chainspec::MAINNET;
    use reth_ethereum_forks::{EthereumHardfork, ForkHash};
    use secp256k1::rand::thread_rng;
    use std::{future::poll_fn, net::Ipv4Addr};

    /// Root record text shared by the tree tests; each test signs it again with its own key.
    const ROOT_TXT: &str = "enrtree-root:v1 e=QFT4PBCRX4XQCV3VUYJ6BTCEPU l=JGUFMSAGI7KZYB3P7IZW4S5Y3A seq=3 sig=3FmXuVwpa8Y7OstZTx9PIb1mt8FrW7VpDOFv4AaGCsZ2EIHmhraWhe4NxYhQDlw5MjeFXYMbJjsPeKlHzmJREQE";

    /// Hash under which `entry_txt` is looked up: base32 of the first 16 bytes of its keccak256.
    fn entry_hash(entry_txt: &str) -> String {
        let digest = keccak256(entry_txt.as_bytes());
        BASE32_NOPAD.encode(&digest.as_slice()[..16])
    }

    /// Link to the `nodes.example.org` test tree, using the public key of `secret_key`.
    fn example_link(secret_key: &SecretKey) -> LinkEntry {
        LinkEntry { domain: "nodes.example.org".to_string(), pubkey: secret_key.public() }
    }

    /// Polls the service exactly once and discards the result.
    async fn poll_once<R: Resolver>(service: &mut DnsDiscoveryService<R>) {
        poll_fn(|cx| {
            let _ = service.poll(cx);
            Poll::Ready(())
        })
        .await
    }

    /// Polls the service exactly once and asserts that it has no event to return.
    async fn assert_pending<R: Resolver>(service: &mut DnsDiscoveryService<R>) {
        poll_fn(|cx| {
            assert!(service.poll(cx).is_pending());
            Poll::Ready(())
        })
        .await
    }

    /// Waits until the service returns its next event.
    async fn next_event<R: Resolver>(service: &mut DnsDiscoveryService<R>) -> DnsDiscoveryEvent {
        poll_fn(|cx| service.poll(cx)).await
    }

    #[test]
    fn test_convert_enr_node_record() {
        let secret_key = SecretKey::new(&mut thread_rng());
        let localhost: IpAddr = "127.0.0.1".parse().unwrap();

        // ENR with an address, both ports and the mainnet fork id.
        let mut builder = Enr::builder();
        builder
            .ip(localhost)
            .udp4(9000)
            .tcp4(30303)
            .add_value(b"eth", &EnrForkIdEntry::from(MAINNET.latest_fork_id()));
        let enr = builder.build(&secret_key).unwrap();

        let node_record_update = convert_enr_node_record(&enr).unwrap();

        assert_eq!(node_record_update.node_record.address, localhost);
        assert_eq!(node_record_update.node_record.tcp_port, 30303);
        assert_eq!(node_record_update.node_record.udp_port, 9000);
        assert_eq!(node_record_update.fork_id, Some(MAINNET.latest_fork_id()));
        assert_eq!(node_record_update.enr, enr);
    }

    #[test]
    fn test_decode_and_convert_enr_node_record() {
        let secret_key = SecretKey::new(&mut thread_rng());
        let localhost: IpAddr = "127.0.0.1".parse().unwrap();

        // Same as above, plus an unrelated `opstack` entry with random content.
        let mut builder = Enr::builder();
        builder
            .ip(localhost)
            .udp4(9000)
            .tcp4(30303)
            .add_value(b"eth", &EnrForkIdEntry::from(MAINNET.latest_fork_id()))
            .add_value(b"opstack", &ForkId { hash: ForkHash(rand::random()), next: rand::random() });
        let enr = builder.build(&secret_key).unwrap();

        // Round-trip through the RLP encoding before converting.
        let mut encoded_enr: Vec<u8> = Vec::new();
        enr.encode(&mut encoded_enr);
        let decoded_enr = Enr::decode(&mut encoded_enr.as_slice()).unwrap();

        let node_record_update = convert_enr_node_record(&decoded_enr).unwrap();

        assert_eq!(node_record_update.node_record.address, localhost);
        assert_eq!(node_record_update.node_record.tcp_port, 30303);
        assert_eq!(node_record_update.node_record.udp_port, 9000);
        assert_eq!(node_record_update.fork_id, Some(MAINNET.latest_fork_id()));
        assert_eq!(node_record_update.enr, enr);
    }

    #[tokio::test]
    async fn test_start_root_sync() {
        reth_tracing::init_test_tracing();

        let secret_key = SecretKey::new(&mut thread_rng());
        let resolver = MapResolver::default();
        let mut root: TreeRootEntry = ROOT_TXT.parse().unwrap();
        root.sign(&secret_key).unwrap();

        let link = example_link(&secret_key);
        resolver.insert(link.domain.clone(), root.to_string());

        let mut service =
            DnsDiscoveryService::new(Arc::new(resolver), DnsDiscoveryConfig::default());

        service.sync_tree_with_link(link.clone());
        poll_once(&mut service).await;

        // The resolved root must now be tracked under its link.
        let tree = service.trees.get(&link).unwrap();
        assert_eq!(tree.root().clone(), root);
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_get_node() {
        reth_tracing::init_test_tracing();

        let secret_key = SecretKey::new(&mut thread_rng());
        let resolver = MapResolver::default();
        let mut root: TreeRootEntry = ROOT_TXT.parse().unwrap();

        let link = example_link(&secret_key);

        let fork_id = MAINNET.hardfork_fork_id(EthereumHardfork::Frontier).unwrap();
        let enr = Enr::builder()
            .ip4(Ipv4Addr::LOCALHOST)
            .udp4(30303)
            .tcp4(30303)
            .add_value(b"eth", &EnrForkIdEntry::from(fork_id))
            .build(&secret_key)
            .unwrap();
        let enr_txt = enr.to_base64();

        // Point the root's ENR subtree directly at the single node record.
        root.enr_root = entry_hash(&enr_txt);
        root.sign(&secret_key).unwrap();

        resolver.insert(link.domain.clone(), root.to_string());
        resolver.insert(format!("{}.{}", root.enr_root, link.domain), enr_txt);

        let mut service =
            DnsDiscoveryService::new(Arc::new(resolver), DnsDiscoveryConfig::default());

        let mut node_records = service.node_record_stream();

        let task = tokio::task::spawn(async move {
            let record = node_records.next().await.unwrap();
            assert_eq!(record.fork_id, Some(fork_id));
        });

        service.sync_tree_with_link(link.clone());

        let DnsDiscoveryEvent::Enr(discovered) = next_event(&mut service).await;
        assert_eq!(discovered, enr);

        assert_pending(&mut service).await;

        task.await.unwrap();
    }

    #[tokio::test]
    async fn test_recheck_tree() {
        reth_tracing::init_test_tracing();

        let config = DnsDiscoveryConfig {
            recheck_interval: Duration::from_millis(750),
            ..Default::default()
        };

        let secret_key = SecretKey::new(&mut thread_rng());
        let resolver = Arc::new(MapResolver::default());
        let mut root: TreeRootEntry = ROOT_TXT.parse().unwrap();
        root.sign(&secret_key).unwrap();

        let link = example_link(&secret_key);
        resolver.insert(link.domain.clone(), root.to_string());

        let mut service = DnsDiscoveryService::new(Arc::clone(&resolver), config.clone());

        service.sync_tree_with_link(link.clone());

        assert_pending(&mut service).await;

        // Let the recheck interval elapse before publishing the new root.
        tokio::time::sleep(config.recheck_interval).await;

        let mut new_root = root.clone();
        new_root.sequence_number = new_root.sequence_number.saturating_add(1);

        let enr = Enr::empty(&secret_key).unwrap();
        let enr_txt = enr.to_base64();
        new_root.enr_root = entry_hash(&enr_txt);
        new_root.sign(&secret_key).unwrap();
        resolver.insert(link.domain.clone(), new_root.to_string());
        resolver.insert(format!("{}.{}", new_root.enr_root, link.domain), enr_txt);

        let DnsDiscoveryEvent::Enr(discovered) = next_event(&mut service).await;
        assert_eq!(discovered, enr);

        assert_pending(&mut service).await;
    }

    #[tokio::test]
    async fn test_hash_mismatch_is_not_cached_and_does_not_poison_same_hash() {
        let secret_key = SecretKey::new(&mut thread_rng());
        let resolver = MapResolver::default();

        let invalid_entry = "enrtree-branch:AAAAAAAAAAAAAAAAAAAA".to_string();
        let valid_entry = "enrtree-branch:YNEGZIWHOM7TOOSUATAPTM".to_string();

        // Both domains serve an entry under the hash of the valid one.
        let hash = entry_hash(&valid_entry);

        let bad_link =
            LinkEntry { domain: "bad.example.org".to_string(), pubkey: secret_key.public() };
        let good_link =
            LinkEntry { domain: "good.example.org".to_string(), pubkey: secret_key.public() };

        resolver.insert(format!("{}.{}", hash, bad_link.domain), invalid_entry);
        resolver.insert(format!("{}.{}", hash, good_link.domain), valid_entry.clone());

        let mut service =
            DnsDiscoveryService::new(Arc::new(resolver), DnsDiscoveryConfig::default());

        service.resolve_entry(bad_link, hash.clone(), ResolveKind::Enr);
        poll_once(&mut service).await;

        assert!(service.dns_record_cache.get(&hash).is_none());

        service.resolve_entry(good_link, hash.clone(), ResolveKind::Enr);
        poll_once(&mut service).await;

        let cached = service.dns_record_cache.get(&hash).cloned();
        assert_eq!(cached.map(|entry| entry.to_string()), Some(valid_entry));
    }

    #[tokio::test]
    #[ignore]
    async fn test_dns_resolver() {
        reth_tracing::init_test_tracing();

        let resolver = DnsResolver::from_system_conf().unwrap();
        let mut service =
            DnsDiscoveryService::new(Arc::new(resolver), DnsDiscoveryConfig::default());

        service.sync_tree(&Chain::mainnet().public_dns_network_protocol().unwrap()).unwrap();

        while let Some(DnsDiscoveryEvent::Enr(enr)) = service.next().await {
            println!("discovered enr {}", enr.to_base64());
        }
    }
}