Skip to main content

reth_network/
discovery.rs

1//! Discovery support for the network.
2
3use crate::{
4    cache::LruMap,
5    error::{NetworkError, ServiceKind},
6};
7use enr::Enr;
8use futures::StreamExt;
9use reth_discv4::{DiscoveryUpdate, Discv4, Discv4Config};
10use reth_discv5::{DiscoveredPeer, Discv5};
11use reth_dns_discovery::{
12    DnsDiscoveryConfig, DnsDiscoveryHandle, DnsDiscoveryService, DnsNodeRecordUpdate, DnsResolver,
13};
14use reth_ethereum_forks::{EnrForkIdEntry, ForkId};
15use reth_network_api::{DiscoveredEvent, DiscoveryEvent};
16use reth_network_peers::{NodeRecord, PeerId};
17use reth_network_types::PeerAddr;
18use secp256k1::SecretKey;
19use std::{
20    collections::VecDeque,
21    net::{IpAddr, SocketAddr},
22    pin::Pin,
23    sync::Arc,
24    task::{ready, Context, Poll},
25};
26use tokio::{net::UdpSocket, sync::mpsc, task::JoinHandle};
27use tokio_stream::{wrappers::ReceiverStream, Stream};
28use tracing::{debug, trace};
29
30/// Default max capacity for cache of discovered peers.
31///
32/// Default is 10 000 peers.
33pub const DEFAULT_MAX_CAPACITY_DISCOVERED_PEERS_CACHE: u32 = 10_000;
34
35/// An abstraction over the configured discovery protocol.
36///
37/// Listens for new discovered nodes and emits events for discovered nodes and their
38/// address.
39#[derive(Debug)]
40pub struct Discovery {
41    /// All nodes discovered via discovery protocol.
42    ///
43    /// These nodes can be ephemeral and are updated via the discovery protocol.
44    discovered_nodes: LruMap<PeerId, PeerAddr>,
45    /// Local ENR of the discovery v4 service (discv5 ENR has same [`PeerId`]).
46    local_enr: NodeRecord,
47    /// Handler to interact with the Discovery v4 service
48    discv4: Option<Discv4>,
49    /// All KAD table updates from the discv4 service.
50    discv4_updates: Option<ReceiverStream<DiscoveryUpdate>>,
51    /// The handle to the spawned discv4 service
52    _discv4_service: Option<JoinHandle<()>>,
53    /// Handler to interact with the Discovery v5 service
54    discv5: Option<Discv5>,
55    /// All KAD table updates from the discv5 service.
56    discv5_updates: Option<ReceiverStream<discv5::Event>>,
57    /// Background task that, in shared-port mode, drains `UnrecognizedFrame`s from discv5 and
58    /// feeds them into the discv4 ingress so packets advance without polling `Discovery`.
59    _discv5_forwarder: Option<JoinHandle<()>>,
60    /// Handler to interact with the DNS discovery service
61    _dns_discovery: Option<DnsDiscoveryHandle>,
62    /// Updates from the DNS discovery service.
63    dns_discovery_updates: Option<ReceiverStream<DnsNodeRecordUpdate>>,
64    /// The handle to the spawned DNS discovery service
65    _dns_disc_service: Option<JoinHandle<()>>,
66    /// Events buffered until polled.
67    queued_events: VecDeque<DiscoveryEvent>,
68    /// List of listeners subscribed to discovery events.
69    discovery_listeners: Vec<mpsc::UnboundedSender<DiscoveryEvent>>,
70}
71
72impl Discovery {
73    /// Spawns the discovery service.
74    ///
75    /// This will spawn the [`reth_discv4::Discv4Service`] onto a new task and establish a listener
76    /// channel to receive all discovered nodes.
77    pub async fn new(
78        tcp_addr: SocketAddr,
79        discovery_v4_addr: SocketAddr,
80        sk: SecretKey,
81        discv4_config: Option<Discv4Config>,
82        mut discv5_config: Option<reth_discv5::Config>, // contains discv5 listen address
83        dns_discovery_config: Option<DnsDiscoveryConfig>,
84    ) -> Result<Self, NetworkError> {
85        // setup discv4 with the discovery address and tcp port
86        let local_enr =
87            NodeRecord::from_secret_key(discovery_v4_addr, &sk).with_tcp_port(tcp_addr.port());
88
89        // For IPv6 we set IPV6_V6ONLY=true so an IPv4 sibling socket on the same port doesn't
90        // clash with the IPv6 one (Linux's default of V6ONLY=0 has IPv6 also claim the IPv4
91        // port via mapped addresses), matching how discv5 binds its `DualStack` sockets.
92        let bind_socket = async |addr: SocketAddr| {
93            let result = match addr {
94                SocketAddr::V4(_) => UdpSocket::bind(addr).await,
95                SocketAddr::V6(_) => {
96                    use socket2::{Domain, Protocol, Socket, Type};
97                    (|| {
98                        let socket = Socket::new(Domain::IPV6, Type::DGRAM, Some(Protocol::UDP))?;
99                        socket.set_only_v6(true)?;
100                        socket.set_nonblocking(true)?;
101                        socket.bind(&addr.into())?;
102                        UdpSocket::from_std(socket.into())
103                    })()
104                }
105            };
106            result
107                .map(Arc::new)
108                .map_err(|err| NetworkError::from_io_error(err, ServiceKind::Discovery(addr)))
109        };
110
111        // In shared-port mode, bind the shared socket and start discv4 without its own receive
112        // loop. Unrecognized frames from discv5 will be forwarded to the ingress handler.
113        let (discv4, discv4_updates, _discv4_service, discv4_ingress, shared_socket) =
114            if let Some(config) = discv4_config {
115                if let Some(discv5_config) = &mut discv5_config &&
116                    discv5_config.has_matching_socket(discovery_v4_addr)
117                {
118                    let socket = bind_socket(discovery_v4_addr).await?;
119
120                    let (discv4, mut discv4_service, ingress) = Discv4::bind_shared(
121                        socket.clone(),
122                        local_enr,
123                        sk,
124                        config,
125                    )
126                    .map_err(|err| {
127                        NetworkError::from_io_error(err, ServiceKind::Discovery(discovery_v4_addr))
128                    })?;
129
130                    let discv4_updates = discv4_service.update_stream();
131                    let discv4_service = discv4_service.spawn();
132                    debug!(target:"net", ?discovery_v4_addr, "started discovery v4 (shared port)");
133                    (
134                        Some(discv4),
135                        Some(discv4_updates),
136                        Some(discv4_service),
137                        Some(ingress),
138                        Some(socket),
139                    )
140                } else {
141                    let (discv4, mut discv4_service) =
142                        Discv4::bind(discovery_v4_addr, local_enr, sk, config).await.map_err(
143                            |err| {
144                                NetworkError::from_io_error(
145                                    err,
146                                    ServiceKind::Discovery(discovery_v4_addr),
147                                )
148                            },
149                        )?;
150                    let discv4_updates = discv4_service.update_stream();
151                    // spawn the service
152                    let discv4_service = discv4_service.spawn();
153
154                    debug!(target:"net", ?discovery_v4_addr, "started discovery v4");
155
156                    (Some(discv4), Some(discv4_updates), Some(discv4_service), None, None)
157                }
158            } else {
159                (None, None, None, None, None)
160            };
161
162        // Start discv5, wiring in the shared socket if in shared-port mode.
163        let (discv5, discv5_updates) = if let Some(mut config) = discv5_config {
164            if let Some(socket) = shared_socket {
165                let discv5_cfg = config.discv5_config_mut();
166
167                // The shared socket covers discv4's address family; bind the opposite family
168                // only if discv5 was configured for dual-stack.
169                let (mut ipv4, mut ipv6) = (None, None);
170                if discovery_v4_addr.is_ipv4() {
171                    ipv4 = Some(socket);
172                    if let Some(addr) = reth_discv5::config::ipv6(&discv5_cfg.listen_config) {
173                        ipv6 = Some(bind_socket(SocketAddr::V6(addr)).await?);
174                    }
175                } else {
176                    ipv6 = Some(socket);
177                    if let Some(addr) = reth_discv5::config::ipv4(&discv5_cfg.listen_config) {
178                        ipv4 = Some(bind_socket(SocketAddr::V4(addr)).await?);
179                    }
180                }
181
182                discv5_cfg.listen_config = discv5::ListenConfig::FromSockets { ipv4, ipv6 };
183            }
184
185            let (discv5, discv5_updates) = Discv5::start(&sk, config).await?;
186            debug!(target:"net", discovery_v5_enr=?discv5.local_enr(), "started discovery v5");
187            (Some(discv5), Some(discv5_updates))
188        } else {
189            (None, None)
190        };
191
192        // In shared-port mode, spawn a task that peels `UnrecognizedFrame` events off the discv5
193        // update stream and feeds them into discv4's ingress. Other events are forwarded through
194        // a new channel that `Discovery::poll` reads. This keeps both protocols moving without
195        // requiring the main `Discovery::poll` loop to be driven for packets to be routed.
196        let (discv5_updates, _discv5_forwarder) = match (discv4_ingress, discv5_updates) {
197            (Some(mut ingress), Some(mut updates)) => {
198                let (tx, rx) = mpsc::channel(updates.max_capacity());
199                let handle = tokio::spawn(async move {
200                    while let Some(event) = updates.recv().await {
201                        if let discv5::Event::UnrecognizedFrame(frame) = &event {
202                            ingress.handle_packet(&frame.packet, frame.src_address).await;
203                            continue;
204                        }
205                        if tx.send(event).await.is_err() {
206                            break;
207                        }
208                    }
209                });
210                (Some(ReceiverStream::new(rx)), Some(handle))
211            }
212            (_, updates) => (updates.map(ReceiverStream::new), None),
213        };
214
215        // setup DNS discovery
216        let (_dns_discovery, dns_discovery_updates, _dns_disc_service) =
217            if let Some(dns_config) = dns_discovery_config {
218                let (mut service, dns_disc) = DnsDiscoveryService::new_pair(
219                    Arc::new(DnsResolver::from_system_conf()?),
220                    dns_config,
221                );
222                let dns_discovery_updates = service.node_record_stream();
223                let dns_disc_service = service.spawn();
224                (Some(dns_disc), Some(dns_discovery_updates), Some(dns_disc_service))
225            } else {
226                (None, None, None)
227            };
228
229        Ok(Self {
230            discovery_listeners: Default::default(),
231            local_enr,
232            discv4,
233            discv4_updates,
234            _discv4_service,
235            discv5,
236            discv5_updates,
237            _discv5_forwarder,
238            discovered_nodes: LruMap::new(DEFAULT_MAX_CAPACITY_DISCOVERED_PEERS_CACHE),
239            queued_events: Default::default(),
240            _dns_disc_service,
241            _dns_discovery,
242            dns_discovery_updates,
243        })
244    }
245
246    /// Registers a listener for receiving [`DiscoveryEvent`] updates.
247    pub(crate) fn add_listener(&mut self, tx: mpsc::UnboundedSender<DiscoveryEvent>) {
248        self.discovery_listeners.push(tx);
249    }
250
251    /// Notifies all registered listeners with the provided `event`.
252    #[inline]
253    fn notify_listeners(&mut self, event: &DiscoveryEvent) {
254        self.discovery_listeners.retain_mut(|listener| listener.send(event.clone()).is_ok());
255    }
256
257    /// Updates the `eth:ForkId` field in discv4/discv5.
258    pub(crate) fn update_fork_id(&self, fork_id: ForkId) {
259        if let Some(discv4) = &self.discv4 {
260            // use forward-compatible forkid entry
261            discv4.set_eip868_rlp(b"eth".to_vec(), EnrForkIdEntry::from(fork_id))
262        }
263        if let Some(discv5) = &self.discv5 {
264            discv5
265                .encode_and_set_eip868_in_local_enr(b"eth".to_vec(), EnrForkIdEntry::from(fork_id))
266        }
267    }
268
269    /// Bans the [`IpAddr`] in the discovery service.
270    pub(crate) fn ban_ip(&self, ip: IpAddr) {
271        if let Some(discv4) = &self.discv4 {
272            discv4.ban_ip(ip)
273        }
274        if let Some(discv5) = &self.discv5 {
275            discv5.ban_ip(ip)
276        }
277    }
278
279    /// Bans the [`PeerId`] and [`IpAddr`] in the discovery service.
280    pub(crate) fn ban(&self, peer_id: PeerId, ip: IpAddr) {
281        if let Some(discv4) = &self.discv4 {
282            discv4.ban(peer_id, ip)
283        }
284        if let Some(discv5) = &self.discv5 {
285            discv5.ban(peer_id, ip)
286        }
287    }
288
289    /// Returns a shared reference to the discv4.
290    pub fn discv4(&self) -> Option<Discv4> {
291        self.discv4.clone()
292    }
293
294    /// Returns the id with which the local node identifies itself in the network
295    pub(crate) const fn local_id(&self) -> PeerId {
296        self.local_enr.id // local discv4 and discv5 have same id, since signed with same secret key
297    }
298
299    /// Add a node to the discv4 table.
300    pub(crate) fn add_discv4_node(&self, node: NodeRecord) {
301        if let Some(discv4) = &self.discv4 {
302            discv4.add_node(node);
303        }
304    }
305
306    /// Returns discv5 handle.
307    pub fn discv5(&self) -> Option<Discv5> {
308        self.discv5.clone()
309    }
310
311    /// Add a node to the discv4 table.
312    pub(crate) fn add_discv5_node(&self, enr: Enr<SecretKey>) -> Result<(), NetworkError> {
313        if let Some(discv5) = &self.discv5 {
314            discv5.add_node(enr).map_err(NetworkError::Discv5Error)?;
315        }
316
317        Ok(())
318    }
319
320    /// Processes an incoming [`NodeRecord`] update from a discovery service
321    fn on_node_record_update(&mut self, record: NodeRecord, fork_id: Option<ForkId>) {
322        let peer_id = record.id;
323        let tcp_addr = record.tcp_addr();
324        if tcp_addr.port() == 0 {
325            // useless peer for p2p
326            return
327        }
328        let udp_addr = record.udp_addr();
329        let addr = PeerAddr::new(tcp_addr, Some(udp_addr));
330        _ =
331            self.discovered_nodes.get_or_insert(peer_id, || {
332                self.queued_events.push_back(DiscoveryEvent::NewNode(
333                    DiscoveredEvent::EventQueued { peer_id, addr, fork_id },
334                ));
335
336                addr
337            })
338    }
339
340    fn on_discv4_update(&mut self, update: DiscoveryUpdate) {
341        match update {
342            DiscoveryUpdate::Added(record) | DiscoveryUpdate::DiscoveredAtCapacity(record) => {
343                self.on_node_record_update(record, None);
344            }
345            DiscoveryUpdate::EnrForkId(node, fork_id) => {
346                self.queued_events.push_back(DiscoveryEvent::EnrForkId(node, fork_id))
347            }
348            DiscoveryUpdate::Removed(peer_id) => {
349                self.discovered_nodes.remove(&peer_id);
350            }
351            DiscoveryUpdate::Batch(updates) => {
352                for update in updates {
353                    self.on_discv4_update(update);
354                }
355            }
356        }
357    }
358
359    pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<DiscoveryEvent> {
360        loop {
361            // Drain all buffered events first
362            if let Some(event) = self.queued_events.pop_front() {
363                self.notify_listeners(&event);
364                return Poll::Ready(event)
365            }
366
367            // drain the discv4 update stream
368            while let Some(Poll::Ready(Some(update))) =
369                self.discv4_updates.as_mut().map(|updates| updates.poll_next_unpin(cx))
370            {
371                self.on_discv4_update(update)
372            }
373
374            // drain the discv5 update stream
375            while let Some(Poll::Ready(Some(update))) =
376                self.discv5_updates.as_mut().map(|updates| updates.poll_next_unpin(cx))
377            {
378                if let Some(discv5) = self.discv5.as_mut() &&
379                    let Some(DiscoveredPeer { node_record, fork_id }) =
380                        discv5.on_discv5_update(update)
381                {
382                    self.on_node_record_update(node_record, fork_id);
383                }
384            }
385
386            // drain the dns update stream
387            while let Some(Poll::Ready(Some(update))) =
388                self.dns_discovery_updates.as_mut().map(|updates| updates.poll_next_unpin(cx))
389            {
390                self.add_discv4_node(update.node_record);
391                if let Err(err) = self.add_discv5_node(update.enr) {
392                    trace!(target: "net::discovery",
393                        %err,
394                        "failed adding node discovered by dns to discv5"
395                    );
396                }
397                self.on_node_record_update(update.node_record, update.fork_id);
398            }
399
400            if self.queued_events.is_empty() {
401                return Poll::Pending
402            }
403        }
404    }
405}
406
407impl Drop for Discovery {
408    fn drop(&mut self) {
409        if let Some(discv4) = &self.discv4 {
410            discv4.terminate();
411        }
412        if let Some(handle) = self._discv4_service.take() {
413            handle.abort();
414        }
415        if let Some(handle) = self._discv5_forwarder.take() {
416            handle.abort();
417        }
418        if let Some(handle) = self._dns_disc_service.take() {
419            handle.abort();
420        }
421    }
422}
423
424impl Stream for Discovery {
425    type Item = DiscoveryEvent;
426
427    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
428        Poll::Ready(Some(ready!(self.get_mut().poll(cx))))
429    }
430}
431
432#[cfg(test)]
433impl Discovery {
434    /// Returns a Discovery instance that does nothing and is intended for testing purposes.
435    ///
436    /// NOTE: This instance does nothing
437    pub(crate) fn noop() -> Self {
438        let (_discovery_listeners, _): (mpsc::UnboundedSender<DiscoveryEvent>, _) =
439            mpsc::unbounded_channel();
440
441        Self {
442            discovered_nodes: LruMap::new(0),
443            local_enr: NodeRecord {
444                address: IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED),
445                tcp_port: 0,
446                udp_port: 0,
447                id: PeerId::random(),
448            },
449            discv4: Default::default(),
450            discv4_updates: Default::default(),
451            _discv4_service: Default::default(),
452            _discv5_forwarder: None,
453            discv5: None,
454            discv5_updates: None,
455            queued_events: Default::default(),
456            _dns_discovery: None,
457            dns_discovery_updates: None,
458            _dns_disc_service: None,
459            discovery_listeners: Default::default(),
460        }
461    }
462}
463
464#[cfg(test)]
465mod tests {
466    use super::*;
467    use secp256k1::SECP256K1;
468    use std::net::{Ipv4Addr, SocketAddrV4};
469
470    #[tokio::test(flavor = "multi_thread")]
471    async fn test_discovery_setup() {
472        let (secret_key, _) = SECP256K1.generate_keypair(&mut rand_08::thread_rng());
473        let discovery_addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0));
474        let _discovery = Discovery::new(
475            discovery_addr,
476            discovery_addr,
477            secret_key,
478            Default::default(),
479            None,
480            Default::default(),
481        )
482        .await
483        .unwrap();
484    }
485
486    use reth_discv4::Discv4ConfigBuilder;
487    use reth_discv5::{enr::EnrCombinedKeyWrapper, enr_to_discv4_id};
488    use tracing::trace;
489
490    async fn start_discovery_node(udp_port_discv4: u16, udp_port_discv5: u16) -> Discovery {
491        let secret_key = SecretKey::new(&mut rand_08::thread_rng());
492
493        let discv4_addr = format!("127.0.0.1:{udp_port_discv4}").parse().unwrap();
494        let discv5_addr: SocketAddr = format!("127.0.0.1:{udp_port_discv5}").parse().unwrap();
495
496        // disable `NatResolver`
497        let discv4_config = Discv4ConfigBuilder::default().external_ip_resolver(None).build();
498
499        let discv5_listen_config = discv5::ListenConfig::from(discv5_addr);
500        let discv5_config = reth_discv5::Config::builder(discv5_addr)
501            .discv5_config(discv5::ConfigBuilder::new(discv5_listen_config).build())
502            .build();
503
504        Discovery::new(
505            discv4_addr,
506            discv4_addr,
507            secret_key,
508            Some(discv4_config),
509            Some(discv5_config),
510            None,
511        )
512        .await
513        .expect("should build discv5 with discv4 downgrade")
514    }
515
516    #[tokio::test(flavor = "multi_thread")]
517    async fn discv5_and_discv4_same_pk() {
518        reth_tracing::init_test_tracing();
519
520        // set up test
521        let mut node_1 = start_discovery_node(40014, 40015).await;
522        let discv4_enr_1 = node_1.discv4.as_ref().unwrap().node_record();
523        let discv5_enr_node_1 =
524            node_1.discv5.as_ref().unwrap().with_discv5(|discv5| discv5.local_enr());
525        let discv4_id_1 = discv4_enr_1.id;
526        let discv5_id_1 = discv5_enr_node_1.node_id();
527
528        let mut node_2 = start_discovery_node(40024, 40025).await;
529        let discv4_enr_2 = node_2.discv4.as_ref().unwrap().node_record();
530        let discv5_enr_node_2 =
531            node_2.discv5.as_ref().unwrap().with_discv5(|discv5| discv5.local_enr());
532        let discv4_id_2 = discv4_enr_2.id;
533        let discv5_id_2 = discv5_enr_node_2.node_id();
534
535        trace!(target: "net::discovery::tests",
536            node_1_node_id=format!("{:#}", discv5_id_1),
537            node_2_node_id=format!("{:#}", discv5_id_2),
538            "started nodes"
539        );
540
541        // test
542
543        // assert discovery version 4 and version 5 nodes have same id
544        assert_eq!(discv4_id_1, enr_to_discv4_id(&discv5_enr_node_1).unwrap());
545        assert_eq!(discv4_id_2, enr_to_discv4_id(&discv5_enr_node_2).unwrap());
546
547        // add node_2:discv4 manually to node_1:discv4
548        node_1.add_discv4_node(discv4_enr_2);
549
550        // verify node_2:discv4 discovered node_1:discv4 and vv
551        let event_node_1 = node_1.next().await.unwrap();
552        let event_node_2 = node_2.next().await.unwrap();
553
554        assert_eq!(
555            DiscoveryEvent::NewNode(DiscoveredEvent::EventQueued {
556                peer_id: discv4_id_2,
557                addr: PeerAddr::new(discv4_enr_2.tcp_addr(), Some(discv4_enr_2.udp_addr())),
558                fork_id: None
559            }),
560            event_node_1
561        );
562        assert_eq!(
563            DiscoveryEvent::NewNode(DiscoveredEvent::EventQueued {
564                peer_id: discv4_id_1,
565                addr: PeerAddr::new(discv4_enr_1.tcp_addr(), Some(discv4_enr_1.udp_addr())),
566                fork_id: None
567            }),
568            event_node_2
569        );
570
571        assert_eq!(1, node_1.discovered_nodes.len());
572        assert_eq!(1, node_2.discovered_nodes.len());
573
574        // add node_2:discv5 to node_1:discv5, manual insertion won't emit an event
575        node_1.add_discv5_node(EnrCombinedKeyWrapper(discv5_enr_node_2.clone()).into()).unwrap();
576        // verify node_2 is in KBuckets of node_1:discv5
577        assert!(node_1
578            .discv5
579            .as_ref()
580            .unwrap()
581            .with_discv5(|discv5| discv5.table_entries_id().contains(&discv5_id_2)));
582
583        // manually trigger connection from node_1:discv5 to node_2:discv5
584        node_1
585            .discv5
586            .as_ref()
587            .unwrap()
588            .with_discv5(|discv5| discv5.send_ping(discv5_enr_node_2.clone()))
589            .await
590            .unwrap();
591
592        // this won't emit an event, since the nodes already discovered each other on discv4, the
593        // number of nodes stored for each node on this level remains 1.
594        assert_eq!(1, node_1.discovered_nodes.len());
595        assert_eq!(1, node_2.discovered_nodes.len());
596    }
597
598    /// Starts a discovery node with discv4 and discv5 sharing the same UDP port.
599    async fn start_shared_port_node(port: u16) -> Discovery {
600        let secret_key = SecretKey::new(&mut rand_08::thread_rng());
601        let disc_addr: SocketAddr = format!("127.0.0.1:{port}").parse().unwrap();
602        // Use a non-zero TCP port so the node record isn't filtered out by
603        // `on_node_record_update` (which drops peers with tcp port == 0).
604        let tcp_addr: SocketAddr = "127.0.0.1:30303".parse().unwrap();
605
606        let discv4_config = Discv4ConfigBuilder::default().external_ip_resolver(None).build();
607
608        let discv5_listen_config = discv5::ListenConfig::from(disc_addr);
609        let discv5_config = reth_discv5::Config::builder(tcp_addr)
610            .discv5_config(discv5::ConfigBuilder::new(discv5_listen_config).build())
611            .build();
612
613        // Both protocols use the same address, triggering shared-port mode
614        Discovery::new(
615            tcp_addr,
616            disc_addr,
617            secret_key,
618            Some(discv4_config),
619            Some(discv5_config),
620            None,
621        )
622        .await
623        .expect("should start with shared port")
624    }
625
626    #[tokio::test(flavor = "multi_thread")]
627    async fn test_shared_port_setup() {
628        reth_tracing::init_test_tracing();
629
630        // Use port 0 so the OS picks a free port
631        let node = start_shared_port_node(0).await;
632
633        // Both protocols should be active
634        assert!(node.discv4.is_some(), "discv4 should be running");
635        assert!(node.discv5.is_some(), "discv5 should be running");
636    }
637
638    #[tokio::test(flavor = "multi_thread")]
639    async fn test_shared_port_discv5_discovery() {
640        reth_tracing::init_test_tracing();
641
642        let mut node_1 = start_shared_port_node(0).await;
643        let mut node_2 = start_shared_port_node(0).await;
644
645        let discv5_enr_1 = node_1.discv5.as_ref().unwrap().with_discv5(|discv5| discv5.local_enr());
646        let discv5_enr_2 = node_2.discv5.as_ref().unwrap().with_discv5(|discv5| discv5.local_enr());
647
648        let peer_id_1 = enr_to_discv4_id(&discv5_enr_1).unwrap();
649        let peer_id_2 = enr_to_discv4_id(&discv5_enr_2).unwrap();
650
651        // Add node_2's ENR to node_1's discv5 kbuckets and trigger a ping to establish a session.
652        // send_ping awaits the PONG, so the handshake completes before we poll the Discovery
653        // stream. The discv5 service runs its own background task.
654        node_1.add_discv5_node(EnrCombinedKeyWrapper(discv5_enr_2.clone()).into()).unwrap();
655        node_1
656            .discv5
657            .as_ref()
658            .unwrap()
659            .with_discv5(|discv5| discv5.send_ping(discv5_enr_2))
660            .await
661            .unwrap();
662
663        // Both SessionEstablished events should now be buffered in the update channels.
664        // Drive both nodes concurrently to collect them.
665        let mut event_1 = None;
666        let mut event_2 = None;
667        let timeout = tokio::time::sleep(std::time::Duration::from_secs(5));
668        tokio::pin!(timeout);
669        loop {
670            tokio::select! {
671                ev = node_1.next(), if event_1.is_none() => {
672                    event_1 = ev;
673                }
674                ev = node_2.next(), if event_2.is_none() => {
675                    event_2 = ev;
676                }
677                _ = &mut timeout => {
678                    panic!("timed out waiting for discv5 discovery events");
679                }
680            }
681            if event_1.is_some() && event_2.is_some() {
682                break;
683            }
684        }
685
686        assert!(matches!(
687            event_1.unwrap(),
688            DiscoveryEvent::NewNode(DiscoveredEvent::EventQueued { peer_id, .. })
689                if peer_id == peer_id_2
690        ));
691        assert!(matches!(
692            event_2.unwrap(),
693            DiscoveryEvent::NewNode(DiscoveredEvent::EventQueued { peer_id, .. })
694                if peer_id == peer_id_1
695        ));
696    }
697
698    #[tokio::test(flavor = "multi_thread")]
699    async fn test_shared_port_discv4_discovery() {
700        reth_tracing::init_test_tracing();
701
702        let mut node_1 = start_shared_port_node(0).await;
703        let mut node_2 = start_shared_port_node(0).await;
704
705        let enr_1 = node_1.discv4.as_ref().unwrap().node_record();
706        let enr_2 = node_2.discv4.as_ref().unwrap().node_record();
707
708        // Introduce node_2 to node_1 via discv4
709        node_1.add_discv4_node(enr_2);
710
711        // Both nodes should discover each other via discv4 ping/pong
712        let event_1 = node_1.next().await.unwrap();
713        let event_2 = node_2.next().await.unwrap();
714
715        assert_eq!(
716            DiscoveryEvent::NewNode(DiscoveredEvent::EventQueued {
717                peer_id: enr_2.id,
718                addr: PeerAddr::new(enr_2.tcp_addr(), Some(enr_2.udp_addr())),
719                fork_id: None
720            }),
721            event_1
722        );
723        assert_eq!(
724            DiscoveryEvent::NewNode(DiscoveredEvent::EventQueued {
725                peer_id: enr_1.id,
726                addr: PeerAddr::new(enr_1.tcp_addr(), Some(enr_1.udp_addr())),
727                fork_id: None
728            }),
729            event_2
730        );
731    }
732
733    /// Verifies that shared-port mode binds correctly when discv5 is configured for dual-stack.
734    /// On Linux this exercises the IPv6 V6ONLY path: without it, the IPv4 sibling would clash
735    /// with the IPv6 socket bound to the same port.
736    #[tokio::test(flavor = "multi_thread")]
737    async fn test_shared_port_dual_stack() {
738        reth_tracing::init_test_tracing();
739
740        // Find a port that's free on the v4 wildcard so we can use it for both v4 and v6.
741        let probe = UdpSocket::bind("0.0.0.0:0").await.expect("probe bind");
742        let port = probe.local_addr().unwrap().port();
743        drop(probe);
744
745        let secret_key = SecretKey::new(&mut rand_08::thread_rng());
746        let v4_addr: SocketAddr = format!("0.0.0.0:{port}").parse().unwrap();
747        let tcp_addr: SocketAddr = "0.0.0.0:30303".parse().unwrap();
748
749        let discv4_config = Discv4ConfigBuilder::default().external_ip_resolver(None).build();
750
751        let discv5_listen_config = discv5::ListenConfig::DualStack {
752            ipv4: std::net::Ipv4Addr::UNSPECIFIED,
753            ipv4_port: port,
754            ipv6: std::net::Ipv6Addr::UNSPECIFIED,
755            ipv6_port: port,
756        };
757        let discv5_config = reth_discv5::Config::builder(tcp_addr)
758            .discv5_config(discv5::ConfigBuilder::new(discv5_listen_config).build())
759            .build();
760
761        Discovery::new(
762            tcp_addr,
763            v4_addr,
764            secret_key,
765            Some(discv4_config),
766            Some(discv5_config),
767            None,
768        )
769        .await
770        .expect("discovery should start with shared port + dual-stack");
771    }
772}