1#![doc(
9 html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
10 html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
11 issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/"
12)]
13#![cfg_attr(not(test), warn(unused_crate_dependencies))]
14#![cfg_attr(docsrs, feature(doc_cfg))]
15
16pub use crate::resolver::{DnsResolver, MapResolver, Resolver};
17use crate::{
18 query::{QueryOutcome, QueryPool, ResolveEntryResult, ResolveRootResult},
19 sync::{ResolveKind, SyncAction},
20 tree::{DnsEntry, LinkEntry},
21};
22pub use config::DnsDiscoveryConfig;
23use enr::Enr;
24pub use error::ParseDnsEntryError;
25use reth_ethereum_forks::{EnrForkIdEntry, ForkId};
26use reth_network_peers::{pk2id, NodeRecord};
27use schnellru::{ByLength, LruMap};
28use secp256k1::SecretKey;
29use std::{
30 collections::{hash_map::Entry, HashMap, HashSet, VecDeque},
31 net::IpAddr,
32 pin::Pin,
33 sync::Arc,
34 task::{ready, Context, Poll},
35 time::{Duration, Instant},
36};
37use sync::SyncTree;
38use tokio::{
39 sync::{
40 mpsc,
41 mpsc::{error::TrySendError, UnboundedSender},
42 oneshot,
43 },
44 task::JoinHandle,
45};
46use tokio_stream::{
47 wrappers::{ReceiverStream, UnboundedReceiverStream},
48 Stream, StreamExt,
49};
50use tracing::{debug, trace};
51
52mod config;
53mod error;
54mod query;
55pub mod resolver;
56mod sync;
57pub mod tree;
58
59#[derive(Clone, Debug)]
61pub struct DnsDiscoveryHandle {
62 to_service: UnboundedSender<DnsDiscoveryCommand>,
64}
65
66impl DnsDiscoveryHandle {
69 pub fn sync_tree(&self, link: &str) -> Result<(), ParseDnsEntryError> {
71 self.sync_tree_with_link(link.parse()?);
72 Ok(())
73 }
74
75 pub fn sync_tree_with_link(&self, link: LinkEntry) {
77 let _ = self.to_service.send(DnsDiscoveryCommand::SyncTree(link));
78 }
79
80 pub async fn node_record_stream(
82 &self,
83 ) -> Result<ReceiverStream<DnsNodeRecordUpdate>, oneshot::error::RecvError> {
84 let (tx, rx) = oneshot::channel();
85 let cmd = DnsDiscoveryCommand::NodeRecordUpdates(tx);
86 let _ = self.to_service.send(cmd);
87 rx.await
88 }
89}
90
91#[must_use = "Service does nothing unless polled"]
93#[expect(missing_debug_implementations)]
94pub struct DnsDiscoveryService<R: Resolver = DnsResolver> {
95 command_tx: UnboundedSender<DnsDiscoveryCommand>,
97 command_rx: UnboundedReceiverStream<DnsDiscoveryCommand>,
99 node_record_listeners: Vec<mpsc::Sender<DnsNodeRecordUpdate>>,
101 trees: HashMap<LinkEntry, SyncTree>,
103 queries: QueryPool<R, SecretKey>,
105 dns_record_cache: LruMap<String, DnsEntry<SecretKey>>,
107 queued_events: VecDeque<DnsDiscoveryEvent>,
109 recheck_interval: Duration,
111 bootstrap_dns_networks: HashSet<LinkEntry>,
113}
114
115impl<R: Resolver> DnsDiscoveryService<R> {
118 pub fn new(resolver: Arc<R>, config: DnsDiscoveryConfig) -> Self {
131 let DnsDiscoveryConfig {
132 lookup_timeout,
133 max_requests_per_sec,
134 recheck_interval,
135 dns_record_cache_limit,
136 bootstrap_dns_networks,
137 } = config;
138 let queries = QueryPool::new(resolver, max_requests_per_sec, lookup_timeout);
139 let (command_tx, command_rx) = mpsc::unbounded_channel();
140 Self {
141 command_tx,
142 command_rx: UnboundedReceiverStream::new(command_rx),
143 node_record_listeners: Default::default(),
144 trees: Default::default(),
145 queries,
146 dns_record_cache: LruMap::new(ByLength::new(dns_record_cache_limit.get())),
147 queued_events: Default::default(),
148 recheck_interval,
149 bootstrap_dns_networks: bootstrap_dns_networks.unwrap_or_default(),
150 }
151 }
152
153 pub fn spawn(mut self) -> JoinHandle<()> {
157 tokio::task::spawn(async move {
158 self.bootstrap();
159
160 while let Some(event) = self.next().await {
161 trace!(target: "disc::dns", ?event, "processed");
162 }
163 })
164 }
165
166 pub fn bootstrap(&mut self) {
168 for link in self.bootstrap_dns_networks.clone() {
169 self.sync_tree_with_link(link);
170 }
171 }
172
173 pub fn new_pair(resolver: Arc<R>, config: DnsDiscoveryConfig) -> (Self, DnsDiscoveryHandle) {
176 let service = Self::new(resolver, config);
177 let handle = service.handle();
178 (service, handle)
179 }
180
181 pub fn handle(&self) -> DnsDiscoveryHandle {
183 DnsDiscoveryHandle { to_service: self.command_tx.clone() }
184 }
185
186 pub fn node_record_stream(&mut self) -> ReceiverStream<DnsNodeRecordUpdate> {
188 let (tx, rx) = mpsc::channel(256);
189 self.node_record_listeners.push(tx);
190 ReceiverStream::new(rx)
191 }
192
193 fn notify(&mut self, record: DnsNodeRecordUpdate) {
197 self.node_record_listeners.retain_mut(|listener| match listener.try_send(record.clone()) {
198 Ok(()) => true,
199 Err(err) => match err {
200 TrySendError::Full(_) => true,
201 TrySendError::Closed(_) => false,
202 },
203 });
204 }
205
206 pub fn sync_tree(&mut self, link: &str) -> Result<(), ParseDnsEntryError> {
208 self.sync_tree_with_link(link.parse()?);
209 Ok(())
210 }
211
212 pub fn sync_tree_with_link(&mut self, link: LinkEntry) {
214 self.queries.resolve_root(link);
215 }
216
217 fn resolve_entry(&mut self, link: LinkEntry<SecretKey>, hash: String, kind: ResolveKind) {
219 if let Some(entry) = self.dns_record_cache.get(&hash).cloned() {
220 let cached = ResolveEntryResult { entry: Some(Ok(entry)), link, hash, kind };
222 self.on_resolved_entry(cached);
223 return
224 }
225 self.queries.resolve_entry(link, hash, kind)
226 }
227
228 fn on_resolved_root(&mut self, resp: ResolveRootResult<SecretKey>) {
229 match resp {
230 Ok((root, link)) => match self.trees.entry(link.clone()) {
231 Entry::Occupied(mut entry) => {
232 entry.get_mut().update_root(root);
233 }
234 Entry::Vacant(entry) => {
235 entry.insert(SyncTree::new(root, link));
236 }
237 },
238 Err((err, link)) => {
239 debug!(target: "disc::dns",%err, ?link, "Failed to lookup root")
240 }
241 }
242 }
243
244 fn on_resolved_enr(&mut self, enr: Enr<SecretKey>) {
245 if let Some(record) = convert_enr_node_record(&enr) {
246 self.notify(record);
247 }
248 self.queued_events.push_back(DnsDiscoveryEvent::Enr(enr))
249 }
250
251 fn on_resolved_entry(&mut self, resp: ResolveEntryResult<SecretKey>) {
252 let ResolveEntryResult { entry, link, hash, kind } = resp;
253
254 match entry {
255 Some(Err(err)) => {
256 debug!(target: "disc::dns",%err, domain=%link.domain, ?hash, "Failed to lookup entry")
257 }
258 None => {
259 trace!(target: "disc::dns",domain=%link.domain, ?hash, "No dns entry")
260 }
261 Some(Ok(entry)) => {
262 self.dns_record_cache.insert(hash.clone(), entry.clone());
264
265 match entry {
266 DnsEntry::Root(root) => {
267 debug!(target: "disc::dns",%root, domain=%link.domain, ?hash, "resolved unexpected root entry");
268 }
269 DnsEntry::Link(link_entry) => {
270 if kind.is_link() {
271 self.sync_tree_with_link(link_entry)
272 } else {
273 debug!(target: "disc::dns",%link_entry, domain=%link.domain, ?hash, "resolved unexpected Link entry");
274 }
275 }
276 DnsEntry::Branch(branch_entry) => {
277 if let Some(tree) = self.trees.get_mut(&link) {
278 tree.extend_children(kind, branch_entry.children)
279 }
280 }
281 DnsEntry::Node(entry) => {
282 if kind.is_link() {
283 debug!(target: "disc::dns",domain=%link.domain, ?hash, "resolved unexpected enr entry");
284 } else {
285 self.on_resolved_enr(entry.enr)
286 }
287 }
288 }
289 }
290 }
291 }
292
293 pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<DnsDiscoveryEvent> {
295 loop {
296 if let Some(event) = self.queued_events.pop_front() {
298 return Poll::Ready(event)
299 }
300
301 while let Poll::Ready(Some(cmd)) = Pin::new(&mut self.command_rx).poll_next(cx) {
303 match cmd {
304 DnsDiscoveryCommand::SyncTree(link) => {
305 self.sync_tree_with_link(link);
306 }
307 DnsDiscoveryCommand::NodeRecordUpdates(tx) => {
308 let _ = tx.send(self.node_record_stream());
309 }
310 }
311 }
312
313 while let Poll::Ready(outcome) = self.queries.poll(cx) {
314 match outcome {
316 QueryOutcome::Root(resp) => self.on_resolved_root(resp),
317 QueryOutcome::Entry(resp) => self.on_resolved_entry(resp),
318 }
319 }
320
321 let mut progress = false;
322 let now = Instant::now();
323 let mut pending_resolves = Vec::new();
324 let mut pending_updates = Vec::new();
325 for tree in self.trees.values_mut() {
326 while let Some(action) = tree.poll(now, self.recheck_interval) {
327 progress = true;
328 match action {
329 SyncAction::UpdateRoot => {
330 pending_updates.push(tree.link().clone());
331 }
332 SyncAction::Enr(hash) => {
333 pending_resolves.push((tree.link().clone(), hash, ResolveKind::Enr));
334 }
335 SyncAction::Link(hash) => {
336 pending_resolves.push((tree.link().clone(), hash, ResolveKind::Link));
337 }
338 }
339 }
340 }
341
342 for (domain, hash, kind) in pending_resolves {
343 self.resolve_entry(domain, hash, kind)
344 }
345
346 for link in pending_updates {
347 self.sync_tree_with_link(link)
348 }
349
350 if !progress && self.queued_events.is_empty() {
351 return Poll::Pending
352 }
353 }
354 }
355}
356
357impl<R: Resolver> Stream for DnsDiscoveryService<R> {
359 type Item = DnsDiscoveryEvent;
360
361 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
362 Poll::Ready(Some(ready!(self.get_mut().poll(cx))))
363 }
364}
365
366#[derive(Debug, Clone, Eq, PartialEq)]
368pub struct DnsNodeRecordUpdate {
369 pub node_record: NodeRecord,
371 pub fork_id: Option<ForkId>,
373 pub enr: Enr<SecretKey>,
375}
376
377enum DnsDiscoveryCommand {
379 SyncTree(LinkEntry),
381 NodeRecordUpdates(oneshot::Sender<ReceiverStream<DnsNodeRecordUpdate>>),
382}
383
384#[derive(Debug, Clone)]
386pub enum DnsDiscoveryEvent {
387 Enr(Enr<SecretKey>),
389}
390
391fn convert_enr_node_record(enr: &Enr<SecretKey>) -> Option<DnsNodeRecordUpdate> {
393 let node_record = NodeRecord {
394 address: enr.ip4().map(IpAddr::from).or_else(|| enr.ip6().map(IpAddr::from))?,
395 tcp_port: enr.tcp4().or_else(|| enr.tcp6())?,
396 udp_port: enr.udp4().or_else(|| enr.udp6())?,
397 id: pk2id(&enr.public_key()),
398 }
399 .into_ipv4_mapped();
400
401 let fork_id =
402 enr.get_decodable::<EnrForkIdEntry>(b"eth").transpose().ok().flatten().map(Into::into);
403
404 Some(DnsNodeRecordUpdate { node_record, fork_id, enr: enr.clone() })
405}
406
#[cfg(test)]
mod tests {
    use super::*;
    use crate::tree::TreeRootEntry;
    use alloy_chains::Chain;
    use alloy_rlp::{Decodable, Encodable};
    use enr::EnrKey;
    use reth_chainspec::MAINNET;
    use reth_ethereum_forks::{EthereumHardfork, ForkHash};
    use secp256k1::rand::thread_rng;
    use std::{future::poll_fn, net::Ipv4Addr};

