1use crate::{
2 listener::{ConnectionListener, ListenerEvent},
3 message::PeerMessage,
4 peers::InboundConnectionError,
5 protocol::IntoRlpxSubProtocol,
6 session::{Direction, PendingSessionHandshakeError, SessionEvent, SessionId, SessionManager},
7 state::{NetworkState, StateAction},
8};
9use futures::Stream;
10use reth_eth_wire::{
11 errors::EthStreamError, Capabilities, DisconnectReason, EthNetworkPrimitives, EthVersion,
12 NetworkPrimitives, UnifiedStatus,
13};
14use reth_network_api::{PeerRequest, PeerRequestSender};
15use reth_network_peers::PeerId;
16use std::{
17 io,
18 net::SocketAddr,
19 pin::Pin,
20 sync::Arc,
21 task::{Context, Poll},
22};
23use tracing::trace;
24
25#[cfg_attr(doc, aquamarine::aquamarine)]
26#[derive(Debug)]
50#[must_use = "Swarm does nothing unless polled"]
51pub(crate) struct Swarm<N: NetworkPrimitives = EthNetworkPrimitives> {
52 incoming: ConnectionListener,
54 sessions: SessionManager<N>,
56 state: NetworkState<N>,
58}
59
60impl<N: NetworkPrimitives> Swarm<N> {
63 pub(crate) const fn new(
65 incoming: ConnectionListener,
66 sessions: SessionManager<N>,
67 state: NetworkState<N>,
68 ) -> Self {
69 Self { incoming, sessions, state }
70 }
71
72 pub(crate) fn add_rlpx_sub_protocol(&mut self, protocol: impl IntoRlpxSubProtocol) {
74 self.sessions_mut().add_rlpx_sub_protocol(protocol);
75 }
76
77 pub(crate) const fn state(&self) -> &NetworkState<N> {
79 &self.state
80 }
81
82 pub(crate) const fn state_mut(&mut self) -> &mut NetworkState<N> {
84 &mut self.state
85 }
86
87 pub(crate) const fn listener(&self) -> &ConnectionListener {
89 &self.incoming
90 }
91
92 pub(crate) const fn sessions(&self) -> &SessionManager<N> {
94 &self.sessions
95 }
96
97 pub(crate) const fn sessions_mut(&mut self) -> &mut SessionManager<N> {
99 &mut self.sessions
100 }
101}
102
103impl<N: NetworkPrimitives> Swarm<N> {
104 pub(crate) fn dial_outbound(&mut self, remote_addr: SocketAddr, remote_id: PeerId) {
106 self.sessions.dial_outbound(remote_addr, remote_id)
107 }
108
109 fn on_session_event(&mut self, event: SessionEvent<N>) -> Option<SwarmEvent<N>> {
114 match event {
115 SessionEvent::SessionEstablished {
116 peer_id,
117 remote_addr,
118 client_version,
119 capabilities,
120 version,
121 status,
122 messages,
123 direction,
124 timeout,
125 range_info,
126 } => {
127 self.state.on_session_activated(
128 peer_id,
129 capabilities.clone(),
130 status.clone(),
131 messages.clone(),
132 timeout,
133 range_info,
134 );
135 Some(SwarmEvent::SessionEstablished {
136 peer_id,
137 remote_addr,
138 client_version,
139 capabilities,
140 version,
141 messages,
142 status,
143 direction,
144 })
145 }
146 SessionEvent::AlreadyConnected { peer_id, remote_addr, direction } => {
147 trace!(target: "net", ?peer_id, ?remote_addr, ?direction, "already connected");
148 self.state.peers_mut().on_already_connected(direction);
149 None
150 }
151 SessionEvent::ValidMessage { peer_id, message } => {
152 Some(SwarmEvent::ValidMessage { peer_id, message })
153 }
154 SessionEvent::IncomingPendingSessionClosed { remote_addr, error } => {
155 Some(SwarmEvent::IncomingPendingSessionClosed { remote_addr, error })
156 }
157 SessionEvent::OutgoingPendingSessionClosed { remote_addr, peer_id, error } => {
158 Some(SwarmEvent::OutgoingPendingSessionClosed { remote_addr, peer_id, error })
159 }
160 SessionEvent::Disconnected { peer_id, remote_addr } => {
161 self.state.on_session_closed(peer_id);
162 Some(SwarmEvent::SessionClosed { peer_id, remote_addr, error: None })
163 }
164 SessionEvent::SessionClosedOnConnectionError { peer_id, remote_addr, error } => {
165 self.state.on_session_closed(peer_id);
166 Some(SwarmEvent::SessionClosed { peer_id, remote_addr, error: Some(error) })
167 }
168 SessionEvent::OutgoingConnectionError { remote_addr, peer_id, error } => {
169 Some(SwarmEvent::OutgoingConnectionError { peer_id, remote_addr, error })
170 }
171 SessionEvent::BadMessage { peer_id } => Some(SwarmEvent::BadMessage { peer_id }),
172 SessionEvent::ProtocolBreach { peer_id } => {
173 Some(SwarmEvent::ProtocolBreach { peer_id })
174 }
175 }
176 }
177
178 fn on_connection(&mut self, event: ListenerEvent) -> Option<SwarmEvent<N>> {
182 match event {
183 ListenerEvent::Error(err) => return Some(SwarmEvent::TcpListenerError(err)),
184 ListenerEvent::ListenerClosed { local_address: address } => {
185 return Some(SwarmEvent::TcpListenerClosed { remote_addr: address })
186 }
187 ListenerEvent::Incoming { stream, remote_addr } => {
188 if self.is_shutting_down() {
190 return None
191 }
192 if let Err(err) =
194 self.state_mut().peers_mut().on_incoming_pending_session(remote_addr.ip())
195 {
196 match err {
197 InboundConnectionError::IpBanned => {
198 trace!(target: "net", ?remote_addr, "The incoming ip address is in the ban list");
199 }
200 InboundConnectionError::ExceedsCapacity => {
201 trace!(target: "net", ?remote_addr, "No capacity for incoming connection");
202 self.sessions.try_disconnect_incoming_connection(
203 stream,
204 DisconnectReason::TooManyPeers,
205 );
206 }
207 }
208 return None
209 }
210
211 match self.sessions.on_incoming(stream, remote_addr) {
212 Ok(session_id) => {
213 trace!(target: "net", ?remote_addr, "Incoming connection");
214 return Some(SwarmEvent::IncomingTcpConnection { session_id, remote_addr })
215 }
216 Err(err) => {
217 trace!(target: "net", %err, "Incoming connection rejected, capacity already reached.");
218 self.state_mut()
219 .peers_mut()
220 .on_incoming_pending_session_rejected_internally();
221 }
222 }
223 }
224 }
225 None
226 }
227
228 fn on_state_action(&mut self, event: StateAction<N>) -> Option<SwarmEvent<N>> {
230 match event {
231 StateAction::Connect { remote_addr, peer_id } => {
232 self.dial_outbound(remote_addr, peer_id);
233 return Some(SwarmEvent::OutgoingTcpConnection { remote_addr, peer_id })
234 }
235 StateAction::Disconnect { peer_id, reason } => {
236 self.sessions.disconnect(peer_id, reason);
237 }
238 StateAction::NewBlock { peer_id, block: msg } => {
239 let msg = PeerMessage::NewBlock(msg);
240 self.sessions.send_message(&peer_id, msg);
241 }
242 StateAction::NewBlockHashes { peer_id, hashes } => {
243 let msg = PeerMessage::NewBlockHashes(hashes);
244 self.sessions.send_message(&peer_id, msg);
245 }
246 StateAction::PeerAdded(peer_id) => return Some(SwarmEvent::PeerAdded(peer_id)),
247 StateAction::PeerRemoved(peer_id) => return Some(SwarmEvent::PeerRemoved(peer_id)),
248 StateAction::DiscoveredNode { peer_id, addr, fork_id } => {
249 if self.is_shutting_down() {
250 return None
251 }
252
253 let enforce = self.state().peers().enforce_enr_fork_id();
260 let allow = match fork_id {
261 Some(f) => self.sessions.is_valid_fork_id(f),
262 None => !enforce,
263 };
264 if allow {
265 self.state_mut().peers_mut().add_peer(peer_id, addr, fork_id);
266 }
267 }
268 StateAction::DiscoveredEnrForkId { peer_id, addr, fork_id } => {
269 if self.sessions.is_valid_fork_id(fork_id) {
270 self.state_mut().peers_mut().add_peer(peer_id, addr, Some(fork_id));
271 } else {
272 trace!(target: "net", ?peer_id, remote_fork_id=?fork_id, our_fork_id=?self.sessions.fork_id(), "fork id mismatch, removing peer");
273 self.state_mut().peers_mut().remove_peer(peer_id);
274 }
275 }
276 }
277 None
278 }
279
280 pub(crate) const fn on_shutdown_requested(&mut self) {
282 self.state_mut().peers_mut().on_shutdown();
283 }
284
285 #[inline]
287 pub(crate) const fn is_shutting_down(&self) -> bool {
288 self.state().peers().connection_state().is_shutting_down()
289 }
290
291 pub(crate) const fn on_network_state_change(&mut self, network_state: NetworkConnectionState) {
293 self.state_mut().peers_mut().on_network_state_change(network_state);
294 }
295}
296
297impl<N: NetworkPrimitives> Stream for Swarm<N> {
298 type Item = SwarmEvent<N>;
299
300 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
308 let this = self.get_mut();
309
310 loop {
314 while let Poll::Ready(action) = this.state.poll(cx) {
315 if let Some(event) = this.on_state_action(action) {
316 return Poll::Ready(Some(event))
317 }
318 }
319
320 match this.sessions.poll(cx) {
322 Poll::Pending => {}
323 Poll::Ready(event) => {
324 if let Some(event) = this.on_session_event(event) {
325 return Poll::Ready(Some(event))
326 }
327 continue
328 }
329 }
330
331 match Pin::new(&mut this.incoming).poll(cx) {
333 Poll::Pending => {}
334 Poll::Ready(event) => {
335 if let Some(event) = this.on_connection(event) {
336 return Poll::Ready(Some(event))
337 }
338 continue
339 }
340 }
341
342 return Poll::Pending
343 }
344 }
345}
346
347pub(crate) enum SwarmEvent<N: NetworkPrimitives = EthNetworkPrimitives> {
350 ValidMessage {
352 peer_id: PeerId,
354 message: PeerMessage<N>,
356 },
357 BadMessage {
359 peer_id: PeerId,
361 },
362 ProtocolBreach {
364 peer_id: PeerId,
366 },
367 TcpListenerClosed {
369 remote_addr: SocketAddr,
371 },
372 TcpListenerError(io::Error),
374 IncomingTcpConnection {
379 session_id: SessionId,
381 remote_addr: SocketAddr,
383 },
384 OutgoingTcpConnection {
386 peer_id: PeerId,
388 remote_addr: SocketAddr,
389 },
390 SessionEstablished {
391 peer_id: PeerId,
392 remote_addr: SocketAddr,
393 client_version: Arc<str>,
394 capabilities: Arc<Capabilities>,
395 version: EthVersion,
397 messages: PeerRequestSender<PeerRequest<N>>,
398 status: Arc<UnifiedStatus>,
399 direction: Direction,
400 },
401 SessionClosed {
402 peer_id: PeerId,
403 remote_addr: SocketAddr,
404 error: Option<EthStreamError>,
406 },
407 PeerAdded(PeerId),
409 PeerRemoved(PeerId),
411 IncomingPendingSessionClosed {
413 remote_addr: SocketAddr,
414 error: Option<PendingSessionHandshakeError>,
415 },
416 OutgoingPendingSessionClosed {
418 remote_addr: SocketAddr,
419 peer_id: PeerId,
420 error: Option<PendingSessionHandshakeError>,
421 },
422 OutgoingConnectionError { remote_addr: SocketAddr, peer_id: PeerId, error: io::Error },
424}
425
/// Connection state of the network.
///
/// The type is a field-less enum, so it also derives `Clone`, `Copy`, `PartialEq` and `Eq`;
/// callers can store, pass and compare states without writing a `match`.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum NetworkConnectionState {
    /// The default state.
    #[default]
    Active,
    /// A shutdown is in progress; the swarm checks this state before accepting incoming
    /// connections or adding discovered nodes.
    ShuttingDown,
    /// Hibernation. NOTE(review): the effect of this state is implemented outside this file.
    Hibernate,
}

impl NetworkConnectionState {
    /// Returns `true` if the state is [`NetworkConnectionState::Active`].
    pub(crate) const fn is_active(&self) -> bool {
        matches!(self, Self::Active)
    }

    /// Returns `true` if the state is [`NetworkConnectionState::ShuttingDown`].
    pub(crate) const fn is_shutting_down(&self) -> bool {
        matches!(self, Self::ShuttingDown)
    }
}
451}