Skip to main content

reth_network/
trusted_peers_resolver.rs

1//! Periodically resolves DNS records for a set of trusted peers and emits updates as they complete
2
3use futures::{future::BoxFuture, ready, stream::FuturesUnordered, FutureExt, StreamExt};
4use reth_network_peers::{NodeRecord, PeerId, TrustedPeer};
5use std::{
6    io,
7    task::{Context, Poll},
8};
9use tokio::time::Interval;
10use tracing::warn;
11
12/// `TrustedPeersResolver` periodically spawns DNS resolution tasks for trusted peers.
13/// It returns a resolved (`PeerId`, `NodeRecord`) update when one of its in‑flight tasks completes.
14#[derive(Debug)]
15pub struct TrustedPeersResolver {
16    /// The list of trusted peers to resolve.
17    pub trusted_peers: Vec<TrustedPeer>,
18    /// The timer that triggers a new resolution cycle.
19    pub interval: Interval,
20    /// Futures for currently in‑flight resolution tasks.
21    pub pending: FuturesUnordered<BoxFuture<'static, (PeerId, Result<NodeRecord, io::Error>)>>,
22}
23
24impl TrustedPeersResolver {
25    /// Create a new resolver with the given trusted peers and resolution interval.
26    pub fn new(trusted_peers: Vec<TrustedPeer>, resolve_interval: Interval) -> Self {
27        Self { trusted_peers, interval: resolve_interval, pending: FuturesUnordered::new() }
28    }
29
30    /// Update the resolution interval (useful for testing purposes)
31    #[allow(dead_code)]
32    pub fn set_interval(&mut self, interval: Interval) {
33        self.interval = interval;
34    }
35
36    /// Remove a trusted peer from the resolver.
37    pub fn remove(&mut self, peer_id: PeerId) {
38        self.trusted_peers.retain(|trusted| trusted.id != peer_id);
39    }
40
41    /// Poll the resolver.
42    /// When the interval ticks, new resolution futures for each trusted peer are spawned.
43    /// If a future completes successfully, it returns the resolved (`PeerId`, `NodeRecord`).
44    pub fn poll(&mut self, cx: &mut Context<'_>) -> Poll<(PeerId, NodeRecord)> {
45        if self.trusted_peers.is_empty() {
46            return Poll::Pending;
47        }
48
49        if self.interval.poll_tick(cx).is_ready() {
50            self.pending.clear();
51
52            for trusted in self.trusted_peers.iter().cloned() {
53                let peer_id = trusted.id;
54                let task = async move {
55                    let result = trusted.resolve().await;
56                    (peer_id, result)
57                }
58                .boxed();
59                self.pending.push(task);
60            }
61        }
62
63        match ready!(self.pending.poll_next_unpin(cx)) {
64            Some((peer_id, Ok(record))) => Poll::Ready((peer_id, record)),
65            Some((peer_id, Err(e))) => {
66                warn!(target: "net::peers", "Failed to resolve trusted peer {:?}: {:?}", peer_id, e);
67                Poll::Pending
68            }
69            None => Poll::Pending,
70        }
71    }
72}