    /// Domain under which every test tree is published.
    const TEST_DOMAIN: &str = "nodes.example.org";

    /// Textual root entry the test trees are parsed from before being re-signed.
    const ROOT_TEMPLATE: &str = "enrtree-root:v1 e=QFT4PBCRX4XQCV3VUYJ6BTCEPU l=JGUFMSAGI7KZYB3P7IZW4S5Y3A seq=3 sig=3FmXuVwpa8Y7OstZTx9PIb1mt8FrW7VpDOFv4AaGCsZ2EIHmhraWhe4NxYhQDlw5MjeFXYMbJjsPeKlHzmJREQE";

    /// Parses [`ROOT_TEMPLATE`] and signs it with `secret_key`.
    fn signed_root(secret_key: &SecretKey) -> TreeRootEntry {
        let mut root: TreeRootEntry = ROOT_TEMPLATE.parse().unwrap();
        root.sign(secret_key).unwrap();
        root
    }

    /// Link for [`TEST_DOMAIN`] using the public key of `secret_key`.
    fn test_link(secret_key: &SecretKey) -> LinkEntry {
        LinkEntry { domain: TEST_DOMAIN.to_string(), pubkey: secret_key.public() }
    }

    /// Polls `service` once and asserts that it has nothing to report.
    async fn assert_pending<R: Resolver>(service: &mut DnsDiscoveryService<R>) {
        poll_fn(|cx| {
            assert!(service.poll(cx).is_pending());
            Poll::Ready(())
        })
        .await;
    }

