1#![doc(
9 html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
10 html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
11 issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/"
12)]
13#![cfg_attr(not(test), warn(unused_crate_dependencies))]
14#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
15
16pub use crate::resolver::{DnsResolver, MapResolver, Resolver};
17use crate::{
18 query::{QueryOutcome, QueryPool, ResolveEntryResult, ResolveRootResult},
19 sync::{ResolveKind, SyncAction},
20 tree::{DnsEntry, LinkEntry},
21};
22pub use config::DnsDiscoveryConfig;
23use enr::Enr;
24pub use error::ParseDnsEntryError;
25use reth_ethereum_forks::{EnrForkIdEntry, ForkId};
26use reth_network_peers::{pk2id, NodeRecord};
27use schnellru::{ByLength, LruMap};
28use secp256k1::SecretKey;
29use std::{
30 collections::{hash_map::Entry, HashMap, HashSet, VecDeque},
31 net::IpAddr,
32 pin::Pin,
33 sync::Arc,
34 task::{ready, Context, Poll},
35 time::{Duration, Instant},
36};
37use sync::SyncTree;
38use tokio::{
39 sync::{
40 mpsc,
41 mpsc::{error::TrySendError, UnboundedSender},
42 oneshot,
43 },
44 task::JoinHandle,
45};
46use tokio_stream::{
47 wrappers::{ReceiverStream, UnboundedReceiverStream},
48 Stream, StreamExt,
49};
50use tracing::{debug, trace};
51
52mod config;
53mod error;
54mod query;
55pub mod resolver;
56mod sync;
57pub mod tree;
58
59#[derive(Clone, Debug)]
61pub struct DnsDiscoveryHandle {
62 to_service: UnboundedSender<DnsDiscoveryCommand>,
64}
65
66impl DnsDiscoveryHandle {
69 pub fn sync_tree(&self, link: &str) -> Result<(), ParseDnsEntryError> {
71 self.sync_tree_with_link(link.parse()?);
72 Ok(())
73 }
74
75 pub fn sync_tree_with_link(&self, link: LinkEntry) {
77 let _ = self.to_service.send(DnsDiscoveryCommand::SyncTree(link));
78 }
79
80 pub async fn node_record_stream(
82 &self,
83 ) -> Result<ReceiverStream<DnsNodeRecordUpdate>, oneshot::error::RecvError> {
84 let (tx, rx) = oneshot::channel();
85 let cmd = DnsDiscoveryCommand::NodeRecordUpdates(tx);
86 let _ = self.to_service.send(cmd);
87 rx.await
88 }
89}
90
91#[must_use = "Service does nothing unless polled"]
93#[expect(missing_debug_implementations)]
94pub struct DnsDiscoveryService<R: Resolver = DnsResolver> {
95 command_tx: UnboundedSender<DnsDiscoveryCommand>,
97 command_rx: UnboundedReceiverStream<DnsDiscoveryCommand>,
99 node_record_listeners: Vec<mpsc::Sender<DnsNodeRecordUpdate>>,
101 trees: HashMap<LinkEntry, SyncTree>,
103 queries: QueryPool<R, SecretKey>,
105 dns_record_cache: LruMap<String, DnsEntry<SecretKey>>,
107 queued_events: VecDeque<DnsDiscoveryEvent>,
109 recheck_interval: Duration,
111 bootstrap_dns_networks: HashSet<LinkEntry>,
113}
114
115impl<R: Resolver> DnsDiscoveryService<R> {
118 pub fn new(resolver: Arc<R>, config: DnsDiscoveryConfig) -> Self {
131 let DnsDiscoveryConfig {
132 lookup_timeout,
133 max_requests_per_sec,
134 recheck_interval,
135 dns_record_cache_limit,
136 bootstrap_dns_networks,
137 } = config;
138 let queries = QueryPool::new(resolver, max_requests_per_sec, lookup_timeout);
139 let (command_tx, command_rx) = mpsc::unbounded_channel();
140 Self {
141 command_tx,
142 command_rx: UnboundedReceiverStream::new(command_rx),
143 node_record_listeners: Default::default(),
144 trees: Default::default(),
145 queries,
146 dns_record_cache: LruMap::new(ByLength::new(dns_record_cache_limit.get())),
147 queued_events: Default::default(),
148 recheck_interval,
149 bootstrap_dns_networks: bootstrap_dns_networks.unwrap_or_default(),
150 }
151 }
152
153 pub fn spawn(mut self) -> JoinHandle<()> {
157 tokio::task::spawn(async move {
158 self.bootstrap();
159
160 while let Some(event) = self.next().await {
161 trace!(target: "disc::dns", ?event, "processed");
162 }
163 })
164 }
165
166 pub fn bootstrap(&mut self) {
168 for link in self.bootstrap_dns_networks.clone() {
169 self.sync_tree_with_link(link);
170 }
171 }
172
173 pub fn new_pair(resolver: Arc<R>, config: DnsDiscoveryConfig) -> (Self, DnsDiscoveryHandle) {
176 let service = Self::new(resolver, config);
177 let handle = service.handle();
178 (service, handle)
179 }
180
181 pub fn handle(&self) -> DnsDiscoveryHandle {
183 DnsDiscoveryHandle { to_service: self.command_tx.clone() }
184 }
185
186 pub fn node_record_stream(&mut self) -> ReceiverStream<DnsNodeRecordUpdate> {
188 let (tx, rx) = mpsc::channel(256);
189 self.node_record_listeners.push(tx);
190 ReceiverStream::new(rx)
191 }
192
193 fn notify(&mut self, record: DnsNodeRecordUpdate) {
197 self.node_record_listeners.retain_mut(|listener| match listener.try_send(record.clone()) {
198 Ok(()) => true,
199 Err(err) => match err {
200 TrySendError::Full(_) => true,
201 TrySendError::Closed(_) => false,
202 },
203 });
204 }
205
206 pub fn sync_tree(&mut self, link: &str) -> Result<(), ParseDnsEntryError> {
208 self.sync_tree_with_link(link.parse()?);
209 Ok(())
210 }
211
212 pub fn sync_tree_with_link(&mut self, link: LinkEntry) {
214 self.queries.resolve_root(link);
215 }
216
217 fn resolve_entry(&mut self, link: LinkEntry<SecretKey>, hash: String, kind: ResolveKind) {
219 if let Some(entry) = self.dns_record_cache.get(&hash).cloned() {
220 let cached = ResolveEntryResult { entry: Some(Ok(entry)), link, hash, kind };
222 self.on_resolved_entry(cached);
223 return
224 }
225 self.queries.resolve_entry(link, hash, kind)
226 }
227
228 fn on_resolved_root(&mut self, resp: ResolveRootResult<SecretKey>) {
229 match resp {
230 Ok((root, link)) => match self.trees.entry(link.clone()) {
231 Entry::Occupied(mut entry) => {
232 entry.get_mut().update_root(root);
233 }
234 Entry::Vacant(entry) => {
235 entry.insert(SyncTree::new(root, link));
236 }
237 },
238 Err((err, link)) => {
239 debug!(target: "disc::dns",%err, ?link, "Failed to lookup root")
240 }
241 }
242 }
243
244 fn on_resolved_enr(&mut self, enr: Enr<SecretKey>) {
245 if let Some(record) = convert_enr_node_record(&enr) {
246 self.notify(record);
247 }
248 self.queued_events.push_back(DnsDiscoveryEvent::Enr(enr))
249 }
250
251 fn on_resolved_entry(&mut self, resp: ResolveEntryResult<SecretKey>) {
252 let ResolveEntryResult { entry, link, hash, kind } = resp;
253
254 match entry {
255 Some(Err(err)) => {
256 debug!(target: "disc::dns",%err, domain=%link.domain, ?hash, "Failed to lookup entry")
257 }
258 None => {
259 trace!(target: "disc::dns",domain=%link.domain, ?hash, "No dns entry")
260 }
261 Some(Ok(entry)) => {
262 self.dns_record_cache.insert(hash.clone(), entry.clone());
264
265 match entry {
266 DnsEntry::Root(root) => {
267 debug!(target: "disc::dns",%root, domain=%link.domain, ?hash, "resolved unexpected root entry");
268 }
269 DnsEntry::Link(link_entry) => {
270 if kind.is_link() {
271 if let Some(tree) = self.trees.get_mut(&link) {
272 tree.resolved_links_mut().insert(hash, link_entry.clone());
273 }
274 self.sync_tree_with_link(link_entry)
275 } else {
276 debug!(target: "disc::dns",%link_entry, domain=%link.domain, ?hash, "resolved unexpected Link entry");
277 }
278 }
279 DnsEntry::Branch(branch_entry) => {
280 if let Some(tree) = self.trees.get_mut(&link) {
281 tree.extend_children(kind, branch_entry.children)
282 }
283 }
284 DnsEntry::Node(entry) => {
285 if kind.is_link() {
286 debug!(target: "disc::dns",domain=%link.domain, ?hash, "resolved unexpected enr entry");
287 } else {
288 self.on_resolved_enr(entry.enr)
289 }
290 }
291 }
292 }
293 }
294 }
295
296 pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<DnsDiscoveryEvent> {
298 loop {
299 if let Some(event) = self.queued_events.pop_front() {
301 return Poll::Ready(event)
302 }
303
304 while let Poll::Ready(Some(cmd)) = Pin::new(&mut self.command_rx).poll_next(cx) {
306 match cmd {
307 DnsDiscoveryCommand::SyncTree(link) => {
308 self.sync_tree_with_link(link);
309 }
310 DnsDiscoveryCommand::NodeRecordUpdates(tx) => {
311 let _ = tx.send(self.node_record_stream());
312 }
313 }
314 }
315
316 while let Poll::Ready(outcome) = self.queries.poll(cx) {
317 match outcome {
319 QueryOutcome::Root(resp) => self.on_resolved_root(resp),
320 QueryOutcome::Entry(resp) => self.on_resolved_entry(resp),
321 }
322 }
323
324 let mut progress = false;
325 let now = Instant::now();
326 let mut pending_resolves = Vec::new();
327 let mut pending_updates = Vec::new();
328 for tree in self.trees.values_mut() {
329 while let Some(action) = tree.poll(now, self.recheck_interval) {
330 progress = true;
331 match action {
332 SyncAction::UpdateRoot => {
333 pending_updates.push(tree.link().clone());
334 }
335 SyncAction::Enr(hash) => {
336 pending_resolves.push((tree.link().clone(), hash, ResolveKind::Enr));
337 }
338 SyncAction::Link(hash) => {
339 pending_resolves.push((tree.link().clone(), hash, ResolveKind::Link));
340 }
341 }
342 }
343 }
344
345 for (domain, hash, kind) in pending_resolves {
346 self.resolve_entry(domain, hash, kind)
347 }
348
349 for link in pending_updates {
350 self.sync_tree_with_link(link)
351 }
352
353 if !progress && self.queued_events.is_empty() {
354 return Poll::Pending
355 }
356 }
357 }
358}
359
360impl<R: Resolver> Stream for DnsDiscoveryService<R> {
362 type Item = DnsDiscoveryEvent;
363
364 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
365 Poll::Ready(Some(ready!(self.get_mut().poll(cx))))
366 }
367}
368
369#[derive(Debug, Clone, Eq, PartialEq)]
371pub struct DnsNodeRecordUpdate {
372 pub node_record: NodeRecord,
374 pub fork_id: Option<ForkId>,
376 pub enr: Enr<SecretKey>,
378}
379
380enum DnsDiscoveryCommand {
382 SyncTree(LinkEntry),
384 NodeRecordUpdates(oneshot::Sender<ReceiverStream<DnsNodeRecordUpdate>>),
385}
386
387#[derive(Debug, Clone)]
389pub enum DnsDiscoveryEvent {
390 Enr(Enr<SecretKey>),
392}
393
394fn convert_enr_node_record(enr: &Enr<SecretKey>) -> Option<DnsNodeRecordUpdate> {
396 let node_record = NodeRecord {
397 address: enr.ip4().map(IpAddr::from).or_else(|| enr.ip6().map(IpAddr::from))?,
398 tcp_port: enr.tcp4().or_else(|| enr.tcp6())?,
399 udp_port: enr.udp4().or_else(|| enr.udp6())?,
400 id: pk2id(&enr.public_key()),
401 }
402 .into_ipv4_mapped();
403
404 let fork_id =
405 enr.get_decodable::<EnrForkIdEntry>(b"eth").transpose().ok().flatten().map(Into::into);
406
407 Some(DnsNodeRecordUpdate { node_record, fork_id, enr: enr.clone() })
408}
409
410#[cfg(test)]
411mod tests {
412 use super::*;
413 use crate::tree::TreeRootEntry;
414 use alloy_chains::Chain;
415 use alloy_rlp::{Decodable, Encodable};
416 use enr::EnrKey;
417 use reth_chainspec::MAINNET;
418 use reth_ethereum_forks::{EthereumHardfork, ForkHash};
419 use secp256k1::rand::thread_rng;
420 use std::{future::poll_fn, net::Ipv4Addr};
421
/// An ENR with IPv4 address, ports and an `eth` entry converts into the matching update.
#[test]
fn test_convert_enr_node_record() {
    let secret_key = SecretKey::new(&mut thread_rng());
    let localhost = IpAddr::from(Ipv4Addr::LOCALHOST);
    let mainnet_fork_id = MAINNET.latest_fork_id();

    let enr = Enr::builder()
        .ip(localhost)
        .udp4(9000)
        .tcp4(30303)
        .add_value(b"eth", &EnrForkIdEntry::from(mainnet_fork_id))
        .build(&secret_key)
        .unwrap();

    let DnsNodeRecordUpdate { node_record, fork_id, enr: converted_enr } =
        convert_enr_node_record(&enr).unwrap();

    assert_eq!(node_record.address, localhost);
    assert_eq!(node_record.tcp_port, 30303);
    assert_eq!(node_record.udp_port, 9000);
    assert_eq!(fork_id, Some(MAINNET.latest_fork_id()));
    assert_eq!(converted_enr, enr);
}
443
/// An ENR that went through an encode/decode round trip, and that carries an additional
/// `opstack` entry, still converts into the expected update.
#[test]
fn test_decode_and_convert_enr_node_record() {
    let secret_key = SecretKey::new(&mut thread_rng());
    let localhost = IpAddr::from(Ipv4Addr::LOCALHOST);
    let opstack_fork_id = ForkId { hash: ForkHash(rand::random()), next: rand::random() };

    let enr = Enr::builder()
        .ip(localhost)
        .udp4(9000)
        .tcp4(30303)
        .add_value(b"eth", &EnrForkIdEntry::from(MAINNET.latest_fork_id()))
        .add_value(b"opstack", &opstack_fork_id)
        .build(&secret_key)
        .unwrap();

    let mut encoded: Vec<u8> = Vec::new();
    enr.encode(&mut encoded);
    let decoded = Enr::decode(&mut encoded.as_slice()).unwrap();

    let update = convert_enr_node_record(&decoded).unwrap();

    assert_eq!(update.node_record.address, localhost);
    assert_eq!(update.node_record.tcp_port, 30303);
    assert_eq!(update.node_record.udp_port, 9000);
    assert_eq!(update.fork_id, Some(MAINNET.latest_fork_id()));
    assert_eq!(update.enr, enr);
}
472
473 #[tokio::test]
474 async fn test_start_root_sync() {
475 reth_tracing::init_test_tracing();
476
477 let secret_key = SecretKey::new(&mut thread_rng());
478 let resolver = MapResolver::default();
479 let s = "enrtree-root:v1 e=QFT4PBCRX4XQCV3VUYJ6BTCEPU l=JGUFMSAGI7KZYB3P7IZW4S5Y3A seq=3 sig=3FmXuVwpa8Y7OstZTx9PIb1mt8FrW7VpDOFv4AaGCsZ2EIHmhraWhe4NxYhQDlw5MjeFXYMbJjsPeKlHzmJREQE";
480 let mut root: TreeRootEntry = s.parse().unwrap();
481 root.sign(&secret_key).unwrap();
482
483 let link =
484 LinkEntry { domain: "nodes.example.org".to_string(), pubkey: secret_key.public() };
485 resolver.insert(link.domain.clone(), root.to_string());
486
487 let mut service = DnsDiscoveryService::new(Arc::new(resolver), Default::default());
488
489 service.sync_tree_with_link(link.clone());
490
491 poll_fn(|cx| {
492 let _ = service.poll(cx);
493 Poll::Ready(())
494 })
495 .await;
496
497 let tree = service.trees.get(&link).unwrap();
498 assert_eq!(tree.root().clone(), root);
499 }
500
501 #[tokio::test(flavor = "multi_thread")]
502 async fn test_get_node() {
503 reth_tracing::init_test_tracing();
504
505 let secret_key = SecretKey::new(&mut thread_rng());
506 let resolver = MapResolver::default();
507 let s = "enrtree-root:v1 e=QFT4PBCRX4XQCV3VUYJ6BTCEPU l=JGUFMSAGI7KZYB3P7IZW4S5Y3A seq=3 sig=3FmXuVwpa8Y7OstZTx9PIb1mt8FrW7VpDOFv4AaGCsZ2EIHmhraWhe4NxYhQDlw5MjeFXYMbJjsPeKlHzmJREQE";
508 let mut root: TreeRootEntry = s.parse().unwrap();
509 root.sign(&secret_key).unwrap();
510
511 let link =
512 LinkEntry { domain: "nodes.example.org".to_string(), pubkey: secret_key.public() };
513 resolver.insert(link.domain.clone(), root.to_string());
514
515 let mut builder = Enr::builder();
516 let fork_id = MAINNET.hardfork_fork_id(EthereumHardfork::Frontier).unwrap();
517 builder
518 .ip4(Ipv4Addr::LOCALHOST)
519 .udp4(30303)
520 .tcp4(30303)
521 .add_value(b"eth", &EnrForkIdEntry::from(fork_id));
522 let enr = builder.build(&secret_key).unwrap();
523
524 resolver.insert(format!("{}.{}", root.enr_root.clone(), link.domain), enr.to_base64());
525
526 let mut service = DnsDiscoveryService::new(Arc::new(resolver), Default::default());
527
528 let mut node_records = service.node_record_stream();
529
530 let task = tokio::task::spawn(async move {
531 let record = node_records.next().await.unwrap();
532 assert_eq!(record.fork_id, Some(fork_id));
533 });
534
535 service.sync_tree_with_link(link.clone());
536
537 let event = poll_fn(|cx| service.poll(cx)).await;
538
539 match event {
540 DnsDiscoveryEvent::Enr(discovered) => {
541 assert_eq!(discovered, enr);
542 }
543 }
544
545 poll_fn(|cx| {
546 assert!(service.poll(cx).is_pending());
547 Poll::Ready(())
548 })
549 .await;
550
551 task.await.unwrap();
552 }
553
554 #[tokio::test]
555 async fn test_recheck_tree() {
556 reth_tracing::init_test_tracing();
557
558 let config = DnsDiscoveryConfig {
559 recheck_interval: Duration::from_millis(750),
560 ..Default::default()
561 };
562
563 let secret_key = SecretKey::new(&mut thread_rng());
564 let resolver = Arc::new(MapResolver::default());
565 let s = "enrtree-root:v1 e=QFT4PBCRX4XQCV3VUYJ6BTCEPU l=JGUFMSAGI7KZYB3P7IZW4S5Y3A seq=3 sig=3FmXuVwpa8Y7OstZTx9PIb1mt8FrW7VpDOFv4AaGCsZ2EIHmhraWhe4NxYhQDlw5MjeFXYMbJjsPeKlHzmJREQE";
566 let mut root: TreeRootEntry = s.parse().unwrap();
567 root.sign(&secret_key).unwrap();
568
569 let link =
570 LinkEntry { domain: "nodes.example.org".to_string(), pubkey: secret_key.public() };
571 resolver.insert(link.domain.clone(), root.to_string());
572
573 let mut service = DnsDiscoveryService::new(Arc::clone(&resolver), config.clone());
574
575 service.sync_tree_with_link(link.clone());
576
577 poll_fn(|cx| {
578 assert!(service.poll(cx).is_pending());
579 Poll::Ready(())
580 })
581 .await;
582
583 tokio::time::sleep(config.recheck_interval).await;
585
586 let enr = Enr::empty(&secret_key).unwrap();
587 resolver.insert(format!("{}.{}", root.enr_root.clone(), link.domain), enr.to_base64());
588
589 let event = poll_fn(|cx| service.poll(cx)).await;
590
591 match event {
592 DnsDiscoveryEvent::Enr(discovered) => {
593 assert_eq!(discovered, enr);
594 }
595 }
596
597 poll_fn(|cx| {
598 assert!(service.poll(cx).is_pending());
599 Poll::Ready(())
600 })
601 .await;
602 }
603
604 #[tokio::test]
605 #[ignore]
606 async fn test_dns_resolver() {
607 reth_tracing::init_test_tracing();
608
609 let mut service = DnsDiscoveryService::new(
610 Arc::new(DnsResolver::from_system_conf().unwrap()),
611 Default::default(),
612 );
613
614 service.sync_tree(&Chain::mainnet().public_dns_network_protocol().unwrap()).unwrap();
615
616 while let Some(event) = service.next().await {
617 match event {
618 DnsDiscoveryEvent::Enr(enr) => {
619 println!("discovered enr {}", enr.to_base64());
620 }
621 }
622 }
623 }
624}