reth_network/
discovery.rs

1//! Discovery support for the network.
2
3use crate::{
4    cache::LruMap,
5    error::{NetworkError, ServiceKind},
6};
7use enr::Enr;
8use futures::StreamExt;
9use reth_discv4::{DiscoveryUpdate, Discv4, Discv4Config};
10use reth_discv5::{DiscoveredPeer, Discv5};
11use reth_dns_discovery::{
12    DnsDiscoveryConfig, DnsDiscoveryHandle, DnsDiscoveryService, DnsNodeRecordUpdate, DnsResolver,
13};
14use reth_ethereum_forks::{EnrForkIdEntry, ForkId};
15use reth_network_api::{DiscoveredEvent, DiscoveryEvent};
16use reth_network_peers::{NodeRecord, PeerId};
17use reth_network_types::PeerAddr;
18use secp256k1::SecretKey;
19use std::{
20    collections::VecDeque,
21    net::{IpAddr, SocketAddr},
22    pin::Pin,
23    sync::Arc,
24    task::{ready, Context, Poll},
25};
26use tokio::{sync::mpsc, task::JoinHandle};
27use tokio_stream::{wrappers::ReceiverStream, Stream};
28use tracing::{debug, trace};
29
/// Default max capacity for the cache of discovered peers.
///
/// Default is 10 000 peers. [`Discovery::new`] uses this value to size the LRU map that tracks
/// discovered peers.
pub const DEFAULT_MAX_CAPACITY_DISCOVERED_PEERS_CACHE: u32 = 10_000;
34
/// An abstraction over the configured discovery protocol.
///
/// Listens for new discovered nodes and emits events for discovered nodes and their
/// address.
///
/// Up to three sources can feed this type: discv4, discv5 and DNS discovery. Each one is optional
/// and is only present if it was configured when the instance was created.
#[derive(Debug)]
pub struct Discovery {
    /// All nodes discovered via discovery protocol.
    ///
    /// These nodes can be ephemeral and are updated via the discovery protocol.
    /// Maps the peer's id to its tcp/udp address.
    discovered_nodes: LruMap<PeerId, PeerAddr>,
    /// Local ENR of the discovery v4 service (discv5 ENR has same [`PeerId`]).
    local_enr: NodeRecord,
    /// Handler to interact with the Discovery v4 service, `None` if discv4 is disabled.
    discv4: Option<Discv4>,
    /// All KAD table updates from the discv4 service.
    discv4_updates: Option<ReceiverStream<DiscoveryUpdate>>,
    /// The handle to the spawned discv4 service.
    ///
    /// Stored only; nothing in this file reads it.
    _discv4_service: Option<JoinHandle<()>>,
    /// Handler to interact with the Discovery v5 service, `None` if discv5 is disabled.
    discv5: Option<Discv5>,
    /// All KAD table updates from the discv5 service.
    discv5_updates: Option<ReceiverStream<discv5::Event>>,
    /// Handler to interact with the DNS discovery service.
    ///
    /// Stored only; nothing in this file reads it.
    _dns_discovery: Option<DnsDiscoveryHandle>,
    /// Updates from the DNS discovery service.
    dns_discovery_updates: Option<ReceiverStream<DnsNodeRecordUpdate>>,
    /// The handle to the spawned DNS discovery service.
    ///
    /// Stored only; nothing in this file reads it.
    _dns_disc_service: Option<JoinHandle<()>>,
    /// Events buffered until polled.
    queued_events: VecDeque<DiscoveryEvent>,
    /// List of listeners subscribed to discovery events.
    ///
    /// Every event handed out by `poll` is also cloned to each of these senders.
    discovery_listeners: Vec<mpsc::UnboundedSender<DiscoveryEvent>>,
}
68
69impl Discovery {
70    /// Spawns the discovery service.
71    ///
72    /// This will spawn the [`reth_discv4::Discv4Service`] onto a new task and establish a listener
73    /// channel to receive all discovered nodes.
74    pub async fn new(
75        tcp_addr: SocketAddr,
76        discovery_v4_addr: SocketAddr,
77        sk: SecretKey,
78        discv4_config: Option<Discv4Config>,
79        discv5_config: Option<reth_discv5::Config>, // contains discv5 listen address
80        dns_discovery_config: Option<DnsDiscoveryConfig>,
81    ) -> Result<Self, NetworkError> {
82        // setup discv4 with the discovery address and tcp port
83        let local_enr =
84            NodeRecord::from_secret_key(discovery_v4_addr, &sk).with_tcp_port(tcp_addr.port());
85
86        let discv4_future = async {
87            let Some(disc_config) = discv4_config else { return Ok((None, None, None)) };
88            let (discv4, mut discv4_service) =
89                Discv4::bind(discovery_v4_addr, local_enr, sk, disc_config).await.map_err(
90                    |err| {
91                        NetworkError::from_io_error(err, ServiceKind::Discovery(discovery_v4_addr))
92                    },
93                )?;
94            let discv4_updates = discv4_service.update_stream();
95            // spawn the service
96            let discv4_service = discv4_service.spawn();
97
98            debug!(target:"net", ?discovery_v4_addr, "started discovery v4");
99
100            Ok((Some(discv4), Some(discv4_updates), Some(discv4_service)))
101        };
102
103        let discv5_future = async {
104            let Some(config) = discv5_config else { return Ok::<_, NetworkError>((None, None)) };
105            let (discv5, discv5_updates) = Discv5::start(&sk, config).await?;
106            debug!(target:"net", discovery_v5_enr=? discv5.local_enr(), "started discovery v5");
107            Ok((Some(discv5), Some(discv5_updates.into())))
108        };
109
110        let ((discv4, discv4_updates, _discv4_service), (discv5, discv5_updates)) =
111            tokio::try_join!(discv4_future, discv5_future)?;
112
113        // setup DNS discovery
114        let (_dns_discovery, dns_discovery_updates, _dns_disc_service) =
115            if let Some(dns_config) = dns_discovery_config {
116                let (mut service, dns_disc) = DnsDiscoveryService::new_pair(
117                    Arc::new(DnsResolver::from_system_conf()?),
118                    dns_config,
119                );
120                let dns_discovery_updates = service.node_record_stream();
121                let dns_disc_service = service.spawn();
122                (Some(dns_disc), Some(dns_discovery_updates), Some(dns_disc_service))
123            } else {
124                (None, None, None)
125            };
126
127        Ok(Self {
128            discovery_listeners: Default::default(),
129            local_enr,
130            discv4,
131            discv4_updates,
132            _discv4_service,
133            discv5,
134            discv5_updates,
135            discovered_nodes: LruMap::new(DEFAULT_MAX_CAPACITY_DISCOVERED_PEERS_CACHE),
136            queued_events: Default::default(),
137            _dns_disc_service,
138            _dns_discovery,
139            dns_discovery_updates,
140        })
141    }
142
    /// Registers a listener for receiving [`DiscoveryEvent`] updates.
    ///
    /// The sender is appended to the list of listeners that are notified whenever an event is
    /// handed out.
    pub(crate) fn add_listener(&mut self, tx: mpsc::UnboundedSender<DiscoveryEvent>) {
        self.discovery_listeners.push(tx);
    }
147
148    /// Notifies all registered listeners with the provided `event`.
149    #[inline]
150    fn notify_listeners(&mut self, event: &DiscoveryEvent) {
151        self.discovery_listeners.retain_mut(|listener| listener.send(event.clone()).is_ok());
152    }
153
154    /// Updates the `eth:ForkId` field in discv4/discv5.
155    pub(crate) fn update_fork_id(&self, fork_id: ForkId) {
156        if let Some(discv4) = &self.discv4 {
157            // use forward-compatible forkid entry
158            discv4.set_eip868_rlp(b"eth".to_vec(), EnrForkIdEntry::from(fork_id))
159        }
160        if let Some(discv5) = &self.discv5 {
161            discv5
162                .encode_and_set_eip868_in_local_enr(b"eth".to_vec(), EnrForkIdEntry::from(fork_id))
163        }
164    }
165
166    /// Bans the [`IpAddr`] in the discovery service.
167    pub(crate) fn ban_ip(&self, ip: IpAddr) {
168        if let Some(discv4) = &self.discv4 {
169            discv4.ban_ip(ip)
170        }
171        if let Some(discv5) = &self.discv5 {
172            discv5.ban_ip(ip)
173        }
174    }
175
176    /// Bans the [`PeerId`] and [`IpAddr`] in the discovery service.
177    pub(crate) fn ban(&self, peer_id: PeerId, ip: IpAddr) {
178        if let Some(discv4) = &self.discv4 {
179            discv4.ban(peer_id, ip)
180        }
181        if let Some(discv5) = &self.discv5 {
182            discv5.ban(peer_id, ip)
183        }
184    }
185
    /// Returns a clone of the discv4 handle, or `None` if discv4 is not enabled.
    pub fn discv4(&self) -> Option<Discv4> {
        self.discv4.clone()
    }
190
    /// Returns the id with which the local node identifies itself in the network.
    ///
    /// This is the id stored in the local node record.
    pub(crate) const fn local_id(&self) -> PeerId {
        self.local_enr.id // local discv4 and discv5 have same id, since signed with same secret key
    }
195
196    /// Add a node to the discv4 table.
197    pub(crate) fn add_discv4_node(&self, node: NodeRecord) {
198        if let Some(discv4) = &self.discv4 {
199            discv4.add_node(node);
200        }
201    }
202
    /// Returns a clone of the discv5 handle, or `None` if discv5 is not enabled.
    pub fn discv5(&self) -> Option<Discv5> {
        self.discv5.clone()
    }
207
208    /// Add a node to the discv4 table.
209    pub(crate) fn add_discv5_node(&self, enr: Enr<SecretKey>) -> Result<(), NetworkError> {
210        if let Some(discv5) = &self.discv5 {
211            discv5.add_node(enr).map_err(NetworkError::Discv5Error)?;
212        }
213
214        Ok(())
215    }
216
217    /// Processes an incoming [`NodeRecord`] update from a discovery service
218    fn on_node_record_update(&mut self, record: NodeRecord, fork_id: Option<ForkId>) {
219        let peer_id = record.id;
220        let tcp_addr = record.tcp_addr();
221        if tcp_addr.port() == 0 {
222            // useless peer for p2p
223            return
224        }
225        let udp_addr = record.udp_addr();
226        let addr = PeerAddr::new(tcp_addr, Some(udp_addr));
227        _ =
228            self.discovered_nodes.get_or_insert(peer_id, || {
229                self.queued_events.push_back(DiscoveryEvent::NewNode(
230                    DiscoveredEvent::EventQueued { peer_id, addr, fork_id },
231                ));
232
233                addr
234            })
235    }
236
237    fn on_discv4_update(&mut self, update: DiscoveryUpdate) {
238        match update {
239            DiscoveryUpdate::Added(record) | DiscoveryUpdate::DiscoveredAtCapacity(record) => {
240                self.on_node_record_update(record, None);
241            }
242            DiscoveryUpdate::EnrForkId(node, fork_id) => {
243                self.queued_events.push_back(DiscoveryEvent::EnrForkId(node.id, fork_id))
244            }
245            DiscoveryUpdate::Removed(peer_id) => {
246                self.discovered_nodes.remove(&peer_id);
247            }
248            DiscoveryUpdate::Batch(updates) => {
249                for update in updates {
250                    self.on_discv4_update(update);
251                }
252            }
253        }
254    }
255
    /// Polls the discovery sources and returns the next [`DiscoveryEvent`].
    ///
    /// A buffered event is returned first, after it has been forwarded to all registered
    /// listeners. If nothing is buffered, the discv4, discv5 and DNS update streams are drained in
    /// that order, which may queue new events. If the queue is still empty afterwards,
    /// [`Poll::Pending`] is returned.
    pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<DiscoveryEvent> {
        loop {
            // Drain all buffered events first
            if let Some(event) = self.queued_events.pop_front() {
                self.notify_listeners(&event);
                return Poll::Ready(event)
            }

            // drain the discv4 update stream
            //
            // the stream is re-borrowed on every iteration so that `self` is free for the handler
            while let Some(Poll::Ready(Some(update))) =
                self.discv4_updates.as_mut().map(|updates| updates.poll_next_unpin(cx))
            {
                self.on_discv4_update(update)
            }

            // drain the discv5 update stream
            while let Some(Poll::Ready(Some(update))) =
                self.discv5_updates.as_mut().map(|updates| updates.poll_next_unpin(cx))
            {
                // only updates that the discv5 handle turns into a discovered peer are processed
                if let Some(discv5) = self.discv5.as_mut() &&
                    let Some(DiscoveredPeer { node_record, fork_id }) =
                        discv5.on_discv5_update(update)
                {
                    self.on_node_record_update(node_record, fork_id);
                }
            }

            // drain the dns update stream
            while let Some(Poll::Ready(Some(update))) =
                self.dns_discovery_updates.as_mut().map(|updates| updates.poll_next_unpin(cx))
            {
                // nodes found via dns are handed to both discv4 and discv5
                self.add_discv4_node(update.node_record);
                if let Err(err) = self.add_discv5_node(update.enr) {
                    trace!(target: "net::discovery",
                        %err,
                        "failed adding node discovered by dns to discv5"
                    );
                }
                self.on_node_record_update(update.node_record, update.fork_id);
            }

            // nothing was queued by any of the streams, otherwise loop to hand out the next event
            if self.queued_events.is_empty() {
                return Poll::Pending
            }
        }
    }
302}
303
304impl Stream for Discovery {
305    type Item = DiscoveryEvent;
306
307    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
308        Poll::Ready(Some(ready!(self.get_mut().poll(cx))))
309    }
310}
311
#[cfg(test)]
impl Discovery {
    /// Returns a Discovery instance that does nothing and is intended for testing purposes.
    ///
    /// NOTE: This instance does nothing: no discovery service is configured, the cache of
    /// discovered nodes has capacity `0` and the local record uses the unspecified IPv4 address
    /// with zeroed ports and a random [`PeerId`].
    pub(crate) fn noop() -> Self {
        Self {
            discovered_nodes: LruMap::new(0),
            local_enr: NodeRecord {
                address: IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
                tcp_port: 0,
                udp_port: 0,
                id: PeerId::random(),
            },
            discv4: None,
            discv4_updates: None,
            _discv4_service: None,
            discv5: None,
            discv5_updates: None,
            _dns_discovery: None,
            dns_discovery_updates: None,
            _dns_disc_service: None,
            queued_events: Default::default(),
            discovery_listeners: Default::default(),
        }
    }
}
342
#[cfg(test)]
mod tests {
    use super::*;
    use reth_discv4::Discv4ConfigBuilder;
    use reth_discv5::{enr::EnrCombinedKeyWrapper, enr_to_discv4_id};
    use secp256k1::SECP256K1;
    use std::net::{Ipv4Addr, SocketAddrV4};
    use tracing::trace;