    /// Waits until `service` yields its next event.
    async fn next_event<R: Resolver>(service: &mut DnsDiscoveryService<R>) -> DnsDiscoveryEvent {
        poll_fn(|cx| service.poll(cx)).await
    }

    /// An ENR with address, ports and an `eth` entry converts into a matching update.
    #[test]
    fn test_convert_enr_node_record() {
        let secret_key = SecretKey::new(&mut secp256k1::rand::thread_rng());
        let enr = Enr::builder()
            .ip("127.0.0.1".parse().unwrap())
            .udp4(9000)
            .tcp4(30303)
            .add_value(b"eth", &EnrForkIdEntry::from(MAINNET.latest_fork_id()))
            .build(&secret_key)
            .unwrap();

        let update = convert_enr_node_record(&enr).unwrap();

        assert_eq!(update.node_record.address, "127.0.0.1".parse::<IpAddr>().unwrap());
        assert_eq!(update.node_record.tcp_port, 30303);
        assert_eq!(update.node_record.udp_port, 9000);
        assert_eq!(update.fork_id, Some(MAINNET.latest_fork_id()));
        assert_eq!(update.enr, enr);
    }

    /// Conversion still works after an RLP round trip of an ENR that carries an extra entry.
    #[test]
    fn test_decode_and_convert_enr_node_record() {
        let secret_key = SecretKey::new(&mut secp256k1::rand::thread_rng());
        let enr = Enr::builder()
            .ip("127.0.0.1".parse().unwrap())
            .udp4(9000)
            .tcp4(30303)
            .add_value(b"eth", &EnrForkIdEntry::from(MAINNET.latest_fork_id()))
            .add_value(b"opstack", &ForkId { hash: ForkHash(rand::random()), next: rand::random() })
            .build(&secret_key)
            .unwrap();

        let mut rlp = vec![];
        enr.encode(&mut rlp);

        let decoded = Enr::decode(&mut &rlp[..]).unwrap();

        let update = convert_enr_node_record(&decoded).unwrap();

        assert_eq!(update.node_record.address, "127.0.0.1".parse::<IpAddr>().unwrap());
        assert_eq!(update.node_record.tcp_port, 30303);
        assert_eq!(update.node_record.udp_port, 9000);
        assert_eq!(update.fork_id, Some(MAINNET.latest_fork_id()));
        assert_eq!(update.enr, enr);
    }

