reth_dns_discovery/
lib.rs

1//! Implementation of [EIP-1459](https://eips.ethereum.org/EIPS/eip-1459) Node Discovery via DNS.
2//!
3//! ## Feature Flags
4//!
5//! - `serde` (default): Enable serde support
6//! - `test-utils`: Export utilities for testing
7
8#![doc(
9    html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
10    html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
11    issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/"
12)]
13#![cfg_attr(not(test), warn(unused_crate_dependencies))]
14#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
15
16pub use crate::resolver::{DnsResolver, MapResolver, Resolver};
17use crate::{
18    query::{QueryOutcome, QueryPool, ResolveEntryResult, ResolveRootResult},
19    sync::{ResolveKind, SyncAction},
20    tree::{DnsEntry, LinkEntry},
21};
22pub use config::DnsDiscoveryConfig;
23use enr::Enr;
24pub use error::ParseDnsEntryError;
25use reth_ethereum_forks::{EnrForkIdEntry, ForkId};
26use reth_network_peers::{pk2id, NodeRecord};
27use schnellru::{ByLength, LruMap};
28use secp256k1::SecretKey;
29use std::{
30    collections::{hash_map::Entry, HashMap, HashSet, VecDeque},
31    net::IpAddr,
32    pin::Pin,
33    sync::Arc,
34    task::{ready, Context, Poll},
35    time::{Duration, Instant},
36};
37use sync::SyncTree;
38use tokio::{
39    sync::{
40        mpsc,
41        mpsc::{error::TrySendError, UnboundedSender},
42        oneshot,
43    },
44    task::JoinHandle,
45};
46use tokio_stream::{
47    wrappers::{ReceiverStream, UnboundedReceiverStream},
48    Stream, StreamExt,
49};
50use tracing::{debug, trace};
51
52mod config;
53mod error;
54mod query;
55pub mod resolver;
56mod sync;
57pub mod tree;
58
59/// [`DnsDiscoveryService`] front-end.
60#[derive(Clone, Debug)]
61pub struct DnsDiscoveryHandle {
62    /// Channel for sending commands to the service.
63    to_service: UnboundedSender<DnsDiscoveryCommand>,
64}
65
// === impl DnsDiscoveryHandle ===
67
68impl DnsDiscoveryHandle {
69    /// Starts syncing the given link to a tree.
70    pub fn sync_tree(&self, link: &str) -> Result<(), ParseDnsEntryError> {
71        self.sync_tree_with_link(link.parse()?);
72        Ok(())
73    }
74
75    /// Starts syncing the given link to a tree.
76    pub fn sync_tree_with_link(&self, link: LinkEntry) {
77        let _ = self.to_service.send(DnsDiscoveryCommand::SyncTree(link));
78    }
79
80    /// Returns the receiver half of new listener channel that streams discovered [`NodeRecord`]s.
81    pub async fn node_record_stream(
82        &self,
83    ) -> Result<ReceiverStream<DnsNodeRecordUpdate>, oneshot::error::RecvError> {
84        let (tx, rx) = oneshot::channel();
85        let cmd = DnsDiscoveryCommand::NodeRecordUpdates(tx);
86        let _ = self.to_service.send(cmd);
87        rx.await
88    }
89}
90
91/// A client that discovers nodes via DNS.
92#[must_use = "Service does nothing unless polled"]
93#[expect(missing_debug_implementations)]
94pub struct DnsDiscoveryService<R: Resolver = DnsResolver> {
95    /// Copy of the sender half, so new [`DnsDiscoveryHandle`] can be created on demand.
96    command_tx: UnboundedSender<DnsDiscoveryCommand>,
97    /// Receiver half of the command channel.
98    command_rx: UnboundedReceiverStream<DnsDiscoveryCommand>,
99    /// All subscribers for resolved [`NodeRecord`]s.
100    node_record_listeners: Vec<mpsc::Sender<DnsNodeRecordUpdate>>,
101    /// All the trees that can be synced.
102    trees: HashMap<LinkEntry, SyncTree>,
103    /// All queries currently in progress
104    queries: QueryPool<R, SecretKey>,
105    /// Cached dns records
106    dns_record_cache: LruMap<String, DnsEntry<SecretKey>>,
107    /// all buffered events
108    queued_events: VecDeque<DnsDiscoveryEvent>,
109    /// The rate at which trees should be updated.
110    recheck_interval: Duration,
111    /// Links to the DNS networks to bootstrap.
112    bootstrap_dns_networks: HashSet<LinkEntry>,
113}
114
115// === impl DnsDiscoveryService ===
116
117impl<R: Resolver> DnsDiscoveryService<R> {
118    /// Creates a new instance of the [`DnsDiscoveryService`] using the given settings.
119    ///
120    /// ```
121    /// use reth_dns_discovery::{DnsDiscoveryService, DnsResolver};
122    /// use std::sync::Arc;
123    /// # fn t() {
124    /// let service = DnsDiscoveryService::new(
125    ///     Arc::new(DnsResolver::from_system_conf().unwrap()),
126    ///     Default::default(),
127    /// );
128    /// # }
129    /// ```
130    pub fn new(resolver: Arc<R>, config: DnsDiscoveryConfig) -> Self {
131        let DnsDiscoveryConfig {
132            lookup_timeout,
133            max_requests_per_sec,
134            recheck_interval,
135            dns_record_cache_limit,
136            bootstrap_dns_networks,
137        } = config;
138        let queries = QueryPool::new(resolver, max_requests_per_sec, lookup_timeout);
139        let (command_tx, command_rx) = mpsc::unbounded_channel();
140        Self {
141            command_tx,
142            command_rx: UnboundedReceiverStream::new(command_rx),
143            node_record_listeners: Default::default(),
144            trees: Default::default(),
145            queries,
146            dns_record_cache: LruMap::new(ByLength::new(dns_record_cache_limit.get())),
147            queued_events: Default::default(),
148            recheck_interval,
149            bootstrap_dns_networks: bootstrap_dns_networks.unwrap_or_default(),
150        }
151    }
152
153    /// Spawns this services onto a new task
154    ///
155    /// Note: requires a running runtime
156    pub fn spawn(mut self) -> JoinHandle<()> {
157        tokio::task::spawn(async move {
158            self.bootstrap();
159
160            while let Some(event) = self.next().await {
161                trace!(target: "disc::dns", ?event, "processed");
162            }
163        })
164    }
165
166    /// Starts discovery with all configured bootstrap links
167    pub fn bootstrap(&mut self) {
168        for link in self.bootstrap_dns_networks.clone() {
169            self.sync_tree_with_link(link);
170        }
171    }
172
173    /// Same as [`DnsDiscoveryService::new`] but also returns a new handle that's connected to the
174    /// service
175    pub fn new_pair(resolver: Arc<R>, config: DnsDiscoveryConfig) -> (Self, DnsDiscoveryHandle) {
176        let service = Self::new(resolver, config);
177        let handle = service.handle();
178        (service, handle)
179    }
180
181    /// Returns a new [`DnsDiscoveryHandle`] that can send commands to this type.
182    pub fn handle(&self) -> DnsDiscoveryHandle {
183        DnsDiscoveryHandle { to_service: self.command_tx.clone() }
184    }
185
186    /// Creates a new channel for [`NodeRecord`]s.
187    pub fn node_record_stream(&mut self) -> ReceiverStream<DnsNodeRecordUpdate> {
188        let (tx, rx) = mpsc::channel(256);
189        self.node_record_listeners.push(tx);
190        ReceiverStream::new(rx)
191    }
192
193    /// Sends  the event to all listeners.
194    ///
195    /// Remove channels that got closed.
196    fn notify(&mut self, record: DnsNodeRecordUpdate) {
197        self.node_record_listeners.retain_mut(|listener| match listener.try_send(record.clone()) {
198            Ok(()) => true,
199            Err(err) => match err {
200                TrySendError::Full(_) => true,
201                TrySendError::Closed(_) => false,
202            },
203        });
204    }
205
206    /// Starts syncing the given link to a tree.
207    pub fn sync_tree(&mut self, link: &str) -> Result<(), ParseDnsEntryError> {
208        self.sync_tree_with_link(link.parse()?);
209        Ok(())
210    }
211
212    /// Starts syncing the given link to a tree.
213    pub fn sync_tree_with_link(&mut self, link: LinkEntry) {
214        self.queries.resolve_root(link);
215    }
216
217    /// Resolves an entry
218    fn resolve_entry(&mut self, link: LinkEntry<SecretKey>, hash: String, kind: ResolveKind) {
219        if let Some(entry) = self.dns_record_cache.get(&hash).cloned() {
220            // already resolved
221            let cached = ResolveEntryResult { entry: Some(Ok(entry)), link, hash, kind };
222            self.on_resolved_entry(cached);
223            return
224        }
225        self.queries.resolve_entry(link, hash, kind)
226    }
227
228    fn on_resolved_root(&mut self, resp: ResolveRootResult<SecretKey>) {
229        match resp {
230            Ok((root, link)) => match self.trees.entry(link.clone()) {
231                Entry::Occupied(mut entry) => {
232                    entry.get_mut().update_root(root);
233                }
234                Entry::Vacant(entry) => {
235                    entry.insert(SyncTree::new(root, link));
236                }
237            },
238            Err((err, link)) => {
239                debug!(target: "disc::dns",%err, ?link, "Failed to lookup root")
240            }
241        }
242    }
243
244    fn on_resolved_enr(&mut self, enr: Enr<SecretKey>) {
245        if let Some(record) = convert_enr_node_record(&enr) {
246            self.notify(record);
247        }
248        self.queued_events.push_back(DnsDiscoveryEvent::Enr(enr))
249    }
250
251    fn on_resolved_entry(&mut self, resp: ResolveEntryResult<SecretKey>) {
252        let ResolveEntryResult { entry, link, hash, kind } = resp;
253
254        match entry {
255            Some(Err(err)) => {
256                debug!(target: "disc::dns",%err, domain=%link.domain, ?hash, "Failed to lookup entry")
257            }
258            None => {
259                trace!(target: "disc::dns",domain=%link.domain, ?hash, "No dns entry")
260            }
261            Some(Ok(entry)) => {
262                // cache entry
263                self.dns_record_cache.insert(hash.clone(), entry.clone());
264
265                match entry {
266                    DnsEntry::Root(root) => {
267                        debug!(target: "disc::dns",%root, domain=%link.domain, ?hash, "resolved unexpected root entry");
268                    }
269                    DnsEntry::Link(link_entry) => {
270                        if kind.is_link() {
271                            if let Some(tree) = self.trees.get_mut(&link) {
272                                tree.resolved_links_mut().insert(hash, link_entry.clone());
273                            }
274                            self.sync_tree_with_link(link_entry)
275                        } else {
276                            debug!(target: "disc::dns",%link_entry, domain=%link.domain, ?hash, "resolved unexpected Link entry");
277                        }
278                    }
279                    DnsEntry::Branch(branch_entry) => {
280                        if let Some(tree) = self.trees.get_mut(&link) {
281                            tree.extend_children(kind, branch_entry.children)
282                        }
283                    }
284                    DnsEntry::Node(entry) => {
285                        if kind.is_link() {
286                            debug!(target: "disc::dns",domain=%link.domain, ?hash, "resolved unexpected enr entry");
287                        } else {
288                            self.on_resolved_enr(entry.enr)
289                        }
290                    }
291                }
292            }
293        }
294    }
295
296    /// Advances the state of the DNS discovery service by polling,triggering lookups
297    pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<DnsDiscoveryEvent> {
298        loop {
299            // drain buffered events first
300            if let Some(event) = self.queued_events.pop_front() {
301                return Poll::Ready(event)
302            }
303
304            // process all incoming commands
305            while let Poll::Ready(Some(cmd)) = Pin::new(&mut self.command_rx).poll_next(cx) {
306                match cmd {
307                    DnsDiscoveryCommand::SyncTree(link) => {
308                        self.sync_tree_with_link(link);
309                    }
310                    DnsDiscoveryCommand::NodeRecordUpdates(tx) => {
311                        let _ = tx.send(self.node_record_stream());
312                    }
313                }
314            }
315
316            while let Poll::Ready(outcome) = self.queries.poll(cx) {
317                // handle query outcome
318                match outcome {
319                    QueryOutcome::Root(resp) => self.on_resolved_root(resp),
320                    QueryOutcome::Entry(resp) => self.on_resolved_entry(resp),
321                }
322            }
323
324            let mut progress = false;
325            let now = Instant::now();
326            let mut pending_resolves = Vec::new();
327            let mut pending_updates = Vec::new();
328            for tree in self.trees.values_mut() {
329                while let Some(action) = tree.poll(now, self.recheck_interval) {
330                    progress = true;
331                    match action {
332                        SyncAction::UpdateRoot => {
333                            pending_updates.push(tree.link().clone());
334                        }
335                        SyncAction::Enr(hash) => {
336                            pending_resolves.push((tree.link().clone(), hash, ResolveKind::Enr));
337                        }
338                        SyncAction::Link(hash) => {
339                            pending_resolves.push((tree.link().clone(), hash, ResolveKind::Link));
340                        }
341                    }
342                }
343            }
344
345            for (domain, hash, kind) in pending_resolves {
346                self.resolve_entry(domain, hash, kind)
347            }
348
349            for link in pending_updates {
350                self.sync_tree_with_link(link)
351            }
352
353            if !progress && self.queued_events.is_empty() {
354                return Poll::Pending
355            }
356        }
357    }
358}
359
360/// A Stream events, mainly used for debugging
361impl<R: Resolver> Stream for DnsDiscoveryService<R> {
362    type Item = DnsDiscoveryEvent;
363
364    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
365        Poll::Ready(Some(ready!(self.get_mut().poll(cx))))
366    }
367}
368
369/// The converted discovered [Enr] object
370#[derive(Debug, Clone, Eq, PartialEq)]
371pub struct DnsNodeRecordUpdate {
372    /// Discovered node and it's addresses
373    pub node_record: NodeRecord,
374    /// The forkid of the node, if present in the ENR
375    pub fork_id: Option<ForkId>,
376    /// Original [`Enr`].
377    pub enr: Enr<SecretKey>,
378}
379
380/// Commands sent from [`DnsDiscoveryHandle`] to [`DnsDiscoveryService`]
381enum DnsDiscoveryCommand {
382    /// Sync a tree
383    SyncTree(LinkEntry),
384    NodeRecordUpdates(oneshot::Sender<ReceiverStream<DnsNodeRecordUpdate>>),
385}
386
387/// Represents dns discovery related update events.
388#[derive(Debug, Clone)]
389pub enum DnsDiscoveryEvent {
390    /// Resolved an Enr entry via DNS.
391    Enr(Enr<SecretKey>),
392}
393
394/// Converts an [Enr] into a [`NodeRecord`]
395fn convert_enr_node_record(enr: &Enr<SecretKey>) -> Option<DnsNodeRecordUpdate> {
396    let node_record = NodeRecord {
397        address: enr.ip4().map(IpAddr::from).or_else(|| enr.ip6().map(IpAddr::from))?,
398        tcp_port: enr.tcp4().or_else(|| enr.tcp6())?,
399        udp_port: enr.udp4().or_else(|| enr.udp6())?,
400        id: pk2id(&enr.public_key()),
401    }
402    .into_ipv4_mapped();
403
404    let fork_id =
405        enr.get_decodable::<EnrForkIdEntry>(b"eth").transpose().ok().flatten().map(Into::into);
406
407    Some(DnsNodeRecordUpdate { node_record, fork_id, enr: enr.clone() })
408}
409
410#[cfg(test)]
411mod tests {
412    use super::*;
413    use crate::tree::TreeRootEntry;
414    use alloy_chains::Chain;
415    use alloy_rlp::{Decodable, Encodable};
416    use enr::EnrKey;
417    use reth_chainspec::MAINNET;
418    use reth_ethereum_forks::{EthereumHardfork, ForkHash};
419    use secp256k1::rand::thread_rng;
420    use std::{future::poll_fn, net::Ipv4Addr};
421
422    #[test]
423    fn test_convert_enr_node_record() {
424        // rig
425        let secret_key = SecretKey::new(&mut secp256k1::rand::thread_rng());
426        let enr = Enr::builder()
427            .ip("127.0.0.1".parse().unwrap())
428            .udp4(9000)
429            .tcp4(30303)
430            .add_value(b"eth", &EnrForkIdEntry::from(MAINNET.latest_fork_id()))
431            .build(&secret_key)
432            .unwrap();
433
434        // test
435        let node_record_update = convert_enr_node_record(&enr).unwrap();
436
437        assert_eq!(node_record_update.node_record.address, "127.0.0.1".parse::<IpAddr>().unwrap());
438        assert_eq!(node_record_update.node_record.tcp_port, 30303);
439        assert_eq!(node_record_update.node_record.udp_port, 9000);
440        assert_eq!(node_record_update.fork_id, Some(MAINNET.latest_fork_id()));
441        assert_eq!(node_record_update.enr, enr);
442    }
443
444    #[test]
445    fn test_decode_and_convert_enr_node_record() {
446        // rig
447
448        let secret_key = SecretKey::new(&mut secp256k1::rand::thread_rng());
449        let enr = Enr::builder()
450            .ip("127.0.0.1".parse().unwrap())
451            .udp4(9000)
452            .tcp4(30303)
453            .add_value(b"eth", &EnrForkIdEntry::from(MAINNET.latest_fork_id()))
454            .add_value(b"opstack", &ForkId { hash: ForkHash(rand::random()), next: rand::random() })
455            .build(&secret_key)
456            .unwrap();
457
458        let mut encoded_enr = vec![];
459        enr.encode(&mut encoded_enr);
460
461        // test
462        let decoded_enr = Enr::decode(&mut &encoded_enr[..]).unwrap();
463
464        let node_record_update = convert_enr_node_record(&decoded_enr).unwrap();
465
466        assert_eq!(node_record_update.node_record.address, "127.0.0.1".parse::<IpAddr>().unwrap());
467        assert_eq!(node_record_update.node_record.tcp_port, 30303);
468        assert_eq!(node_record_update.node_record.udp_port, 9000);
469        assert_eq!(node_record_update.fork_id, Some(MAINNET.latest_fork_id()));
470        assert_eq!(node_record_update.enr, enr);
471    }
472
473    #[tokio::test]
474    async fn test_start_root_sync() {
475        reth_tracing::init_test_tracing();
476
477        let secret_key = SecretKey::new(&mut thread_rng());
478        let resolver = MapResolver::default();
479        let s = "enrtree-root:v1 e=QFT4PBCRX4XQCV3VUYJ6BTCEPU l=JGUFMSAGI7KZYB3P7IZW4S5Y3A seq=3 sig=3FmXuVwpa8Y7OstZTx9PIb1mt8FrW7VpDOFv4AaGCsZ2EIHmhraWhe4NxYhQDlw5MjeFXYMbJjsPeKlHzmJREQE";
480        let mut root: TreeRootEntry = s.parse().unwrap();
481        root.sign(&secret_key).unwrap();
482
483        let link =
484            LinkEntry { domain: "nodes.example.org".to_string(), pubkey: secret_key.public() };
485        resolver.insert(link.domain.clone(), root.to_string());
486
487        let mut service = DnsDiscoveryService::new(Arc::new(resolver), Default::default());
488
489        service.sync_tree_with_link(link.clone());
490
491        poll_fn(|cx| {
492            let _ = service.poll(cx);
493            Poll::Ready(())
494        })
495        .await;
496
497        let tree = service.trees.get(&link).unwrap();
498        assert_eq!(tree.root().clone(), root);
499    }
500
501    #[tokio::test(flavor = "multi_thread")]
502    async fn test_get_node() {
503        reth_tracing::init_test_tracing();
504
505        let secret_key = SecretKey::new(&mut thread_rng());
506        let resolver = MapResolver::default();
507        let s = "enrtree-root:v1 e=QFT4PBCRX4XQCV3VUYJ6BTCEPU l=JGUFMSAGI7KZYB3P7IZW4S5Y3A seq=3 sig=3FmXuVwpa8Y7OstZTx9PIb1mt8FrW7VpDOFv4AaGCsZ2EIHmhraWhe4NxYhQDlw5MjeFXYMbJjsPeKlHzmJREQE";
508        let mut root: TreeRootEntry = s.parse().unwrap();
509        root.sign(&secret_key).unwrap();
510
511        let link =
512            LinkEntry { domain: "nodes.example.org".to_string(), pubkey: secret_key.public() };
513        resolver.insert(link.domain.clone(), root.to_string());
514
515        let mut builder = Enr::builder();
516        let fork_id = MAINNET.hardfork_fork_id(EthereumHardfork::Frontier).unwrap();
517        builder
518            .ip4(Ipv4Addr::LOCALHOST)
519            .udp4(30303)
520            .tcp4(30303)
521            .add_value(b"eth", &EnrForkIdEntry::from(fork_id));
522        let enr = builder.build(&secret_key).unwrap();
523
524        resolver.insert(format!("{}.{}", root.enr_root.clone(), link.domain), enr.to_base64());
525
526        let mut service = DnsDiscoveryService::new(Arc::new(resolver), Default::default());
527
528        let mut node_records = service.node_record_stream();
529
530        let task = tokio::task::spawn(async move {
531            let record = node_records.next().await.unwrap();
532            assert_eq!(record.fork_id, Some(fork_id));
533        });
534
535        service.sync_tree_with_link(link.clone());
536
537        let event = poll_fn(|cx| service.poll(cx)).await;
538
539        match event {
540            DnsDiscoveryEvent::Enr(discovered) => {
541                assert_eq!(discovered, enr);
542            }
543        }
544
545        poll_fn(|cx| {
546            assert!(service.poll(cx).is_pending());
547            Poll::Ready(())
548        })
549        .await;
550
551        task.await.unwrap();
552    }
553
554    #[tokio::test]
555    async fn test_recheck_tree() {
556        reth_tracing::init_test_tracing();
557
558        let config = DnsDiscoveryConfig {
559            recheck_interval: Duration::from_millis(750),
560            ..Default::default()
561        };
562
563        let secret_key = SecretKey::new(&mut thread_rng());
564        let resolver = Arc::new(MapResolver::default());
565        let s = "enrtree-root:v1 e=QFT4PBCRX4XQCV3VUYJ6BTCEPU l=JGUFMSAGI7KZYB3P7IZW4S5Y3A seq=3 sig=3FmXuVwpa8Y7OstZTx9PIb1mt8FrW7VpDOFv4AaGCsZ2EIHmhraWhe4NxYhQDlw5MjeFXYMbJjsPeKlHzmJREQE";
566        let mut root: TreeRootEntry = s.parse().unwrap();
567        root.sign(&secret_key).unwrap();
568
569        let link =
570            LinkEntry { domain: "nodes.example.org".to_string(), pubkey: secret_key.public() };
571        resolver.insert(link.domain.clone(), root.to_string());
572
573        let mut service = DnsDiscoveryService::new(Arc::clone(&resolver), config.clone());
574
575        service.sync_tree_with_link(link.clone());
576
577        poll_fn(|cx| {
578            assert!(service.poll(cx).is_pending());
579            Poll::Ready(())
580        })
581        .await;
582
583        // await recheck timeout
584        tokio::time::sleep(config.recheck_interval).await;
585
586        let enr = Enr::empty(&secret_key).unwrap();
587        resolver.insert(format!("{}.{}", root.enr_root.clone(), link.domain), enr.to_base64());
588
589        let event = poll_fn(|cx| service.poll(cx)).await;
590
591        match event {
592            DnsDiscoveryEvent::Enr(discovered) => {
593                assert_eq!(discovered, enr);
594            }
595        }
596
597        poll_fn(|cx| {
598            assert!(service.poll(cx).is_pending());
599            Poll::Ready(())
600        })
601        .await;
602    }
603
604    #[tokio::test]
605    #[ignore]
606    async fn test_dns_resolver() {
607        reth_tracing::init_test_tracing();
608
609        let mut service = DnsDiscoveryService::new(
610            Arc::new(DnsResolver::from_system_conf().unwrap()),
611            Default::default(),
612        );
613
614        service.sync_tree(&Chain::mainnet().public_dns_network_protocol().unwrap()).unwrap();
615
616        while let Some(event) = service.next().await {
617            match event {
618                DnsDiscoveryEvent::Enr(enr) => {
619                    println!("discovered enr {}", enr.to_base64());
620                }
621            }
622        }
623    }
624}