reth_network/session/
conn.rs1use futures::{Sink, Stream};
4use reth_ecies::stream::ECIESStream;
5use reth_eth_wire::{
6 errors::EthStreamError,
7 message::EthBroadcastMessage,
8 multiplex::{ProtocolProxy, RlpxSatelliteStream},
9 EthMessage, EthNetworkPrimitives, EthStream, EthVersion, NetworkPrimitives, P2PStream,
10};
11use reth_eth_wire_types::RawCapabilityMessage;
12use std::{
13 pin::Pin,
14 task::{Context, Poll},
15};
16use tokio::net::TcpStream;
17
18pub type EthPeerConnection<N> = EthStream<P2PStream<ECIESStream<TcpStream>>, N>;
20
21pub type EthSatelliteConnection<N = EthNetworkPrimitives> =
23 RlpxSatelliteStream<ECIESStream<TcpStream>, EthStream<ProtocolProxy, N>>;
24
25#[derive(Debug)]
33pub enum EthRlpxConnection<N: NetworkPrimitives = EthNetworkPrimitives> {
34 EthOnly(Box<EthPeerConnection<N>>),
36 Satellite(Box<EthSatelliteConnection<N>>),
38}
39
40impl<N: NetworkPrimitives> EthRlpxConnection<N> {
41 #[inline]
43 pub(crate) const fn version(&self) -> EthVersion {
44 match self {
45 Self::EthOnly(conn) => conn.version(),
46 Self::Satellite(conn) => conn.primary().version(),
47 }
48 }
49
50 #[inline]
52 pub(crate) fn into_inner(self) -> P2PStream<ECIESStream<TcpStream>> {
53 match self {
54 Self::EthOnly(conn) => conn.into_inner(),
55 Self::Satellite(conn) => conn.into_inner(),
56 }
57 }
58
59 #[inline]
61 pub(crate) fn inner_mut(&mut self) -> &mut P2PStream<ECIESStream<TcpStream>> {
62 match self {
63 Self::EthOnly(conn) => conn.inner_mut(),
64 Self::Satellite(conn) => conn.inner_mut(),
65 }
66 }
67
68 #[inline]
70 pub(crate) const fn inner(&self) -> &P2PStream<ECIESStream<TcpStream>> {
71 match self {
72 Self::EthOnly(conn) => conn.inner(),
73 Self::Satellite(conn) => conn.inner(),
74 }
75 }
76
77 #[inline]
79 pub fn start_send_broadcast(
80 &mut self,
81 item: EthBroadcastMessage<N>,
82 ) -> Result<(), EthStreamError> {
83 match self {
84 Self::EthOnly(conn) => conn.start_send_broadcast(item),
85 Self::Satellite(conn) => conn.primary_mut().start_send_broadcast(item),
86 }
87 }
88
89 pub fn start_send_raw(&mut self, msg: RawCapabilityMessage) -> Result<(), EthStreamError> {
91 match self {
92 Self::EthOnly(conn) => conn.start_send_raw(msg),
93 Self::Satellite(conn) => conn.primary_mut().start_send_raw(msg),
94 }
95 }
96
97 pub fn set_reject_block_announcements(&mut self, reject: bool) {
100 match self {
101 Self::EthOnly(conn) => conn.set_reject_block_announcements(reject),
102 Self::Satellite(conn) => conn.primary_mut().set_reject_block_announcements(reject),
103 }
104 }
105}
106
107impl<N: NetworkPrimitives> From<EthPeerConnection<N>> for EthRlpxConnection<N> {
108 #[inline]
109 fn from(conn: EthPeerConnection<N>) -> Self {
110 Self::EthOnly(Box::new(conn))
111 }
112}
113
114impl<N: NetworkPrimitives> From<EthSatelliteConnection<N>> for EthRlpxConnection<N> {
115 #[inline]
116 fn from(conn: EthSatelliteConnection<N>) -> Self {
117 Self::Satellite(Box::new(conn))
118 }
119}
120
121macro_rules! delegate_call {
122 ($self:ident.$method:ident($($args:ident),+)) => {
123 unsafe {
124 match $self.get_unchecked_mut() {
125 Self::EthOnly(l) => Pin::new_unchecked(l).$method($($args),+),
126 Self::Satellite(r) => Pin::new_unchecked(r).$method($($args),+),
127 }
128 }
129 }
130}
131
132impl<N: NetworkPrimitives> Stream for EthRlpxConnection<N> {
133 type Item = Result<EthMessage<N>, EthStreamError>;
134
135 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
136 delegate_call!(self.poll_next(cx))
137 }
138}
139
140impl<N: NetworkPrimitives> Sink<EthMessage<N>> for EthRlpxConnection<N> {
141 type Error = EthStreamError;
142
143 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
144 delegate_call!(self.poll_ready(cx))
145 }
146
147 fn start_send(self: Pin<&mut Self>, item: EthMessage<N>) -> Result<(), Self::Error> {
148 delegate_call!(self.start_send(item))
149 }
150
151 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
152 delegate_call!(self.poll_flush(cx))
153 }
154
155 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
156 delegate_call!(self.poll_close(cx))
157 }
158}
159
160#[cfg(test)]
161mod tests {
162 use super::*;
163
164 const fn assert_eth_stream<N, St>()
165 where
166 N: NetworkPrimitives,
167 St: Stream<Item = Result<EthMessage<N>, EthStreamError>> + Sink<EthMessage<N>>,
168 {
169 }
170
171 #[test]
172 const fn test_eth_stream_variants() {
173 assert_eth_stream::<EthNetworkPrimitives, EthSatelliteConnection<EthNetworkPrimitives>>();
174 assert_eth_stream::<EthNetworkPrimitives, EthRlpxConnection<EthNetworkPrimitives>>();
175 }
176}