reth_network/
protocol.rs
1use alloy_primitives::bytes::BytesMut;
6use futures::Stream;
7use reth_eth_wire::{
8 capability::SharedCapabilities, multiplex::ProtocolConnection, protocol::Protocol,
9};
10use reth_network_api::{Direction, PeerId};
11use std::{
12 fmt,
13 net::SocketAddr,
14 ops::{Deref, DerefMut},
15 pin::Pin,
16};
17
18pub trait ProtocolHandler: fmt::Debug + Send + Sync + 'static {
21 type ConnectionHandler: ConnectionHandler;
23
24 fn on_incoming(&self, socket_addr: SocketAddr) -> Option<Self::ConnectionHandler>;
29
30 fn on_outgoing(
35 &self,
36 socket_addr: SocketAddr,
37 peer_id: PeerId,
38 ) -> Option<Self::ConnectionHandler>;
39}
40
41pub trait ConnectionHandler: Send + Sync + 'static {
43 type Connection: Stream<Item = BytesMut> + Send + 'static;
47
48 fn protocol(&self) -> Protocol;
52
53 fn on_unsupported_by_peer(
56 self,
57 supported: &SharedCapabilities,
58 direction: Direction,
59 peer_id: PeerId,
60 ) -> OnNotSupported;
61
62 fn into_connection(
66 self,
67 direction: Direction,
68 peer_id: PeerId,
69 conn: ProtocolConnection,
70 ) -> Self::Connection;
71}
72
73#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
75pub enum OnNotSupported {
76 #[default]
78 KeepAlive,
79 Disconnect,
81}
82
83#[derive(Debug)]
85pub struct RlpxSubProtocol(Box<dyn DynProtocolHandler>);
86
87pub trait IntoRlpxSubProtocol {
89 fn into_rlpx_sub_protocol(self) -> RlpxSubProtocol;
91}
92
93impl<T> IntoRlpxSubProtocol for T
94where
95 T: ProtocolHandler + Send + Sync + 'static,
96{
97 fn into_rlpx_sub_protocol(self) -> RlpxSubProtocol {
98 RlpxSubProtocol(Box::new(self))
99 }
100}
101
102impl IntoRlpxSubProtocol for RlpxSubProtocol {
103 fn into_rlpx_sub_protocol(self) -> RlpxSubProtocol {
104 self
105 }
106}
107
108#[derive(Debug, Default)]
110pub struct RlpxSubProtocols {
111 protocols: Vec<RlpxSubProtocol>,
113}
114
115impl RlpxSubProtocols {
116 pub fn push(&mut self, protocol: impl IntoRlpxSubProtocol) {
118 self.protocols.push(protocol.into_rlpx_sub_protocol());
119 }
120
121 pub(crate) fn on_incoming(&self, socket_addr: SocketAddr) -> RlpxSubProtocolHandlers {
124 RlpxSubProtocolHandlers(
125 self.protocols
126 .iter()
127 .filter_map(|protocol| protocol.0.on_incoming(socket_addr))
128 .collect(),
129 )
130 }
131
132 pub(crate) fn on_outgoing(
135 &self,
136 socket_addr: SocketAddr,
137 peer_id: PeerId,
138 ) -> RlpxSubProtocolHandlers {
139 RlpxSubProtocolHandlers(
140 self.protocols
141 .iter()
142 .filter_map(|protocol| protocol.0.on_outgoing(socket_addr, peer_id))
143 .collect(),
144 )
145 }
146}
147
148#[derive(Default)]
150pub(crate) struct RlpxSubProtocolHandlers(pub(crate) Vec<Box<dyn DynConnectionHandler>>);
151
152impl RlpxSubProtocolHandlers {
153 pub(crate) fn into_iter(self) -> impl Iterator<Item = Box<dyn DynConnectionHandler>> {
155 self.0.into_iter()
156 }
157}
158
159impl Deref for RlpxSubProtocolHandlers {
160 type Target = Vec<Box<dyn DynConnectionHandler>>;
161
162 fn deref(&self) -> &Self::Target {
163 &self.0
164 }
165}
166
167impl DerefMut for RlpxSubProtocolHandlers {
168 fn deref_mut(&mut self) -> &mut Self::Target {
169 &mut self.0
170 }
171}
172
173pub(crate) trait DynProtocolHandler: fmt::Debug + Send + Sync + 'static {
174 fn on_incoming(&self, socket_addr: SocketAddr) -> Option<Box<dyn DynConnectionHandler>>;
175
176 fn on_outgoing(
177 &self,
178 socket_addr: SocketAddr,
179 peer_id: PeerId,
180 ) -> Option<Box<dyn DynConnectionHandler>>;
181}
182
183impl<T: ProtocolHandler> DynProtocolHandler for T {
184 fn on_incoming(&self, socket_addr: SocketAddr) -> Option<Box<dyn DynConnectionHandler>> {
185 T::on_incoming(self, socket_addr)
186 .map(|handler| Box::new(handler) as Box<dyn DynConnectionHandler>)
187 }
188
189 fn on_outgoing(
190 &self,
191 socket_addr: SocketAddr,
192 peer_id: PeerId,
193 ) -> Option<Box<dyn DynConnectionHandler>> {
194 T::on_outgoing(self, socket_addr, peer_id)
195 .map(|handler| Box::new(handler) as Box<dyn DynConnectionHandler>)
196 }
197}
198
199pub(crate) trait DynConnectionHandler: Send + Sync + 'static {
201 fn protocol(&self) -> Protocol;
202
203 fn on_unsupported_by_peer(
204 self: Box<Self>,
205 supported: &SharedCapabilities,
206 direction: Direction,
207 peer_id: PeerId,
208 ) -> OnNotSupported;
209
210 fn into_connection(
211 self: Box<Self>,
212 direction: Direction,
213 peer_id: PeerId,
214 conn: ProtocolConnection,
215 ) -> Pin<Box<dyn Stream<Item = BytesMut> + Send + 'static>>;
216}
217
218impl<T: ConnectionHandler> DynConnectionHandler for T {
219 fn protocol(&self) -> Protocol {
220 T::protocol(self)
221 }
222
223 fn on_unsupported_by_peer(
224 self: Box<Self>,
225 supported: &SharedCapabilities,
226 direction: Direction,
227 peer_id: PeerId,
228 ) -> OnNotSupported {
229 T::on_unsupported_by_peer(*self, supported, direction, peer_id)
230 }
231
232 fn into_connection(
233 self: Box<Self>,
234 direction: Direction,
235 peer_id: PeerId,
236 conn: ProtocolConnection,
237 ) -> Pin<Box<dyn Stream<Item = BytesMut> + Send + 'static>> {
238 Box::pin(T::into_connection(*self, direction, peer_id, conn))
239 }
240}