reth_network/
protocol.rs

1//! Support for handling additional RLPx-based application-level protocols.
2//!
3//! See also <https://github.com/ethereum/devp2p/blob/master/README.md>
4
5use alloy_primitives::bytes::BytesMut;
6use futures::Stream;
7use reth_eth_wire::{
8    capability::SharedCapabilities, multiplex::ProtocolConnection, protocol::Protocol,
9};
10use reth_network_api::{Direction, PeerId};
11use std::{
12    fmt,
13    net::SocketAddr,
14    ops::{Deref, DerefMut},
15    pin::Pin,
16};
17
/// A trait that allows to offer additional RLPx-based application-level protocols when establishing
/// a peer-to-peer connection.
pub trait ProtocolHandler: fmt::Debug + Send + Sync + 'static {
    /// The type responsible for negotiating the protocol with the remote.
    type ConnectionHandler: ConnectionHandler;

    /// Invoked when a new incoming connection from the remote is requested.
    ///
    /// If protocols for this incoming connection should be announced to the remote, return a
    /// connection handler; return `None` to contribute no handler for this connection.
    fn on_incoming(&self, socket_addr: SocketAddr) -> Option<Self::ConnectionHandler>;

    /// Invoked when a new outgoing connection to the remote is requested.
    ///
    /// If protocols for this outgoing connection should be announced to the remote, return a
    /// connection handler; return `None` to contribute no handler for this connection.
    fn on_outgoing(
        &self,
        socket_addr: SocketAddr,
        peer_id: PeerId,
    ) -> Option<Self::ConnectionHandler>;
}
40
/// A trait that allows to authenticate a protocol after the `RLPx` connection was established.
pub trait ConnectionHandler: Send + Sync + 'static {
    /// The connection that yields messages to send to the remote.
    ///
    /// The connection will be closed when this stream resolves.
    type Connection: Stream<Item = BytesMut> + Send + 'static;

    /// Returns the protocol to announce when the `RLPx` connection will be established.
    ///
    /// This will be negotiated with the remote peer.
    fn protocol(&self) -> Protocol;

    /// Invoked when the `RLPx` connection has been established but the peer does not share the
    /// protocol.
    ///
    /// Consumes the handler and returns what should happen to the connection, see
    /// [`OnNotSupported`].
    fn on_unsupported_by_peer(
        self,
        supported: &SharedCapabilities,
        direction: Direction,
        peer_id: PeerId,
    ) -> OnNotSupported;

    /// Invoked when the `RLPx` connection was established.
    ///
    /// Consumes the handler and returns the [`Self::Connection`] stream. The returned stream
    /// should terminate when the connection should disconnect.
    fn into_connection(
        self,
        direction: Direction,
        peer_id: PeerId,
        conn: ProtocolConnection,
    ) -> Self::Connection;
}
72
/// What to do when a protocol is not supported by the remote.
///
/// This is the value returned by [`ConnectionHandler::on_unsupported_by_peer`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum OnNotSupported {
    /// Proceed with the connection and ignore the protocol.
    ///
    /// This is the [`Default`] variant.
    #[default]
    KeepAlive,
    /// Disconnect the connection.
    Disconnect,
}
82
/// A wrapper type for a `RLPx` sub-protocol.
///
/// Holds the protocol handler as a boxed trait object; values are created through
/// [`IntoRlpxSubProtocol`].
#[derive(Debug)]
pub struct RlpxSubProtocol(Box<dyn DynProtocolHandler>);
86
/// A helper trait to convert a [`ProtocolHandler`] into a dynamic type.
///
/// This is the bound accepted by [`RlpxSubProtocols::push`].
pub trait IntoRlpxSubProtocol {
    /// Converts the type into a [`RlpxSubProtocol`], consuming `self`.
    fn into_rlpx_sub_protocol(self) -> RlpxSubProtocol;
}
92
93impl<T> IntoRlpxSubProtocol for T
94where
95    T: ProtocolHandler + Send + Sync + 'static,
96{
97    fn into_rlpx_sub_protocol(self) -> RlpxSubProtocol {
98        RlpxSubProtocol(Box::new(self))
99    }
100}
101
/// An already type-erased [`RlpxSubProtocol`] converts into itself.
impl IntoRlpxSubProtocol for RlpxSubProtocol {
    fn into_rlpx_sub_protocol(self) -> RlpxSubProtocol {
        // Identity conversion: the value is returned unchanged.
        self
    }
}
107
/// Additional RLPx-based sub-protocols.
///
/// The [`Default`] value holds no protocols.
#[derive(Debug, Default)]
pub struct RlpxSubProtocols {
    /// All extra protocols, stored as type-erased [`RlpxSubProtocol`]s.
    protocols: Vec<RlpxSubProtocol>,
}
114
115impl RlpxSubProtocols {
116    /// Adds a new protocol.
117    pub fn push(&mut self, protocol: impl IntoRlpxSubProtocol) {
118        self.protocols.push(protocol.into_rlpx_sub_protocol());
119    }
120
121    /// Returns all additional protocol handlers that should be announced to the remote during the
122    /// Rlpx handshake on an incoming connection.
123    pub(crate) fn on_incoming(&self, socket_addr: SocketAddr) -> RlpxSubProtocolHandlers {
124        RlpxSubProtocolHandlers(
125            self.protocols
126                .iter()
127                .filter_map(|protocol| protocol.0.on_incoming(socket_addr))
128                .collect(),
129        )
130    }
131
132    /// Returns all additional protocol handlers that should be announced to the remote during the
133    /// Rlpx handshake on an outgoing connection.
134    pub(crate) fn on_outgoing(
135        &self,
136        socket_addr: SocketAddr,
137        peer_id: PeerId,
138    ) -> RlpxSubProtocolHandlers {
139        RlpxSubProtocolHandlers(
140            self.protocols
141                .iter()
142                .filter_map(|protocol| protocol.0.on_outgoing(socket_addr, peer_id))
143                .collect(),
144        )
145    }
146}
147
/// A set of additional RLPx-based sub-protocol connection handlers.
///
/// Thin wrapper around a `Vec` of boxed [`DynConnectionHandler`]s; the [`Default`] value is empty.
#[derive(Default)]
pub(crate) struct RlpxSubProtocolHandlers(pub(crate) Vec<Box<dyn DynConnectionHandler>>);
151
152impl RlpxSubProtocolHandlers {
153    /// Returns all handlers.
154    pub(crate) fn into_iter(self) -> impl Iterator<Item = Box<dyn DynConnectionHandler>> {
155        self.0.into_iter()
156    }
157}
158
159impl Deref for RlpxSubProtocolHandlers {
160    type Target = Vec<Box<dyn DynConnectionHandler>>;
161
162    fn deref(&self) -> &Self::Target {
163        &self.0
164    }
165}
166
167impl DerefMut for RlpxSubProtocolHandlers {
168    fn deref_mut(&mut self) -> &mut Self::Target {
169        &mut self.0
170    }
171}
172
/// Dynamic counterpart of [`ProtocolHandler`] that returns boxed connection handlers.
///
/// This is the trait object stored inside [`RlpxSubProtocol`].
pub(crate) trait DynProtocolHandler: fmt::Debug + Send + Sync + 'static {
    /// Same as [`ProtocolHandler::on_incoming`], but the handler is returned as a boxed
    /// [`DynConnectionHandler`].
    fn on_incoming(&self, socket_addr: SocketAddr) -> Option<Box<dyn DynConnectionHandler>>;

    /// Same as [`ProtocolHandler::on_outgoing`], but the handler is returned as a boxed
    /// [`DynConnectionHandler`].
    fn on_outgoing(
        &self,
        socket_addr: SocketAddr,
        peer_id: PeerId,
    ) -> Option<Box<dyn DynConnectionHandler>>;
}
182
183impl<T: ProtocolHandler> DynProtocolHandler for T {
184    fn on_incoming(&self, socket_addr: SocketAddr) -> Option<Box<dyn DynConnectionHandler>> {
185        T::on_incoming(self, socket_addr)
186            .map(|handler| Box::new(handler) as Box<dyn DynConnectionHandler>)
187    }
188
189    fn on_outgoing(
190        &self,
191        socket_addr: SocketAddr,
192        peer_id: PeerId,
193    ) -> Option<Box<dyn DynConnectionHandler>> {
194        T::on_outgoing(self, socket_addr, peer_id)
195            .map(|handler| Box::new(handler) as Box<dyn DynConnectionHandler>)
196    }
197}
198
/// Wrapper trait for internal ease of use.
///
/// Dynamic counterpart of [`ConnectionHandler`]: the consuming methods take `Box<Self>` and the
/// connection is returned as a pinned, boxed stream instead of an associated type.
pub(crate) trait DynConnectionHandler: Send + Sync + 'static {
    /// Same as [`ConnectionHandler::protocol`].
    fn protocol(&self) -> Protocol;

    /// Same as [`ConnectionHandler::on_unsupported_by_peer`], consuming the boxed handler.
    fn on_unsupported_by_peer(
        self: Box<Self>,
        supported: &SharedCapabilities,
        direction: Direction,
        peer_id: PeerId,
    ) -> OnNotSupported;

    /// Same as [`ConnectionHandler::into_connection`], consuming the boxed handler and returning
    /// the connection as a pinned, boxed stream.
    fn into_connection(
        self: Box<Self>,
        direction: Direction,
        peer_id: PeerId,
        conn: ProtocolConnection,
    ) -> Pin<Box<dyn Stream<Item = BytesMut> + Send + 'static>>;
}
217
218impl<T: ConnectionHandler> DynConnectionHandler for T {
219    fn protocol(&self) -> Protocol {
220        T::protocol(self)
221    }
222
223    fn on_unsupported_by_peer(
224        self: Box<Self>,
225        supported: &SharedCapabilities,
226        direction: Direction,
227        peer_id: PeerId,
228    ) -> OnNotSupported {
229        T::on_unsupported_by_peer(*self, supported, direction, peer_id)
230    }
231
232    fn into_connection(
233        self: Box<Self>,
234        direction: Direction,
235        peer_id: PeerId,
236        conn: ProtocolConnection,
237    ) -> Pin<Box<dyn Stream<Item = BytesMut> + Send + 'static>> {
238        Box::pin(T::into_connection(*self, direction, peer_id, conn))
239    }
240}