    /// Builds the event expected when `record` is discovered without a known fork id.
    fn new_node_event(record: &NodeRecord) -> DiscoveryEvent {
        DiscoveryEvent::NewNode(DiscoveredEvent::EventQueued {
            peer_id: record.id,
            addr: PeerAddr::new(record.tcp_addr(), Some(record.udp_addr())),
            fork_id: None,
        })
    }

    /// Starts a node with discv4 and discv5 listening on the given local udp ports.
    async fn start_discovery_node(udp_port_discv4: u16, udp_port_discv5: u16) -> Discovery {
        let secret_key = SecretKey::new(&mut rand_08::thread_rng());

        let discv4_addr = format!("127.0.0.1:{udp_port_discv4}").parse().unwrap();
        let discv5_addr: SocketAddr = format!("127.0.0.1:{udp_port_discv5}").parse().unwrap();

        // disable `NatResolver`
        let discv4_config = Discv4ConfigBuilder::default().external_ip_resolver(None).build();

        let inner_discv5_config =
            discv5::ConfigBuilder::new(discv5::ListenConfig::from(discv5_addr)).build();
        let discv5_config =
            reth_discv5::Config::builder(discv5_addr).discv5_config(inner_discv5_config).build();

        Discovery::new(
            discv4_addr,
            discv4_addr,
            secret_key,
            Some(discv4_config),
            Some(discv5_config),
            None,
        )
        .await
        .expect("should build discv5 with discv4 downgrade")
    }

