1use crate::{
2 listener::{ConnectionListener, ListenerEvent},
3 message::PeerMessage,
4 peers::InboundConnectionError,
5 protocol::IntoRlpxSubProtocol,
6 session::{Direction, PendingSessionHandshakeError, SessionEvent, SessionId, SessionManager},
7 state::{NetworkState, StateAction},
8};
9use futures::Stream;
10use reth_eth_wire::{
11 errors::EthStreamError, Capabilities, DisconnectReason, EthNetworkPrimitives, EthVersion,
12 NetworkPrimitives, Status,
13};
14use reth_network_api::{PeerRequest, PeerRequestSender};
15use reth_network_peers::PeerId;
16use std::{
17 io,
18 net::SocketAddr,
19 pin::Pin,
20 sync::Arc,
21 task::{Context, Poll},
22};
23use tracing::trace;
24
25#[cfg_attr(doc, aquamarine::aquamarine)]
26#[derive(Debug)]
50#[must_use = "Swarm does nothing unless polled"]
51pub(crate) struct Swarm<N: NetworkPrimitives = EthNetworkPrimitives> {
52 incoming: ConnectionListener,
54 sessions: SessionManager<N>,
56 state: NetworkState<N>,
58}
59
60impl<N: NetworkPrimitives> Swarm<N> {
63 pub(crate) const fn new(
65 incoming: ConnectionListener,
66 sessions: SessionManager<N>,
67 state: NetworkState<N>,
68 ) -> Self {
69 Self { incoming, sessions, state }
70 }
71
72 pub(crate) fn add_rlpx_sub_protocol(&mut self, protocol: impl IntoRlpxSubProtocol) {
74 self.sessions_mut().add_rlpx_sub_protocol(protocol);
75 }
76
77 pub(crate) const fn state(&self) -> &NetworkState<N> {
79 &self.state
80 }
81
82 pub(crate) const fn state_mut(&mut self) -> &mut NetworkState<N> {
84 &mut self.state
85 }
86
87 pub(crate) const fn listener(&self) -> &ConnectionListener {
89 &self.incoming
90 }
91
92 pub(crate) const fn sessions(&self) -> &SessionManager<N> {
94 &self.sessions
95 }
96
97 pub(crate) const fn sessions_mut(&mut self) -> &mut SessionManager<N> {
99 &mut self.sessions
100 }
101}
102
103impl<N: NetworkPrimitives> Swarm<N> {
104 pub(crate) fn dial_outbound(&mut self, remote_addr: SocketAddr, remote_id: PeerId) {
106 self.sessions.dial_outbound(remote_addr, remote_id)
107 }
108
109 fn on_session_event(&mut self, event: SessionEvent<N>) -> Option<SwarmEvent<N>> {
114 match event {
115 SessionEvent::SessionEstablished {
116 peer_id,
117 remote_addr,
118 client_version,
119 capabilities,
120 version,
121 status,
122 messages,
123 direction,
124 timeout,
125 } => {
126 self.state.on_session_activated(
127 peer_id,
128 capabilities.clone(),
129 status.clone(),
130 messages.clone(),
131 timeout,
132 );
133 Some(SwarmEvent::SessionEstablished {
134 peer_id,
135 remote_addr,
136 client_version,
137 capabilities,
138 version,
139 messages,
140 status,
141 direction,
142 })
143 }
144 SessionEvent::AlreadyConnected { peer_id, remote_addr, direction } => {
145 trace!(target: "net", ?peer_id, ?remote_addr, ?direction, "already connected");
146 self.state.peers_mut().on_already_connected(direction);
147 None
148 }
149 SessionEvent::ValidMessage { peer_id, message } => {
150 Some(SwarmEvent::ValidMessage { peer_id, message })
151 }
152 SessionEvent::IncomingPendingSessionClosed { remote_addr, error } => {
153 Some(SwarmEvent::IncomingPendingSessionClosed { remote_addr, error })
154 }
155 SessionEvent::OutgoingPendingSessionClosed { remote_addr, peer_id, error } => {
156 Some(SwarmEvent::OutgoingPendingSessionClosed { remote_addr, peer_id, error })
157 }
158 SessionEvent::Disconnected { peer_id, remote_addr } => {
159 self.state.on_session_closed(peer_id);
160 Some(SwarmEvent::SessionClosed { peer_id, remote_addr, error: None })
161 }
162 SessionEvent::SessionClosedOnConnectionError { peer_id, remote_addr, error } => {
163 self.state.on_session_closed(peer_id);
164 Some(SwarmEvent::SessionClosed { peer_id, remote_addr, error: Some(error) })
165 }
166 SessionEvent::OutgoingConnectionError { remote_addr, peer_id, error } => {
167 Some(SwarmEvent::OutgoingConnectionError { peer_id, remote_addr, error })
168 }
169 SessionEvent::BadMessage { peer_id } => Some(SwarmEvent::BadMessage { peer_id }),
170 SessionEvent::ProtocolBreach { peer_id } => {
171 Some(SwarmEvent::ProtocolBreach { peer_id })
172 }
173 }
174 }
175
176 fn on_connection(&mut self, event: ListenerEvent) -> Option<SwarmEvent<N>> {
180 match event {
181 ListenerEvent::Error(err) => return Some(SwarmEvent::TcpListenerError(err)),
182 ListenerEvent::ListenerClosed { local_address: address } => {
183 return Some(SwarmEvent::TcpListenerClosed { remote_addr: address })
184 }
185 ListenerEvent::Incoming { stream, remote_addr } => {
186 if self.is_shutting_down() {
188 return None
189 }
190 if let Err(err) =
192 self.state_mut().peers_mut().on_incoming_pending_session(remote_addr.ip())
193 {
194 match err {
195 InboundConnectionError::IpBanned => {
196 trace!(target: "net", ?remote_addr, "The incoming ip address is in the ban list");
197 }
198 InboundConnectionError::ExceedsCapacity => {
199 trace!(target: "net", ?remote_addr, "No capacity for incoming connection");
200 self.sessions.try_disconnect_incoming_connection(
201 stream,
202 DisconnectReason::TooManyPeers,
203 );
204 }
205 }
206 return None
207 }
208
209 match self.sessions.on_incoming(stream, remote_addr) {
210 Ok(session_id) => {
211 trace!(target: "net", ?remote_addr, "Incoming connection");
212 return Some(SwarmEvent::IncomingTcpConnection { session_id, remote_addr })
213 }
214 Err(err) => {
215 trace!(target: "net", %err, "Incoming connection rejected, capacity already reached.");
216 self.state_mut()
217 .peers_mut()
218 .on_incoming_pending_session_rejected_internally();
219 }
220 }
221 }
222 }
223 None
224 }
225
226 fn on_state_action(&mut self, event: StateAction<N>) -> Option<SwarmEvent<N>> {
228 match event {
229 StateAction::Connect { remote_addr, peer_id } => {
230 self.dial_outbound(remote_addr, peer_id);
231 return Some(SwarmEvent::OutgoingTcpConnection { remote_addr, peer_id })
232 }
233 StateAction::Disconnect { peer_id, reason } => {
234 self.sessions.disconnect(peer_id, reason);
235 }
236 StateAction::NewBlock { peer_id, block: msg } => {
237 let msg = PeerMessage::NewBlock(msg);
238 self.sessions.send_message(&peer_id, msg);
239 }
240 StateAction::NewBlockHashes { peer_id, hashes } => {
241 let msg = PeerMessage::NewBlockHashes(hashes);
242 self.sessions.send_message(&peer_id, msg);
243 }
244 StateAction::PeerAdded(peer_id) => return Some(SwarmEvent::PeerAdded(peer_id)),
245 StateAction::PeerRemoved(peer_id) => return Some(SwarmEvent::PeerRemoved(peer_id)),
246 StateAction::DiscoveredNode { peer_id, addr, fork_id } => {
247 if self.is_shutting_down() {
249 return None
250 }
251 if fork_id.map_or_else(|| true, |f| self.sessions.is_valid_fork_id(f)) {
253 self.state_mut().peers_mut().add_peer(peer_id, addr, fork_id);
254 }
255 }
256 StateAction::DiscoveredEnrForkId { peer_id, fork_id } => {
257 if self.sessions.is_valid_fork_id(fork_id) {
258 self.state_mut().peers_mut().set_discovered_fork_id(peer_id, fork_id);
259 } else {
260 self.state_mut().peers_mut().remove_peer(peer_id);
261 }
262 }
263 }
264 None
265 }
266
267 pub(crate) const fn on_shutdown_requested(&mut self) {
269 self.state_mut().peers_mut().on_shutdown();
270 }
271
272 #[inline]
274 pub(crate) const fn is_shutting_down(&self) -> bool {
275 self.state().peers().connection_state().is_shutting_down()
276 }
277
278 pub(crate) const fn on_network_state_change(&mut self, network_state: NetworkConnectionState) {
280 self.state_mut().peers_mut().on_network_state_change(network_state);
281 }
282}
283
284impl<N: NetworkPrimitives> Stream for Swarm<N> {
285 type Item = SwarmEvent<N>;
286
287 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
295 let this = self.get_mut();
296
297 loop {
301 while let Poll::Ready(action) = this.state.poll(cx) {
302 if let Some(event) = this.on_state_action(action) {
303 return Poll::Ready(Some(event))
304 }
305 }
306
307 match this.sessions.poll(cx) {
309 Poll::Pending => {}
310 Poll::Ready(event) => {
311 if let Some(event) = this.on_session_event(event) {
312 return Poll::Ready(Some(event))
313 }
314 continue
315 }
316 }
317
318 match Pin::new(&mut this.incoming).poll(cx) {
320 Poll::Pending => {}
321 Poll::Ready(event) => {
322 if let Some(event) = this.on_connection(event) {
323 return Poll::Ready(Some(event))
324 }
325 continue
326 }
327 }
328
329 return Poll::Pending
330 }
331 }
332}
333
334pub(crate) enum SwarmEvent<N: NetworkPrimitives = EthNetworkPrimitives> {
337 ValidMessage {
339 peer_id: PeerId,
341 message: PeerMessage<N>,
343 },
344 BadMessage {
346 peer_id: PeerId,
348 },
349 ProtocolBreach {
351 peer_id: PeerId,
353 },
354 TcpListenerClosed {
356 remote_addr: SocketAddr,
358 },
359 TcpListenerError(io::Error),
361 IncomingTcpConnection {
366 session_id: SessionId,
368 remote_addr: SocketAddr,
370 },
371 OutgoingTcpConnection {
373 peer_id: PeerId,
375 remote_addr: SocketAddr,
376 },
377 SessionEstablished {
378 peer_id: PeerId,
379 remote_addr: SocketAddr,
380 client_version: Arc<str>,
381 capabilities: Arc<Capabilities>,
382 version: EthVersion,
384 messages: PeerRequestSender<PeerRequest<N>>,
385 status: Arc<Status>,
386 direction: Direction,
387 },
388 SessionClosed {
389 peer_id: PeerId,
390 remote_addr: SocketAddr,
391 error: Option<EthStreamError>,
393 },
394 PeerAdded(PeerId),
396 PeerRemoved(PeerId),
398 IncomingPendingSessionClosed {
400 remote_addr: SocketAddr,
401 error: Option<PendingSessionHandshakeError>,
402 },
403 OutgoingPendingSessionClosed {
405 remote_addr: SocketAddr,
406 peer_id: PeerId,
407 error: Option<PendingSessionHandshakeError>,
408 },
409 OutgoingConnectionError { remote_addr: SocketAddr, peer_id: PeerId, error: io::Error },
411}
412
/// Connection state of the network, as passed to the peer bookkeeping.
#[derive(Debug, Default)]
pub enum NetworkConnectionState {
    /// The default state.
    #[default]
    Active,
    /// The state reported by [`Self::is_shutting_down`].
    ShuttingDown,
    /// Neither active nor shutting down.
    // NOTE(review): the exact effect of this state is decided by its consumers, which are not
    // visible in this file.
    Hibernate,
}

impl NetworkConnectionState {
    /// Returns `true` only for [`Self::Active`].
    pub(crate) const fn is_active(&self) -> bool {
        match self {
            Self::Active => true,
            Self::ShuttingDown | Self::Hibernate => false,
        }
    }

    /// Returns `true` only for [`Self::ShuttingDown`].
    pub(crate) const fn is_shutting_down(&self) -> bool {
        match self {
            Self::ShuttingDown => true,
            Self::Active | Self::Hibernate => false,
        }
    }
}
438}