Skip to main content

reth_network/session/
conn.rs

1//! Connection types for a session
2
3use futures::{Sink, Stream};
4use reth_ecies::stream::ECIESStream;
5use reth_eth_wire::{
6    errors::EthStreamError,
7    message::EthBroadcastMessage,
8    multiplex::{ProtocolProxy, RlpxSatelliteStream},
9    EthMessage, EthNetworkPrimitives, EthStream, EthVersion, NetworkPrimitives, P2PStream,
10};
11use reth_eth_wire_types::RawCapabilityMessage;
12use std::{
13    pin::Pin,
14    task::{Context, Poll},
15};
16use tokio::net::TcpStream;
17
18/// The type of the underlying peer network connection.
19pub type EthPeerConnection<N> = EthStream<P2PStream<ECIESStream<TcpStream>>, N>;
20
21/// Various connection types that at least support the ETH protocol.
22pub type EthSatelliteConnection<N = EthNetworkPrimitives> =
23    RlpxSatelliteStream<ECIESStream<TcpStream>, EthStream<ProtocolProxy, N>>;
24
25/// Connection types that support the ETH protocol.
26///
27/// This can be either:
28/// - A connection that only supports the ETH protocol
29/// - A connection that supports the ETH protocol and at least one other `RLPx` protocol
30// This type is boxed because the underlying stream is ~6KB,
31// mostly coming from `P2PStream`'s `snap::Encoder` (2072), and `ECIESStream` (3600).
32#[derive(Debug)]
33pub enum EthRlpxConnection<N: NetworkPrimitives = EthNetworkPrimitives> {
34    /// A connection that only supports the ETH protocol.
35    EthOnly(Box<EthPeerConnection<N>>),
36    /// A connection that supports the ETH protocol and __at least one other__ `RLPx` protocol.
37    Satellite(Box<EthSatelliteConnection<N>>),
38}
39
40impl<N: NetworkPrimitives> EthRlpxConnection<N> {
41    /// Returns the negotiated ETH version.
42    #[inline]
43    pub(crate) const fn version(&self) -> EthVersion {
44        match self {
45            Self::EthOnly(conn) => conn.version(),
46            Self::Satellite(conn) => conn.primary().version(),
47        }
48    }
49
50    /// Consumes this type and returns the wrapped [`P2PStream`].
51    #[inline]
52    pub(crate) fn into_inner(self) -> P2PStream<ECIESStream<TcpStream>> {
53        match self {
54            Self::EthOnly(conn) => conn.into_inner(),
55            Self::Satellite(conn) => conn.into_inner(),
56        }
57    }
58
59    /// Returns mutable access to the underlying stream.
60    #[inline]
61    pub(crate) fn inner_mut(&mut self) -> &mut P2PStream<ECIESStream<TcpStream>> {
62        match self {
63            Self::EthOnly(conn) => conn.inner_mut(),
64            Self::Satellite(conn) => conn.inner_mut(),
65        }
66    }
67
68    /// Returns access to the underlying stream.
69    #[inline]
70    pub(crate) const fn inner(&self) -> &P2PStream<ECIESStream<TcpStream>> {
71        match self {
72            Self::EthOnly(conn) => conn.inner(),
73            Self::Satellite(conn) => conn.inner(),
74        }
75    }
76
77    /// Same as [`Sink::start_send`] but accepts a [`EthBroadcastMessage`] instead.
78    #[inline]
79    pub fn start_send_broadcast(
80        &mut self,
81        item: EthBroadcastMessage<N>,
82    ) -> Result<(), EthStreamError> {
83        match self {
84            Self::EthOnly(conn) => conn.start_send_broadcast(item),
85            Self::Satellite(conn) => conn.primary_mut().start_send_broadcast(item),
86        }
87    }
88
89    /// Sends a raw capability message over the connection
90    pub fn start_send_raw(&mut self, msg: RawCapabilityMessage) -> Result<(), EthStreamError> {
91        match self {
92            Self::EthOnly(conn) => conn.start_send_raw(msg),
93            Self::Satellite(conn) => conn.primary_mut().start_send_raw(msg),
94        }
95    }
96
97    /// Sets whether to reject block announcement messages (`NewBlock`, `NewBlockHashes`) before
98    /// RLP decoding to avoid memory amplification from deserializing blocks that will be discarded.
99    pub fn set_reject_block_announcements(&mut self, reject: bool) {
100        match self {
101            Self::EthOnly(conn) => conn.set_reject_block_announcements(reject),
102            Self::Satellite(conn) => conn.primary_mut().set_reject_block_announcements(reject),
103        }
104    }
105}
106
107impl<N: NetworkPrimitives> From<EthPeerConnection<N>> for EthRlpxConnection<N> {
108    #[inline]
109    fn from(conn: EthPeerConnection<N>) -> Self {
110        Self::EthOnly(Box::new(conn))
111    }
112}
113
114impl<N: NetworkPrimitives> From<EthSatelliteConnection<N>> for EthRlpxConnection<N> {
115    #[inline]
116    fn from(conn: EthSatelliteConnection<N>) -> Self {
117        Self::Satellite(Box::new(conn))
118    }
119}
120
// Forwards a pinned method call (`self: Pin<&mut Self>`) to the boxed stream held by whichever
// enum variant is active.
//
// Usage: `delegate_call!(self.method(arg, ...))`, expanded inside an `impl` whose `Self` has the
// variants `EthOnly` and `Satellite`, each wrapping a `Box`.
//
// No `unsafe` is needed: `Box<T>` is `Unpin` for every `T`, so an enum made only of boxes is
// `Unpin` as well. The pin can therefore be taken apart with the safe `Pin::get_mut` and rebuilt
// around the box with the safe `Pin::new`. The delegated call resolves to the trait impl for
// `Box<_>`, exactly as it did with the unchecked variants.
macro_rules! delegate_call {
    ($self:ident.$method:ident($($args:ident),+)) => {
        match $self.get_mut() {
            Self::EthOnly(l) => Pin::new(l).$method($($args),+),
            Self::Satellite(r) => Pin::new(r).$method($($args),+),
        }
    }
}
131
132impl<N: NetworkPrimitives> Stream for EthRlpxConnection<N> {
133    type Item = Result<EthMessage<N>, EthStreamError>;
134
135    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
136        delegate_call!(self.poll_next(cx))
137    }
138}
139
140impl<N: NetworkPrimitives> Sink<EthMessage<N>> for EthRlpxConnection<N> {
141    type Error = EthStreamError;
142
143    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
144        delegate_call!(self.poll_ready(cx))
145    }
146
147    fn start_send(self: Pin<&mut Self>, item: EthMessage<N>) -> Result<(), Self::Error> {
148        delegate_call!(self.start_send(item))
149    }
150
151    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
152        delegate_call!(self.poll_flush(cx))
153    }
154
155    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
156        delegate_call!(self.poll_close(cx))
157    }
158}
159
#[cfg(test)]
mod tests {
    use super::*;

    /// Compile-time check: instantiating this function proves that `S` is both a [`Stream`]
    /// yielding `Result<EthMessage<N>, EthStreamError>` and a [`Sink`] of [`EthMessage<N>`].
    const fn assert_eth_stream<
        N: NetworkPrimitives,
        S: Stream<Item = Result<EthMessage<N>, EthStreamError>> + Sink<EthMessage<N>>,
    >() {
    }

    /// Both the enum and the satellite connection must satisfy the ETH stream bounds.
    #[test]
    const fn test_eth_stream_variants() {
        assert_eth_stream::<EthNetworkPrimitives, EthRlpxConnection<EthNetworkPrimitives>>();
        assert_eth_stream::<EthNetworkPrimitives, EthSatelliteConnection<EthNetworkPrimitives>>();
    }
}