    /// After one poll the resolved root is stored in the tree for the synced link.
    #[tokio::test]
    async fn test_start_root_sync() {
        reth_tracing::init_test_tracing();

        let secret_key = SecretKey::new(&mut thread_rng());
        let resolver = MapResolver::default();
        let root = signed_root(&secret_key);
        let link = test_link(&secret_key);
        resolver.insert(link.domain.clone(), root.to_string());

        let mut service = DnsDiscoveryService::new(Arc::new(resolver), Default::default());

        service.sync_tree_with_link(link.clone());

        poll_fn(|cx| {
            let _ = service.poll(cx);
            Poll::Ready(())
        })
        .await;

        let tree = service.trees.get(&link).unwrap();
        assert_eq!(tree.root().clone(), root);
    }

    /// A node entry below the root is emitted as an event and forwarded to listeners.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_get_node() {
        reth_tracing::init_test_tracing();

        let secret_key = SecretKey::new(&mut thread_rng());
        let resolver = MapResolver::default();
        let root = signed_root(&secret_key);
        let link = test_link(&secret_key);
        resolver.insert(link.domain.clone(), root.to_string());

        let fork_id = MAINNET.hardfork_fork_id(EthereumHardfork::Frontier).unwrap();
        let enr = Enr::builder()
            .ip4(Ipv4Addr::LOCALHOST)
            .udp4(30303)
            .tcp4(30303)
            .add_value(b"eth", &EnrForkIdEntry::from(fork_id))
            .build(&secret_key)
            .unwrap();

