reth_eth_wire/
multiplex.rs

//! Rlpx protocol multiplexer and satellite stream
//!
//! A Satellite is a Stream that primarily drives a single `RLPx` subprotocol but can also handle
//! additional subprotocols.
//!
//! Most other subprotocols are "dependent satellite" protocols of "eth" rather than fully
//! standalone protocols, for example "snap". See also [snap protocol](https://github.com/ethereum/devp2p/blob/298d7a77c3bf833641579ecbbb5b13f0311eeeea/caps/snap.md?plain=1#L71).
//! Hence it is expected that the primary protocol is "eth" and the additional protocols are
//! "dependent satellite" protocols.
9
10use std::{
11    collections::VecDeque,
12    fmt,
13    future::Future,
14    io,
15    pin::{pin, Pin},
16    task::{ready, Context, Poll},
17};
18
19use crate::{
20    capability::{SharedCapabilities, SharedCapability, UnsupportedCapabilityError},
21    errors::{EthStreamError, P2PStreamError},
22    p2pstream::DisconnectP2P,
23    CanDisconnect, Capability, DisconnectReason, EthStream, P2PStream, Status, UnauthedEthStream,
24};
25use bytes::{Bytes, BytesMut};
26use futures::{Sink, SinkExt, Stream, StreamExt, TryStream, TryStreamExt};
27use reth_eth_wire_types::NetworkPrimitives;
28use reth_ethereum_forks::ForkFilter;
29use tokio::sync::{mpsc, mpsc::UnboundedSender};
30use tokio_stream::wrappers::UnboundedReceiverStream;
31
/// Wraps a raw rlpx stream [`P2PStream`] and handles message ID multiplexing.
///
/// Protocols are installed on top of the connection with `install_protocol`, and the
/// multiplexer is turned into an [`RlpxSatelliteStream`] by one of the `into_*` methods once
/// the primary protocol is known.
#[derive(Debug)]
pub struct RlpxProtocolMultiplexer<St> {
    /// The connection, installed protocols and outgoing buffer; moved as-is into the
    /// [`RlpxSatelliteStream`] on conversion.
    inner: MultiplexInner<St>,
}
38
39impl<St> RlpxProtocolMultiplexer<St> {
40    /// Wraps the raw p2p stream
41    pub fn new(conn: P2PStream<St>) -> Self {
42        Self {
43            inner: MultiplexInner {
44                conn,
45                protocols: Default::default(),
46                out_buffer: Default::default(),
47            },
48        }
49    }
50
51    /// Installs a new protocol on top of the raw p2p stream.
52    ///
53    /// This accepts a closure that receives a [`ProtocolConnection`] that will yield messages for
54    /// the given capability.
55    pub fn install_protocol<F, Proto>(
56        &mut self,
57        cap: &Capability,
58        f: F,
59    ) -> Result<(), UnsupportedCapabilityError>
60    where
61        F: FnOnce(ProtocolConnection) -> Proto,
62        Proto: Stream<Item = BytesMut> + Send + 'static,
63    {
64        self.inner.install_protocol(cap, f)
65    }
66
67    /// Returns the [`SharedCapabilities`] of the underlying raw p2p stream
68    pub const fn shared_capabilities(&self) -> &SharedCapabilities {
69        self.inner.shared_capabilities()
70    }
71
72    /// Converts this multiplexer into a [`RlpxSatelliteStream`] with the given primary protocol.
73    pub fn into_satellite_stream<F, Primary>(
74        self,
75        cap: &Capability,
76        primary: F,
77    ) -> Result<RlpxSatelliteStream<St, Primary>, P2PStreamError>
78    where
79        F: FnOnce(ProtocolProxy) -> Primary,
80    {
81        let Ok(shared_cap) = self.shared_capabilities().ensure_matching_capability(cap).cloned()
82        else {
83            return Err(P2PStreamError::CapabilityNotShared)
84        };
85
86        let (to_primary, from_wire) = mpsc::unbounded_channel();
87        let (to_wire, from_primary) = mpsc::unbounded_channel();
88        let proxy = ProtocolProxy {
89            shared_cap: shared_cap.clone(),
90            from_wire: UnboundedReceiverStream::new(from_wire),
91            to_wire,
92        };
93
94        let st = primary(proxy);
95        Ok(RlpxSatelliteStream {
96            inner: self.inner,
97            primary: PrimaryProtocol {
98                to_primary,
99                from_primary: UnboundedReceiverStream::new(from_primary),
100                st,
101                shared_cap,
102            },
103        })
104    }
105
106    /// Converts this multiplexer into a [`RlpxSatelliteStream`] with the given primary protocol.
107    ///
108    /// Returns an error if the primary protocol is not supported by the remote or the handshake
109    /// failed.
110    pub async fn into_satellite_stream_with_handshake<F, Fut, Err, Primary>(
111        self,
112        cap: &Capability,
113        handshake: F,
114    ) -> Result<RlpxSatelliteStream<St, Primary>, Err>
115    where
116        F: FnOnce(ProtocolProxy) -> Fut,
117        Fut: Future<Output = Result<Primary, Err>>,
118        St: Stream<Item = io::Result<BytesMut>> + Sink<Bytes, Error = io::Error> + Unpin,
119        P2PStreamError: Into<Err>,
120    {
121        self.into_satellite_stream_with_tuple_handshake(cap, move |proxy| async move {
122            let st = handshake(proxy).await?;
123            Ok((st, ()))
124        })
125        .await
126        .map(|(st, _)| st)
127    }
128
129    /// Converts this multiplexer into a [`RlpxSatelliteStream`] with the given primary protocol.
130    ///
131    /// Returns an error if the primary protocol is not supported by the remote or the handshake
132    /// failed.
133    ///
134    /// This accepts a closure that does a handshake with the remote peer and returns a tuple of the
135    /// primary stream and extra data.
136    ///
137    /// See also [`UnauthedEthStream::handshake`]
138    pub async fn into_satellite_stream_with_tuple_handshake<F, Fut, Err, Primary, Extra>(
139        mut self,
140        cap: &Capability,
141        handshake: F,
142    ) -> Result<(RlpxSatelliteStream<St, Primary>, Extra), Err>
143    where
144        F: FnOnce(ProtocolProxy) -> Fut,
145        Fut: Future<Output = Result<(Primary, Extra), Err>>,
146        St: Stream<Item = io::Result<BytesMut>> + Sink<Bytes, Error = io::Error> + Unpin,
147        P2PStreamError: Into<Err>,
148    {
149        let Ok(shared_cap) = self.shared_capabilities().ensure_matching_capability(cap).cloned()
150        else {
151            return Err(P2PStreamError::CapabilityNotShared.into())
152        };
153
154        let (to_primary, from_wire) = mpsc::unbounded_channel();
155        let (to_wire, mut from_primary) = mpsc::unbounded_channel();
156        let proxy = ProtocolProxy {
157            shared_cap: shared_cap.clone(),
158            from_wire: UnboundedReceiverStream::new(from_wire),
159            to_wire,
160        };
161
162        let f = handshake(proxy);
163        let mut f = pin!(f);
164
165        // this polls the connection and the primary stream concurrently until the handshake is
166        // complete
167        loop {
168            tokio::select! {
169                Some(Ok(msg)) = self.inner.conn.next() => {
170                    // Ensure the message belongs to the primary protocol
171                    let Some(offset) = msg.first().copied()
172                    else {
173                        return Err(P2PStreamError::EmptyProtocolMessage.into())
174                    };
175                    if let Some(cap) = self.shared_capabilities().find_by_relative_offset(offset).cloned() {
176                            if cap == shared_cap {
177                                // delegate to primary
178                                let _ = to_primary.send(msg);
179                            } else {
180                                // delegate to satellite
181                                self.inner.delegate_message(&cap, msg);
182                            }
183                        } else {
184                           return Err(P2PStreamError::UnknownReservedMessageId(offset).into())
185                        }
186                }
187                Some(msg) = from_primary.recv() => {
188                    self.inner.conn.send(msg).await.map_err(Into::into)?;
189                }
190                res = &mut f => {
191                    let (st, extra) = res?;
192                    return Ok((RlpxSatelliteStream {
193                            inner: self.inner,
194                            primary: PrimaryProtocol {
195                                to_primary,
196                                from_primary: UnboundedReceiverStream::new(from_primary),
197                                st,
198                                shared_cap,
199                            }
200                    }, extra))
201                }
202            }
203        }
204    }
205
206    /// Converts this multiplexer into a [`RlpxSatelliteStream`] with eth protocol as the given
207    /// primary protocol.
208    pub async fn into_eth_satellite_stream<N: NetworkPrimitives>(
209        self,
210        status: Status,
211        fork_filter: ForkFilter,
212    ) -> Result<(RlpxSatelliteStream<St, EthStream<ProtocolProxy, N>>, Status), EthStreamError>
213    where
214        St: Stream<Item = io::Result<BytesMut>> + Sink<Bytes, Error = io::Error> + Unpin,
215    {
216        let eth_cap = self.inner.conn.shared_capabilities().eth_version()?;
217        self.into_satellite_stream_with_tuple_handshake(
218            &Capability::eth(eth_cap),
219            move |proxy| async move {
220                UnauthedEthStream::new(proxy).handshake(status, fork_filter).await
221            },
222        )
223        .await
224    }
225}
226
/// State shared by [`RlpxProtocolMultiplexer`] and [`RlpxSatelliteStream`]: the connection, the
/// installed satellite protocols and the messages waiting to be written.
#[derive(Debug)]
struct MultiplexInner<St> {
    /// The raw p2p stream
    conn: P2PStream<St>,
    /// All the subprotocols that are multiplexed on top of the raw p2p stream
    protocols: Vec<ProtocolStream>,
    /// Buffer for outgoing messages on the wire.
    out_buffer: VecDeque<Bytes>,
}
236
237impl<St> MultiplexInner<St> {
238    const fn shared_capabilities(&self) -> &SharedCapabilities {
239        self.conn.shared_capabilities()
240    }
241
242    /// Delegates a message to the matching protocol.
243    fn delegate_message(&self, cap: &SharedCapability, msg: BytesMut) -> bool {
244        for proto in &self.protocols {
245            if proto.shared_cap == *cap {
246                proto.send_raw(msg);
247                return true
248            }
249        }
250        false
251    }
252
253    fn install_protocol<F, Proto>(
254        &mut self,
255        cap: &Capability,
256        f: F,
257    ) -> Result<(), UnsupportedCapabilityError>
258    where
259        F: FnOnce(ProtocolConnection) -> Proto,
260        Proto: Stream<Item = BytesMut> + Send + 'static,
261    {
262        let shared_cap =
263            self.conn.shared_capabilities().ensure_matching_capability(cap).cloned()?;
264        let (to_satellite, rx) = mpsc::unbounded_channel();
265        let proto_conn = ProtocolConnection { from_wire: UnboundedReceiverStream::new(rx) };
266        let st = f(proto_conn);
267        let st = ProtocolStream { shared_cap, to_satellite, satellite_st: Box::pin(st) };
268        self.protocols.push(st);
269        Ok(())
270    }
271}
272
/// Represents a protocol in the multiplexer that is used as the primary protocol.
///
/// Bundles the primary stream with the two channel ends the satellite stream uses to exchange
/// raw messages with it.
#[derive(Debug)]
struct PrimaryProtocol<Primary> {
    /// Channel to send messages to the primary protocol.
    to_primary: UnboundedSender<BytesMut>,
    /// Receiver for messages from the primary protocol.
    from_primary: UnboundedReceiverStream<Bytes>,
    /// Shared capability of the primary protocol.
    shared_cap: SharedCapability,
    /// The primary stream.
    st: Primary,
}
285
/// A Stream and Sink type that acts as a wrapper around a primary `RLPx` subprotocol (e.g. "eth")
///
/// Only emits and sends _non-empty_ messages
#[derive(Debug)]
pub struct ProtocolProxy {
    /// Shared capability of the proxied protocol; its relative message id offset is used to mask
    /// outgoing and unmask incoming message ids.
    shared_cap: SharedCapability,
    /// Receives _non-empty_ messages from the wire
    from_wire: UnboundedReceiverStream<BytesMut>,
    /// Sends _non-empty_ messages to the wire
    to_wire: UnboundedSender<Bytes>,
}
297
298impl ProtocolProxy {
299    /// Sends a _non-empty_ message on the wire.
300    fn try_send(&self, msg: Bytes) -> Result<(), io::Error> {
301        if msg.is_empty() {
302            // message must not be empty
303            return Err(io::ErrorKind::InvalidInput.into())
304        }
305        self.to_wire.send(self.mask_msg_id(msg)?).map_err(|_| io::ErrorKind::BrokenPipe.into())
306    }
307
308    /// Masks the message ID of a message to be sent on the wire.
309    #[inline]
310    fn mask_msg_id(&self, msg: Bytes) -> Result<Bytes, io::Error> {
311        if msg.is_empty() {
312            // message must not be empty
313            return Err(io::ErrorKind::InvalidInput.into())
314        }
315
316        let offset = self.shared_cap.relative_message_id_offset();
317        if offset == 0 {
318            return Ok(msg);
319        }
320
321        let mut masked = Vec::from(msg);
322        masked[0] = masked[0].checked_add(offset).ok_or(io::ErrorKind::InvalidInput)?;
323        Ok(masked.into())
324    }
325
326    /// Unmasks the message ID of a message received from the wire.
327    #[inline]
328    fn unmask_id(&self, mut msg: BytesMut) -> Result<BytesMut, io::Error> {
329        if msg.is_empty() {
330            // message must not be empty
331            return Err(io::ErrorKind::InvalidInput.into())
332        }
333        msg[0] = msg[0]
334            .checked_sub(self.shared_cap.relative_message_id_offset())
335            .ok_or(io::ErrorKind::InvalidInput)?;
336        Ok(msg)
337    }
338}
339
340impl Stream for ProtocolProxy {
341    type Item = Result<BytesMut, io::Error>;
342
343    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
344        let msg = ready!(self.from_wire.poll_next_unpin(cx));
345        Poll::Ready(msg.map(|msg| self.get_mut().unmask_id(msg)))
346    }
347}
348
349impl Sink<Bytes> for ProtocolProxy {
350    type Error = io::Error;
351
352    fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
353        Poll::Ready(Ok(()))
354    }
355
356    fn start_send(self: Pin<&mut Self>, item: Bytes) -> Result<(), Self::Error> {
357        self.get_mut().try_send(item)
358    }
359
360    fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
361        Poll::Ready(Ok(()))
362    }
363
364    fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
365        Poll::Ready(Ok(()))
366    }
367}
368
369impl CanDisconnect<Bytes> for ProtocolProxy {
370    fn disconnect(
371        &mut self,
372        _reason: DisconnectReason,
373    ) -> Pin<Box<dyn Future<Output = Result<(), <Self as Sink<Bytes>>::Error>> + Send + '_>> {
374        // TODO handle disconnects
375        Box::pin(async move { Ok(()) })
376    }
377}
378
/// A connection channel to receive _`non_empty`_ messages for the negotiated protocol.
///
/// This is a [Stream] that returns raw bytes of the received messages for this protocol.
#[derive(Debug)]
pub struct ProtocolConnection {
    /// Receiving end of the channel that carries this protocol's messages.
    from_wire: UnboundedReceiverStream<BytesMut>,
}
386
387impl Stream for ProtocolConnection {
388    type Item = BytesMut;
389
390    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
391        self.from_wire.poll_next_unpin(cx)
392    }
393}
394
/// A Stream and Sink type that acts as a wrapper around a primary `RLPx` subprotocol (e.g. "eth")
/// [`EthStream`] and can also handle additional subprotocols.
#[derive(Debug)]
pub struct RlpxSatelliteStream<St, Primary> {
    /// The connection, the installed satellite protocols and the outgoing message buffer.
    inner: MultiplexInner<St>,
    /// The primary protocol together with the channels used to talk to it.
    primary: PrimaryProtocol<Primary>,
}
402
403impl<St, Primary> RlpxSatelliteStream<St, Primary> {
404    /// Installs a new protocol on top of the raw p2p stream.
405    ///
406    /// This accepts a closure that receives a [`ProtocolConnection`] that will yield messages for
407    /// the given capability.
408    pub fn install_protocol<F, Proto>(
409        &mut self,
410        cap: &Capability,
411        f: F,
412    ) -> Result<(), UnsupportedCapabilityError>
413    where
414        F: FnOnce(ProtocolConnection) -> Proto,
415        Proto: Stream<Item = BytesMut> + Send + 'static,
416    {
417        self.inner.install_protocol(cap, f)
418    }
419
420    /// Returns the primary protocol.
421    #[inline]
422    pub const fn primary(&self) -> &Primary {
423        &self.primary.st
424    }
425
426    /// Returns mutable access to the primary protocol.
427    #[inline]
428    pub const fn primary_mut(&mut self) -> &mut Primary {
429        &mut self.primary.st
430    }
431
432    /// Returns the underlying [`P2PStream`].
433    #[inline]
434    pub const fn inner(&self) -> &P2PStream<St> {
435        &self.inner.conn
436    }
437
438    /// Returns mutable access to the underlying [`P2PStream`].
439    #[inline]
440    pub const fn inner_mut(&mut self) -> &mut P2PStream<St> {
441        &mut self.inner.conn
442    }
443
444    /// Consumes this type and returns the wrapped [`P2PStream`].
445    #[inline]
446    pub fn into_inner(self) -> P2PStream<St> {
447        self.inner.conn
448    }
449}
450
451impl<St, Primary, PrimaryErr> Stream for RlpxSatelliteStream<St, Primary>
452where
453    St: Stream<Item = io::Result<BytesMut>> + Sink<Bytes, Error = io::Error> + Unpin,
454    Primary: TryStream<Error = PrimaryErr> + Unpin,
455    P2PStreamError: Into<PrimaryErr>,
456{
457    type Item = Result<Primary::Ok, Primary::Error>;
458
459    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
460        let this = self.get_mut();
461
462        loop {
463            // first drain the primary stream
464            if let Poll::Ready(Some(msg)) = this.primary.st.try_poll_next_unpin(cx) {
465                return Poll::Ready(Some(msg))
466            }
467
468            let mut conn_ready = true;
469            loop {
470                match this.inner.conn.poll_ready_unpin(cx) {
471                    Poll::Ready(Ok(())) => {
472                        if let Some(msg) = this.inner.out_buffer.pop_front() {
473                            if let Err(err) = this.inner.conn.start_send_unpin(msg) {
474                                return Poll::Ready(Some(Err(err.into())))
475                            }
476                        } else {
477                            break
478                        }
479                    }
480                    Poll::Ready(Err(err)) => {
481                        if let Err(disconnect_err) =
482                            this.inner.conn.start_disconnect(DisconnectReason::DisconnectRequested)
483                        {
484                            return Poll::Ready(Some(Err(disconnect_err.into())))
485                        }
486                        return Poll::Ready(Some(Err(err.into())))
487                    }
488                    Poll::Pending => {
489                        conn_ready = false;
490                        break
491                    }
492                }
493            }
494
495            // advance primary out
496            loop {
497                match this.primary.from_primary.poll_next_unpin(cx) {
498                    Poll::Ready(Some(msg)) => {
499                        this.inner.out_buffer.push_back(msg);
500                    }
501                    Poll::Ready(None) => {
502                        // primary closed
503                        return Poll::Ready(None)
504                    }
505                    Poll::Pending => break,
506                }
507            }
508
509            // advance all satellites
510            for idx in (0..this.inner.protocols.len()).rev() {
511                let mut proto = this.inner.protocols.swap_remove(idx);
512                loop {
513                    match proto.poll_next_unpin(cx) {
514                        Poll::Ready(Some(Err(err))) => {
515                            return Poll::Ready(Some(Err(P2PStreamError::Io(err).into())))
516                        }
517                        Poll::Ready(Some(Ok(msg))) => {
518                            this.inner.out_buffer.push_back(msg);
519                        }
520                        Poll::Ready(None) => return Poll::Ready(None),
521                        Poll::Pending => {
522                            this.inner.protocols.push(proto);
523                            break
524                        }
525                    }
526                }
527            }
528
529            let mut delegated = false;
530            loop {
531                // pull messages from connection
532                match this.inner.conn.poll_next_unpin(cx) {
533                    Poll::Ready(Some(Ok(msg))) => {
534                        delegated = true;
535                        let Some(offset) = msg.first().copied() else {
536                            return Poll::Ready(Some(Err(
537                                P2PStreamError::EmptyProtocolMessage.into()
538                            )))
539                        };
540                        // delegate the multiplexed message to the correct protocol
541                        if let Some(cap) =
542                            this.inner.conn.shared_capabilities().find_by_relative_offset(offset)
543                        {
544                            if cap == &this.primary.shared_cap {
545                                // delegate to primary
546                                let _ = this.primary.to_primary.send(msg);
547                            } else {
548                                // delegate to installed satellite if any
549                                for proto in &this.inner.protocols {
550                                    if proto.shared_cap == *cap {
551                                        proto.send_raw(msg);
552                                        break
553                                    }
554                                }
555                            }
556                        } else {
557                            return Poll::Ready(Some(Err(P2PStreamError::UnknownReservedMessageId(
558                                offset,
559                            )
560                            .into())))
561                        }
562                    }
563                    Poll::Ready(Some(Err(err))) => return Poll::Ready(Some(Err(err.into()))),
564                    Poll::Ready(None) => {
565                        // connection closed
566                        return Poll::Ready(None)
567                    }
568                    Poll::Pending => break,
569                }
570            }
571
572            if !conn_ready || (!delegated && this.inner.out_buffer.is_empty()) {
573                return Poll::Pending
574            }
575        }
576    }
577}
578
579impl<St, Primary, T> Sink<T> for RlpxSatelliteStream<St, Primary>
580where
581    St: Stream<Item = io::Result<BytesMut>> + Sink<Bytes, Error = io::Error> + Unpin,
582    Primary: Sink<T> + Unpin,
583    P2PStreamError: Into<<Primary as Sink<T>>::Error>,
584{
585    type Error = <Primary as Sink<T>>::Error;
586
587    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
588        let this = self.get_mut();
589        if let Err(err) = ready!(this.inner.conn.poll_ready_unpin(cx)) {
590            return Poll::Ready(Err(err.into()))
591        }
592        if let Err(err) = ready!(this.primary.st.poll_ready_unpin(cx)) {
593            return Poll::Ready(Err(err))
594        }
595        Poll::Ready(Ok(()))
596    }
597
598    fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
599        self.get_mut().primary.st.start_send_unpin(item)
600    }
601
602    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
603        self.get_mut().inner.conn.poll_flush_unpin(cx).map_err(Into::into)
604    }
605
606    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
607        self.get_mut().inner.conn.poll_close_unpin(cx).map_err(Into::into)
608    }
609}
610
/// Wraps a `RLPx` subprotocol and handles message ID multiplexing.
struct ProtocolStream {
    /// Shared capability of the wrapped protocol; its relative message id offset is used to
    /// mask outgoing and unmask incoming message ids.
    shared_cap: SharedCapability,
    /// the channel shared with the satellite stream
    to_satellite: UnboundedSender<BytesMut>,
    /// The stream returned by the closure given to `install_protocol`; yields the messages the
    /// protocol wants to send.
    satellite_st: Pin<Box<dyn Stream<Item = BytesMut> + Send>>,
}
618
619impl ProtocolStream {
620    /// Masks the message ID of a message to be sent on the wire.
621    #[inline]
622    fn mask_msg_id(&self, mut msg: BytesMut) -> Result<Bytes, io::Error> {
623        if msg.is_empty() {
624            // message must not be empty
625            return Err(io::ErrorKind::InvalidInput.into())
626        }
627        msg[0] = msg[0]
628            .checked_add(self.shared_cap.relative_message_id_offset())
629            .ok_or(io::ErrorKind::InvalidInput)?;
630        Ok(msg.freeze())
631    }
632
633    /// Unmasks the message ID of a message received from the wire.
634    #[inline]
635    fn unmask_id(&self, mut msg: BytesMut) -> Result<BytesMut, io::Error> {
636        if msg.is_empty() {
637            // message must not be empty
638            return Err(io::ErrorKind::InvalidInput.into())
639        }
640        msg[0] = msg[0]
641            .checked_sub(self.shared_cap.relative_message_id_offset())
642            .ok_or(io::ErrorKind::InvalidInput)?;
643        Ok(msg)
644    }
645
646    /// Sends the message to the satellite stream.
647    fn send_raw(&self, msg: BytesMut) {
648        let _ = self.unmask_id(msg).map(|msg| self.to_satellite.send(msg));
649    }
650}
651
652impl Stream for ProtocolStream {
653    type Item = Result<Bytes, io::Error>;
654
655    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
656        let this = self.get_mut();
657        let msg = ready!(this.satellite_st.as_mut().poll_next(cx));
658        Poll::Ready(msg.map(|msg| this.mask_msg_id(msg)))
659    }
660}
661
662impl fmt::Debug for ProtocolStream {
663    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
664        f.debug_struct("ProtocolStream").field("cap", &self.shared_cap).finish_non_exhaustive()
665    }
666}
667
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        test_utils::{
            connect_passthrough, eth_handshake, eth_hello,
            proto::{test_hello, TestProtoMessage},
        },
        UnauthedP2PStream,
    };
    use reth_eth_wire_types::EthNetworkPrimitives;
    use tokio::{net::TcpListener, sync::oneshot};
    use tokio_util::codec::Decoder;

    /// Performs the eth handshake through the multiplexer against a peer that runs the eth
    /// handshake directly on its p2p stream.
    #[tokio::test]
    async fn eth_satellite() {
        reth_tracing::init_test_tracing();
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let local_addr = listener.local_addr().unwrap();
        let (status, fork_filter) = eth_handshake();
        let other_status = status;
        let other_fork_filter = fork_filter.clone();
        // server side: p2p handshake followed by a plain eth handshake, no multiplexer
        let _handle = tokio::spawn(async move {
            let (incoming, _) = listener.accept().await.unwrap();
            let stream = crate::PassthroughCodec::default().framed(incoming);
            let (server_hello, _) = eth_hello();
            let (p2p_stream, _) =
                UnauthedP2PStream::new(stream).handshake(server_hello).await.unwrap();

            let (_eth_stream, _) = UnauthedEthStream::new(p2p_stream)
                .handshake::<EthNetworkPrimitives>(other_status, other_fork_filter)
                .await
                .unwrap();

            // keep the task (and the streams it owns) around for a moment after the handshake
            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
        });

        let conn = connect_passthrough(local_addr, eth_hello().0).await;
        let eth = conn.shared_capabilities().eth().unwrap().clone();

        // client side: eth handshake driven through the multiplexer
        let multiplexer = RlpxProtocolMultiplexer::new(conn);
        let _satellite = multiplexer
            .into_satellite_stream_with_handshake(
                eth.capability().as_ref(),
                move |proxy| async move {
                    UnauthedEthStream::new(proxy)
                        .handshake::<EthNetworkPrimitives>(status, fork_filter)
                        .await
                },
            )
            .await
            .unwrap();
    }

    /// A test that install a satellite stream eth+test protocol and sends messages between them.
    #[tokio::test(flavor = "multi_thread")]
    async fn eth_test_protocol_satellite() {
        reth_tracing::init_test_tracing();
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let local_addr = listener.local_addr().unwrap();
        let (status, fork_filter) = eth_handshake();
        let other_status = status;
        let other_fork_filter = fork_filter.clone();
        // server side: starts the ping / hello / good bye exchange on the test protocol
        let _handle = tokio::spawn(async move {
            let (incoming, _) = listener.accept().await.unwrap();
            let stream = crate::PassthroughCodec::default().framed(incoming);
            let (server_hello, _) = test_hello();
            let (conn, _) = UnauthedP2PStream::new(stream).handshake(server_hello).await.unwrap();

            let (mut st, _their_status) = RlpxProtocolMultiplexer::new(conn)
                .into_eth_satellite_stream::<EthNetworkPrimitives>(other_status, other_fork_filter)
                .await
                .unwrap();

            st.install_protocol(&TestProtoMessage::capability(), |mut conn| {
                async_stream::stream! {
                    yield TestProtoMessage::ping().encoded();
                    let msg = conn.next().await.unwrap();
                    let msg = TestProtoMessage::decode_message(&mut &msg[..]).unwrap();
                    assert_eq!(msg, TestProtoMessage::pong());

                    yield TestProtoMessage::message("hello").encoded();
                    let msg = conn.next().await.unwrap();
                    let msg = TestProtoMessage::decode_message(&mut &msg[..]).unwrap();
                    assert_eq!(msg, TestProtoMessage::message("good bye!"));

                    yield TestProtoMessage::message("good bye!").encoded();

                    // never finish the stream
                    futures::future::pending::<()>().await;
                    unreachable!()
                }
            })
            .unwrap();

            // keep polling the satellite stream so that the installed protocol makes progress
            loop {
                let _ = st.next().await;
            }
        });

        let conn = connect_passthrough(local_addr, test_hello().0).await;
        let (mut st, _their_status) = RlpxProtocolMultiplexer::new(conn)
            .into_eth_satellite_stream::<EthNetworkPrimitives>(status, fork_filter)
            .await
            .unwrap();

        // signalled by the client-side protocol once the whole exchange has been observed
        let (tx, mut rx) = oneshot::channel();

        // client side: answers the exchange started by the server
        st.install_protocol(&TestProtoMessage::capability(), |mut conn| {
            async_stream::stream! {
                let msg = conn.next().await.unwrap();
                let msg = TestProtoMessage::decode_message(&mut &msg[..]).unwrap();
                assert_eq!(msg, TestProtoMessage::ping());

                yield TestProtoMessage::pong().encoded();

                let msg = conn.next().await.unwrap();
                let msg = TestProtoMessage::decode_message(&mut &msg[..]).unwrap();
                assert_eq!(msg, TestProtoMessage::message("hello"));

                yield TestProtoMessage::message("good bye!").encoded();

                let msg = conn.next().await.unwrap();
                let msg = TestProtoMessage::decode_message(&mut &msg[..]).unwrap();
                assert_eq!(msg, TestProtoMessage::message("good bye!"));

                tx.send(()).unwrap();

                // never finish the stream
                futures::future::pending::<()>().await;
                unreachable!()
            }
        })
        .unwrap();

        // drive the satellite stream until the protocol reports completion
        loop {
            tokio::select! {
                _ = &mut rx => {
                    break
                }
               _ = st.next() => {
                }
            }
        }
    }
}