#![doc(
html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/"
)]
#![cfg_attr(not(test), warn(unused_crate_dependencies))]
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
pub use crate::resolver::{DnsResolver, MapResolver, Resolver};
use crate::{
query::{QueryOutcome, QueryPool, ResolveEntryResult, ResolveRootResult},
sync::{ResolveKind, SyncAction},
tree::{DnsEntry, LinkEntry},
};
pub use config::DnsDiscoveryConfig;
use enr::Enr;
pub use error::ParseDnsEntryError;
use reth_ethereum_forks::{EnrForkIdEntry, ForkId};
use reth_network_peers::{pk2id, NodeRecord};
use schnellru::{ByLength, LruMap};
use secp256k1::SecretKey;
use std::{
collections::{hash_map::Entry, HashMap, HashSet, VecDeque},
net::IpAddr,
pin::Pin,
sync::Arc,
task::{ready, Context, Poll},
time::{Duration, Instant},
};
use sync::SyncTree;
use tokio::{
sync::{
mpsc,
mpsc::{error::TrySendError, UnboundedSender},
oneshot,
},
task::JoinHandle,
};
use tokio_stream::{
wrappers::{ReceiverStream, UnboundedReceiverStream},
Stream, StreamExt,
};
use tracing::{debug, trace};
mod config;
mod error;
mod query;
pub mod resolver;
mod sync;
pub mod tree;
#[derive(Clone, Debug)]
pub struct DnsDiscoveryHandle {
to_service: UnboundedSender<DnsDiscoveryCommand>,
}
impl DnsDiscoveryHandle {
pub fn sync_tree(&self, link: &str) -> Result<(), ParseDnsEntryError> {
self.sync_tree_with_link(link.parse()?);
Ok(())
}
pub fn sync_tree_with_link(&self, link: LinkEntry) {
let _ = self.to_service.send(DnsDiscoveryCommand::SyncTree(link));
}
pub async fn node_record_stream(
&self,
) -> Result<ReceiverStream<DnsNodeRecordUpdate>, oneshot::error::RecvError> {
let (tx, rx) = oneshot::channel();
let cmd = DnsDiscoveryCommand::NodeRecordUpdates(tx);
let _ = self.to_service.send(cmd);
rx.await
}
}
#[must_use = "Service does nothing unless polled"]
#[allow(missing_debug_implementations)]
pub struct DnsDiscoveryService<R: Resolver = DnsResolver> {
command_tx: UnboundedSender<DnsDiscoveryCommand>,
command_rx: UnboundedReceiverStream<DnsDiscoveryCommand>,
node_record_listeners: Vec<mpsc::Sender<DnsNodeRecordUpdate>>,
trees: HashMap<LinkEntry, SyncTree>,
queries: QueryPool<R, SecretKey>,
dns_record_cache: LruMap<String, DnsEntry<SecretKey>>,
queued_events: VecDeque<DnsDiscoveryEvent>,
recheck_interval: Duration,
bootstrap_dns_networks: HashSet<LinkEntry>,
}
impl<R: Resolver> DnsDiscoveryService<R> {
pub fn new(resolver: Arc<R>, config: DnsDiscoveryConfig) -> Self {
let DnsDiscoveryConfig {
lookup_timeout,
max_requests_per_sec,
recheck_interval,
dns_record_cache_limit,
bootstrap_dns_networks,
} = config;
let queries = QueryPool::new(resolver, max_requests_per_sec, lookup_timeout);
let (command_tx, command_rx) = mpsc::unbounded_channel();
Self {
command_tx,
command_rx: UnboundedReceiverStream::new(command_rx),
node_record_listeners: Default::default(),
trees: Default::default(),
queries,
dns_record_cache: LruMap::new(ByLength::new(dns_record_cache_limit.get())),
queued_events: Default::default(),
recheck_interval,
bootstrap_dns_networks: bootstrap_dns_networks.unwrap_or_default(),
}
}
pub fn spawn(mut self) -> JoinHandle<()> {
tokio::task::spawn(async move {
self.bootstrap();
while let Some(event) = self.next().await {
trace!(target: "disc::dns", ?event, "processed");
}
})
}
pub fn bootstrap(&mut self) {
for link in self.bootstrap_dns_networks.clone() {
self.sync_tree_with_link(link);
}
}
pub fn new_pair(resolver: Arc<R>, config: DnsDiscoveryConfig) -> (Self, DnsDiscoveryHandle) {
let service = Self::new(resolver, config);
let handle = service.handle();
(service, handle)
}
pub fn handle(&self) -> DnsDiscoveryHandle {
DnsDiscoveryHandle { to_service: self.command_tx.clone() }
}
pub fn node_record_stream(&mut self) -> ReceiverStream<DnsNodeRecordUpdate> {
let (tx, rx) = mpsc::channel(256);
self.node_record_listeners.push(tx);
ReceiverStream::new(rx)
}
fn notify(&mut self, record: DnsNodeRecordUpdate) {
self.node_record_listeners.retain_mut(|listener| match listener.try_send(record.clone()) {
Ok(()) => true,
Err(err) => match err {
TrySendError::Full(_) => true,
TrySendError::Closed(_) => false,
},
});
}
pub fn sync_tree(&mut self, link: &str) -> Result<(), ParseDnsEntryError> {
self.sync_tree_with_link(link.parse()?);
Ok(())
}
pub fn sync_tree_with_link(&mut self, link: LinkEntry) {
self.queries.resolve_root(link);
}
fn resolve_entry(&mut self, link: LinkEntry<SecretKey>, hash: String, kind: ResolveKind) {
if let Some(entry) = self.dns_record_cache.get(&hash).cloned() {
let cached = ResolveEntryResult { entry: Some(Ok(entry)), link, hash, kind };
self.on_resolved_entry(cached);
return
}
self.queries.resolve_entry(link, hash, kind)
}
fn on_resolved_root(&mut self, resp: ResolveRootResult<SecretKey>) {
match resp {
Ok((root, link)) => match self.trees.entry(link.clone()) {
Entry::Occupied(mut entry) => {
entry.get_mut().update_root(root);
}
Entry::Vacant(entry) => {
entry.insert(SyncTree::new(root, link));
}
},
Err((err, link)) => {
debug!(target: "disc::dns",%err, ?link, "Failed to lookup root")
}
}
}
fn on_resolved_enr(&mut self, enr: Enr<SecretKey>) {
if let Some(record) = convert_enr_node_record(&enr) {
self.notify(record);
}
self.queued_events.push_back(DnsDiscoveryEvent::Enr(enr))
}
fn on_resolved_entry(&mut self, resp: ResolveEntryResult<SecretKey>) {
let ResolveEntryResult { entry, link, hash, kind } = resp;
match entry {
Some(Err(err)) => {
debug!(target: "disc::dns",%err, domain=%link.domain, ?hash, "Failed to lookup entry")
}
None => {
trace!(target: "disc::dns",domain=%link.domain, ?hash, "No dns entry")
}
Some(Ok(entry)) => {
self.dns_record_cache.insert(hash.clone(), entry.clone());
match entry {
DnsEntry::Root(root) => {
debug!(target: "disc::dns",%root, domain=%link.domain, ?hash, "resolved unexpected root entry");
}
DnsEntry::Link(link_entry) => {
if kind.is_link() {
if let Some(tree) = self.trees.get_mut(&link) {
tree.resolved_links_mut().insert(hash, link_entry.clone());
}
self.sync_tree_with_link(link_entry)
} else {
debug!(target: "disc::dns",%link_entry, domain=%link.domain, ?hash, "resolved unexpected Link entry");
}
}
DnsEntry::Branch(branch_entry) => {
if let Some(tree) = self.trees.get_mut(&link) {
tree.extend_children(kind, branch_entry.children)
}
}
DnsEntry::Node(entry) => {
if kind.is_link() {
debug!(target: "disc::dns",domain=%link.domain, ?hash, "resolved unexpected enr entry");
} else {
self.on_resolved_enr(entry.enr)
}
}
}
}
}
}
pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<DnsDiscoveryEvent> {
loop {
if let Some(event) = self.queued_events.pop_front() {
return Poll::Ready(event)
}
while let Poll::Ready(Some(cmd)) = Pin::new(&mut self.command_rx).poll_next(cx) {
match cmd {
DnsDiscoveryCommand::SyncTree(link) => {
self.sync_tree_with_link(link);
}
DnsDiscoveryCommand::NodeRecordUpdates(tx) => {
let _ = tx.send(self.node_record_stream());
}
}
}
while let Poll::Ready(outcome) = self.queries.poll(cx) {
match outcome {
QueryOutcome::Root(resp) => self.on_resolved_root(resp),
QueryOutcome::Entry(resp) => self.on_resolved_entry(resp),
}
}
let mut progress = false;
let now = Instant::now();
let mut pending_resolves = Vec::new();
let mut pending_updates = Vec::new();
for tree in self.trees.values_mut() {
while let Some(action) = tree.poll(now, self.recheck_interval) {
progress = true;
match action {
SyncAction::UpdateRoot => {
pending_updates.push(tree.link().clone());
}
SyncAction::Enr(hash) => {
pending_resolves.push((tree.link().clone(), hash, ResolveKind::Enr));
}
SyncAction::Link(hash) => {
pending_resolves.push((tree.link().clone(), hash, ResolveKind::Link));
}
}
}
}
for (domain, hash, kind) in pending_resolves {
self.resolve_entry(domain, hash, kind)
}
for link in pending_updates {
self.sync_tree_with_link(link)
}
if !progress && self.queued_events.is_empty() {
return Poll::Pending
}
}
}
}
impl<R: Resolver> Stream for DnsDiscoveryService<R> {
type Item = DnsDiscoveryEvent;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Poll::Ready(Some(ready!(self.get_mut().poll(cx))))
}
}
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct DnsNodeRecordUpdate {
pub node_record: NodeRecord,
pub fork_id: Option<ForkId>,
pub enr: Enr<SecretKey>,
}
enum DnsDiscoveryCommand {
SyncTree(LinkEntry),
NodeRecordUpdates(oneshot::Sender<ReceiverStream<DnsNodeRecordUpdate>>),
}
#[derive(Debug, Clone)]
pub enum DnsDiscoveryEvent {
Enr(Enr<SecretKey>),
}
fn convert_enr_node_record(enr: &Enr<SecretKey>) -> Option<DnsNodeRecordUpdate> {
let node_record = NodeRecord {
address: enr.ip4().map(IpAddr::from).or_else(|| enr.ip6().map(IpAddr::from))?,
tcp_port: enr.tcp4().or_else(|| enr.tcp6())?,
udp_port: enr.udp4().or_else(|| enr.udp6())?,
id: pk2id(&enr.public_key()),
}
.into_ipv4_mapped();
let fork_id =
enr.get_decodable::<EnrForkIdEntry>(b"eth").transpose().ok().flatten().map(Into::into);
Some(DnsNodeRecordUpdate { node_record, fork_id, enr: enr.clone() })
}
#[cfg(test)]
mod tests {
use super::*;
use crate::tree::TreeRootEntry;
use alloy_chains::Chain;
use alloy_rlp::{Decodable, Encodable};
use enr::EnrKey;
use reth_chainspec::MAINNET;
use reth_ethereum_forks::{EthereumHardfork, ForkHash};
use secp256k1::rand::thread_rng;
use std::{future::poll_fn, net::Ipv4Addr};
#[test]
fn test_convert_enr_node_record() {
let secret_key = SecretKey::new(&mut secp256k1::rand::thread_rng());
let enr = Enr::builder()
.ip("127.0.0.1".parse().unwrap())
.udp4(9000)
.tcp4(30303)
.add_value(b"eth", &EnrForkIdEntry::from(MAINNET.latest_fork_id()))
.build(&secret_key)
.unwrap();
let node_record_update = convert_enr_node_record(&enr).unwrap();
assert_eq!(node_record_update.node_record.address, "127.0.0.1".parse::<IpAddr>().unwrap());
assert_eq!(node_record_update.node_record.tcp_port, 30303);
assert_eq!(node_record_update.node_record.udp_port, 9000);
assert_eq!(node_record_update.fork_id, Some(MAINNET.latest_fork_id()));
assert_eq!(node_record_update.enr, enr);
}
#[test]
fn test_decode_and_convert_enr_node_record() {
let secret_key = SecretKey::new(&mut secp256k1::rand::thread_rng());
let enr = Enr::builder()
.ip("127.0.0.1".parse().unwrap())
.udp4(9000)
.tcp4(30303)
.add_value(b"eth", &EnrForkIdEntry::from(MAINNET.latest_fork_id()))
.add_value(b"opstack", &ForkId { hash: ForkHash(rand::random()), next: rand::random() })
.build(&secret_key)
.unwrap();
let mut encoded_enr = vec![];
enr.encode(&mut encoded_enr);
let decoded_enr = Enr::decode(&mut &encoded_enr[..]).unwrap();
let node_record_update = convert_enr_node_record(&decoded_enr).unwrap();
assert_eq!(node_record_update.node_record.address, "127.0.0.1".parse::<IpAddr>().unwrap());
assert_eq!(node_record_update.node_record.tcp_port, 30303);
assert_eq!(node_record_update.node_record.udp_port, 9000);
assert_eq!(node_record_update.fork_id, Some(MAINNET.latest_fork_id()));
assert_eq!(node_record_update.enr, enr);
}
#[tokio::test]
async fn test_start_root_sync() {
reth_tracing::init_test_tracing();
let secret_key = SecretKey::new(&mut thread_rng());
let resolver = MapResolver::default();
let s = "enrtree-root:v1 e=QFT4PBCRX4XQCV3VUYJ6BTCEPU l=JGUFMSAGI7KZYB3P7IZW4S5Y3A seq=3 sig=3FmXuVwpa8Y7OstZTx9PIb1mt8FrW7VpDOFv4AaGCsZ2EIHmhraWhe4NxYhQDlw5MjeFXYMbJjsPeKlHzmJREQE";
let mut root: TreeRootEntry = s.parse().unwrap();
root.sign(&secret_key).unwrap();
let link =
LinkEntry { domain: "nodes.example.org".to_string(), pubkey: secret_key.public() };
resolver.insert(link.domain.clone(), root.to_string());
let mut service = DnsDiscoveryService::new(Arc::new(resolver), Default::default());
service.sync_tree_with_link(link.clone());
poll_fn(|cx| {
let _ = service.poll(cx);
Poll::Ready(())
})
.await;
let tree = service.trees.get(&link).unwrap();
assert_eq!(tree.root().clone(), root);
}
#[tokio::test(flavor = "multi_thread")]
async fn test_get_node() {
reth_tracing::init_test_tracing();
let secret_key = SecretKey::new(&mut thread_rng());
let resolver = MapResolver::default();
let s = "enrtree-root:v1 e=QFT4PBCRX4XQCV3VUYJ6BTCEPU l=JGUFMSAGI7KZYB3P7IZW4S5Y3A seq=3 sig=3FmXuVwpa8Y7OstZTx9PIb1mt8FrW7VpDOFv4AaGCsZ2EIHmhraWhe4NxYhQDlw5MjeFXYMbJjsPeKlHzmJREQE";
let mut root: TreeRootEntry = s.parse().unwrap();
root.sign(&secret_key).unwrap();
let link =
LinkEntry { domain: "nodes.example.org".to_string(), pubkey: secret_key.public() };
resolver.insert(link.domain.clone(), root.to_string());
let mut builder = Enr::builder();
let fork_id = MAINNET.hardfork_fork_id(EthereumHardfork::Frontier).unwrap();
builder
.ip4(Ipv4Addr::LOCALHOST)
.udp4(30303)
.tcp4(30303)
.add_value(b"eth", &EnrForkIdEntry::from(fork_id));
let enr = builder.build(&secret_key).unwrap();
resolver.insert(format!("{}.{}", root.enr_root.clone(), link.domain), enr.to_base64());
let mut service = DnsDiscoveryService::new(Arc::new(resolver), Default::default());
let mut node_records = service.node_record_stream();
let task = tokio::task::spawn(async move {
let record = node_records.next().await.unwrap();
assert_eq!(record.fork_id, Some(fork_id));
});
service.sync_tree_with_link(link.clone());
let event = poll_fn(|cx| service.poll(cx)).await;
match event {
DnsDiscoveryEvent::Enr(discovered) => {
assert_eq!(discovered, enr);
}
}
poll_fn(|cx| {
assert!(service.poll(cx).is_pending());
Poll::Ready(())
})
.await;
task.await.unwrap();
}
#[tokio::test]
async fn test_recheck_tree() {
reth_tracing::init_test_tracing();
let config = DnsDiscoveryConfig {
recheck_interval: Duration::from_millis(750),
..Default::default()
};
let secret_key = SecretKey::new(&mut thread_rng());
let resolver = Arc::new(MapResolver::default());
let s = "enrtree-root:v1 e=QFT4PBCRX4XQCV3VUYJ6BTCEPU l=JGUFMSAGI7KZYB3P7IZW4S5Y3A seq=3 sig=3FmXuVwpa8Y7OstZTx9PIb1mt8FrW7VpDOFv4AaGCsZ2EIHmhraWhe4NxYhQDlw5MjeFXYMbJjsPeKlHzmJREQE";
let mut root: TreeRootEntry = s.parse().unwrap();
root.sign(&secret_key).unwrap();
let link =
LinkEntry { domain: "nodes.example.org".to_string(), pubkey: secret_key.public() };
resolver.insert(link.domain.clone(), root.to_string());
let mut service = DnsDiscoveryService::new(Arc::clone(&resolver), config.clone());
service.sync_tree_with_link(link.clone());
poll_fn(|cx| {
assert!(service.poll(cx).is_pending());
Poll::Ready(())
})
.await;
tokio::time::sleep(config.recheck_interval).await;
let enr = Enr::empty(&secret_key).unwrap();
resolver.insert(format!("{}.{}", root.enr_root.clone(), link.domain), enr.to_base64());
let event = poll_fn(|cx| service.poll(cx)).await;
match event {
DnsDiscoveryEvent::Enr(discovered) => {
assert_eq!(discovered, enr);
}
}
poll_fn(|cx| {
assert!(service.poll(cx).is_pending());
Poll::Ready(())
})
.await;
}
#[tokio::test]
#[ignore]
async fn test_dns_resolver() {
reth_tracing::init_test_tracing();
let mut service = DnsDiscoveryService::new(
Arc::new(DnsResolver::from_system_conf().unwrap()),
Default::default(),
);
service.sync_tree(&Chain::mainnet().public_dns_network_protocol().unwrap()).unwrap();
while let Some(event) = service.next().await {
match event {
DnsDiscoveryEvent::Enr(enr) => {
println!("discovered enr {}", enr.to_base64());
}
}
}
}
}