    /// Construction succeeds when no discovery service is configured.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_discovery_setup() {
        let (secret_key, _) = SECP256K1.generate_keypair(&mut rand_08::thread_rng());
        let unspecified = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0));

        let _discovery = Discovery::new(unspecified, unspecified, secret_key, None, None, None)
            .await
            .unwrap();
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn discv5_and_discv4_same_pk() {
        reth_tracing::init_test_tracing();

        // set up test
        let mut node_1 = start_discovery_node(40014, 40015).await;
        let discv4_enr_1 = node_1.discv4.as_ref().unwrap().node_record();
        let discv5_enr_node_1 =
            node_1.discv5.as_ref().unwrap().with_discv5(|discv5| discv5.local_enr());
        let discv5_id_1 = discv5_enr_node_1.node_id();

        let mut node_2 = start_discovery_node(40024, 40025).await;
        let discv4_enr_2 = node_2.discv4.as_ref().unwrap().node_record();
        let discv5_enr_node_2 =
            node_2.discv5.as_ref().unwrap().with_discv5(|discv5| discv5.local_enr());
        let discv5_id_2 = discv5_enr_node_2.node_id();

        trace!(target: "net::discovery::tests",
            node_1_node_id=format!("{:#}", discv5_id_1),
            node_2_node_id=format!("{:#}", discv5_id_2),
            "started nodes"
        );

        // test

        // both protocol versions of a node must report the same id
        assert_eq!(discv4_enr_1.id, enr_to_discv4_id(&discv5_enr_node_1).unwrap());
        assert_eq!(discv4_enr_2.id, enr_to_discv4_id(&discv5_enr_node_2).unwrap());

        // introduce node_2:discv4 to node_1:discv4 by hand
        node_1.add_discv4_node(discv4_enr_2);

        // each node should now report the other one as newly discovered
        let first_event = node_1.next().await.unwrap();
        let second_event = node_2.next().await.unwrap();

        assert_eq!(new_node_event(&discv4_enr_2), first_event);
        assert_eq!(new_node_event(&discv4_enr_1), second_event);

        assert_eq!(1, node_1.discovered_nodes.len());
        assert_eq!(1, node_2.discovered_nodes.len());

        // add node_2:discv5 to node_1:discv5, manual insertion won't emit an event
        let enr_2 = EnrCombinedKeyWrapper(discv5_enr_node_2.clone()).into();
        node_1.add_discv5_node(enr_2).unwrap();

        // node_2 must now be part of the kbuckets of node_1:discv5
        let node_2_in_table = node_1
            .discv5
            .as_ref()
            .unwrap()
            .with_discv5(|discv5| discv5.table_entries_id().contains(&discv5_id_2));
        assert!(node_2_in_table);

        // manually trigger connection from node_1:discv5 to node_2:discv5
        node_1
            .discv5
            .as_ref()
            .unwrap()
            .with_discv5(|discv5| discv5.send_ping(discv5_enr_node_2.clone()))
            .await
            .unwrap();

        // no new event is expected: the nodes already found each other via discv4, so each
        // node still tracks exactly one discovered peer
        assert_eq!(1, node_1.discovered_nodes.len());
        assert_eq!(1, node_2.discovered_nodes.len());
    }
}
476}