        resolver.insert(format!("{}.{}", root.enr_root, link.domain), enr.to_base64());

        let mut service = DnsDiscoveryService::new(Arc::new(resolver), Default::default());

        let mut node_records = service.node_record_stream();

        let listener = tokio::task::spawn(async move {
            let record = node_records.next().await.unwrap();
            assert_eq!(record.fork_id, Some(fork_id));
        });

        service.sync_tree_with_link(link.clone());

        let DnsDiscoveryEvent::Enr(discovered) = next_event(&mut service).await;
        assert_eq!(discovered, enr);

        assert_pending(&mut service).await;

        listener.await.unwrap();
    }

    /// Once the recheck interval has elapsed an updated root is picked up and synced.
    #[tokio::test]
    async fn test_recheck_tree() {
        reth_tracing::init_test_tracing();

        let config = DnsDiscoveryConfig {
            recheck_interval: Duration::from_millis(750),
            ..Default::default()
        };

        let secret_key = SecretKey::new(&mut thread_rng());
        let resolver = Arc::new(MapResolver::default());
        let root = signed_root(&secret_key);
        let link = test_link(&secret_key);
        resolver.insert(link.domain.clone(), root.to_string());

        let mut service = DnsDiscoveryService::new(Arc::clone(&resolver), config.clone());

        service.sync_tree_with_link(link.clone());

        assert_pending(&mut service).await;

        tokio::time::sleep(config.recheck_interval).await;

        // Publish a newer root that points at a single ENR entry.
        let mut new_root = root.clone();
        new_root.sequence_number = new_root.sequence_number.saturating_add(1);
        new_root.enr_root = "NEW_ENR_ROOT".to_string();
        new_root.sign(&secret_key).unwrap();
        resolver.insert(link.domain.clone(), new_root.to_string());

        let enr = Enr::empty(&secret_key).unwrap();
        resolver.insert(format!("{}.{}", new_root.enr_root, link.domain), enr.to_base64());

        let DnsDiscoveryEvent::Enr(discovered) = next_event(&mut service).await;
        assert_eq!(discovered, enr);

        assert_pending(&mut service).await;
    }

    /// Ignored by default: syncs the mainnet tree through the system resolver and prints every
    /// discovered ENR.
    #[tokio::test]
    #[ignore]
    async fn test_dns_resolver() {
        reth_tracing::init_test_tracing();

        let resolver = Arc::new(DnsResolver::from_system_conf().unwrap());
        let mut service = DnsDiscoveryService::new(resolver, Default::default());

        service.sync_tree(&Chain::mainnet().public_dns_network_protocol().unwrap()).unwrap();

        while let Some(event) = service.next().await {
            let DnsDiscoveryEvent::Enr(enr) = event;
            println!("discovered enr {}", enr.to_base64());
        }
    }
}