1use crate::{
2 listener::{ConnectionListener, ListenerEvent},
3 message::PeerMessage,
4 peers::InboundConnectionError,
5 protocol::IntoRlpxSubProtocol,
6 session::{Direction, PendingSessionHandshakeError, SessionEvent, SessionId, SessionManager},
7 state::{NetworkState, StateAction},
8};
9use futures::Stream;
10use reth_eth_wire::{
11 errors::EthStreamError, Capabilities, DisconnectReason, EthNetworkPrimitives, EthVersion,
12 NetworkPrimitives, UnifiedStatus,
13};
14use reth_network_api::{PeerRequest, PeerRequestSender};
15use reth_network_peers::PeerId;
16use std::{
17 io,
18 net::SocketAddr,
19 pin::Pin,
20 sync::Arc,
21 task::{Context, Poll},
22};
23use tracing::{debug, trace};
24
25#[cfg_attr(doc, aquamarine::aquamarine)]
26#[derive(Debug)]
50#[must_use = "Swarm does nothing unless polled"]
51pub(crate) struct Swarm<N: NetworkPrimitives = EthNetworkPrimitives> {
52 incoming: ConnectionListener,
54 sessions: SessionManager<N>,
56 state: NetworkState<N>,
58}
59
60impl<N: NetworkPrimitives> Swarm<N> {
63 pub(crate) const fn new(
65 incoming: ConnectionListener,
66 sessions: SessionManager<N>,
67 state: NetworkState<N>,
68 ) -> Self {
69 Self { incoming, sessions, state }
70 }
71
72 pub(crate) fn add_rlpx_sub_protocol(&mut self, protocol: impl IntoRlpxSubProtocol) {
74 self.sessions_mut().add_rlpx_sub_protocol(protocol);
75 }
76
77 pub(crate) const fn state(&self) -> &NetworkState<N> {
79 &self.state
80 }
81
82 pub(crate) const fn state_mut(&mut self) -> &mut NetworkState<N> {
84 &mut self.state
85 }
86
87 pub(crate) const fn listener(&self) -> &ConnectionListener {
89 &self.incoming
90 }
91
92 pub(crate) const fn sessions(&self) -> &SessionManager<N> {
94 &self.sessions
95 }
96
97 pub(crate) const fn sessions_mut(&mut self) -> &mut SessionManager<N> {
99 &mut self.sessions
100 }
101}
102
103impl<N: NetworkPrimitives> Swarm<N> {
104 pub(crate) fn dial_outbound(&mut self, remote_addr: SocketAddr, remote_id: PeerId) {
106 self.sessions.dial_outbound(remote_addr, remote_id)
107 }
108
109 fn on_session_event(&mut self, event: SessionEvent<N>) -> Option<SwarmEvent<N>> {
114 match event {
115 SessionEvent::SessionEstablished {
116 peer_id,
117 remote_addr,
118 client_version,
119 capabilities,
120 version,
121 status,
122 messages,
123 direction,
124 timeout,
125 range_info,
126 } => {
127 self.state.on_session_activated(
128 peer_id,
129 capabilities.clone(),
130 status.clone(),
131 messages.clone(),
132 timeout,
133 range_info,
134 );
135 Some(SwarmEvent::SessionEstablished {
136 peer_id,
137 remote_addr,
138 client_version,
139 capabilities,
140 version,
141 messages,
142 status,
143 direction,
144 })
145 }
146 SessionEvent::AlreadyConnected { peer_id, remote_addr, direction } => {
147 trace!(target: "net", ?peer_id, ?remote_addr, ?direction, "already connected");
148 self.state.peers_mut().on_already_connected(direction);
149 None
150 }
151 SessionEvent::ValidMessage { peer_id, message } => {
152 Some(SwarmEvent::ValidMessage { peer_id, message })
153 }
154 SessionEvent::IncomingPendingSessionClosed { remote_addr, error } => {
155 Some(SwarmEvent::IncomingPendingSessionClosed { remote_addr, error })
156 }
157 SessionEvent::OutgoingPendingSessionClosed { remote_addr, peer_id, error } => {
158 Some(SwarmEvent::OutgoingPendingSessionClosed { remote_addr, peer_id, error })
159 }
160 SessionEvent::Disconnected { peer_id, remote_addr } => {
161 self.state.on_session_closed(peer_id);
162 Some(SwarmEvent::SessionClosed { peer_id, remote_addr, error: None })
163 }
164 SessionEvent::SessionClosedOnConnectionError { peer_id, remote_addr, error } => {
165 self.state.on_session_closed(peer_id);
166 Some(SwarmEvent::SessionClosed { peer_id, remote_addr, error: Some(error) })
167 }
168 SessionEvent::OutgoingConnectionError { remote_addr, peer_id, error } => {
169 Some(SwarmEvent::OutgoingConnectionError { peer_id, remote_addr, error })
170 }
171 SessionEvent::BadMessage { peer_id } => Some(SwarmEvent::BadMessage { peer_id }),
172 SessionEvent::ProtocolBreach { peer_id } => {
173 Some(SwarmEvent::ProtocolBreach { peer_id })
174 }
175 }
176 }
177
178 fn on_connection(&mut self, event: ListenerEvent) -> Option<SwarmEvent<N>> {
182 match event {
183 ListenerEvent::Error(err) => return Some(SwarmEvent::TcpListenerError(err)),
184 ListenerEvent::ListenerClosed { local_address: address } => {
185 return Some(SwarmEvent::TcpListenerClosed { remote_addr: address })
186 }
187 ListenerEvent::Incoming { stream, remote_addr } => {
188 if self.is_shutting_down() {
190 return None
191 }
192 if let Err(err) =
194 self.state_mut().peers_mut().on_incoming_pending_session(remote_addr.ip())
195 {
196 match err {
197 InboundConnectionError::IpBanned => {
198 trace!(target: "net", ?remote_addr, "The incoming ip address is in the ban list");
199 }
200 InboundConnectionError::ExceedsCapacity => {
201 trace!(target: "net", ?remote_addr, "No capacity for incoming connection");
202 self.sessions.try_disconnect_incoming_connection(
203 stream,
204 DisconnectReason::TooManyPeers,
205 );
206 }
207 }
208 return None
209 }
210
211 match self.sessions.on_incoming(stream, remote_addr) {
212 Ok(session_id) => {
213 trace!(target: "net", ?remote_addr, "Incoming connection");
214 return Some(SwarmEvent::IncomingTcpConnection { session_id, remote_addr })
215 }
216 Err(err) => {
217 trace!(target: "net", %err, "Incoming connection rejected, capacity already reached.");
218 self.state_mut()
219 .peers_mut()
220 .on_incoming_pending_session_rejected_internally();
221 }
222 }
223 }
224 }
225 None
226 }
227
228 fn on_state_action(&mut self, event: StateAction<N>) -> Option<SwarmEvent<N>> {
230 match event {
231 StateAction::Connect { remote_addr, peer_id } => {
232 self.dial_outbound(remote_addr, peer_id);
233 return Some(SwarmEvent::OutgoingTcpConnection { remote_addr, peer_id })
234 }
235 StateAction::Disconnect { peer_id, reason } => {
236 self.sessions.disconnect(peer_id, reason);
237 }
238 StateAction::NewBlock { peer_id, block: msg } => {
239 let msg = PeerMessage::NewBlock(msg);
240 self.sessions.send_message(&peer_id, msg);
241 }
242 StateAction::NewBlockHashes { peer_id, hashes } => {
243 let msg = PeerMessage::NewBlockHashes(hashes);
244 self.sessions.send_message(&peer_id, msg);
245 }
246 StateAction::PeerAdded(peer_id) => return Some(SwarmEvent::PeerAdded(peer_id)),
247 StateAction::PeerRemoved(peer_id) => return Some(SwarmEvent::PeerRemoved(peer_id)),
248 StateAction::DiscoveredNode { peer_id, addr, fork_id } => {
249 if self.is_shutting_down() {
251 return None
252 }
253 if fork_id.map_or_else(|| true, |f| self.sessions.is_valid_fork_id(f)) {
255 self.state_mut().peers_mut().add_peer(peer_id, addr, fork_id);
256 }
257 }
258 StateAction::DiscoveredEnrForkId { peer_id, fork_id } => {
259 if self.sessions.is_valid_fork_id(fork_id) {
260 self.state_mut().peers_mut().set_discovered_fork_id(peer_id, fork_id);
261 } else {
262 debug!(target: "net", ?peer_id, remote_fork_id=?fork_id, our_fork_id=?self.sessions.fork_id(), "fork id mismatch, removing peer");
263 self.state_mut().peers_mut().remove_peer(peer_id);
264 }
265 }
266 }
267 None
268 }
269
270 pub(crate) const fn on_shutdown_requested(&mut self) {
272 self.state_mut().peers_mut().on_shutdown();
273 }
274
275 #[inline]
277 pub(crate) const fn is_shutting_down(&self) -> bool {
278 self.state().peers().connection_state().is_shutting_down()
279 }
280
281 pub(crate) const fn on_network_state_change(&mut self, network_state: NetworkConnectionState) {
283 self.state_mut().peers_mut().on_network_state_change(network_state);
284 }
285}
286
287impl<N: NetworkPrimitives> Stream for Swarm<N> {
288 type Item = SwarmEvent<N>;
289
290 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
298 let this = self.get_mut();
299
300 loop {
304 while let Poll::Ready(action) = this.state.poll(cx) {
305 if let Some(event) = this.on_state_action(action) {
306 return Poll::Ready(Some(event))
307 }
308 }
309
310 match this.sessions.poll(cx) {
312 Poll::Pending => {}
313 Poll::Ready(event) => {
314 if let Some(event) = this.on_session_event(event) {
315 return Poll::Ready(Some(event))
316 }
317 continue
318 }
319 }
320
321 match Pin::new(&mut this.incoming).poll(cx) {
323 Poll::Pending => {}
324 Poll::Ready(event) => {
325 if let Some(event) = this.on_connection(event) {
326 return Poll::Ready(Some(event))
327 }
328 continue
329 }
330 }
331
332 return Poll::Pending
333 }
334 }
335}
336
337pub(crate) enum SwarmEvent<N: NetworkPrimitives = EthNetworkPrimitives> {
340 ValidMessage {
342 peer_id: PeerId,
344 message: PeerMessage<N>,
346 },
347 BadMessage {
349 peer_id: PeerId,
351 },
352 ProtocolBreach {
354 peer_id: PeerId,
356 },
357 TcpListenerClosed {
359 remote_addr: SocketAddr,
361 },
362 TcpListenerError(io::Error),
364 IncomingTcpConnection {
369 session_id: SessionId,
371 remote_addr: SocketAddr,
373 },
374 OutgoingTcpConnection {
376 peer_id: PeerId,
378 remote_addr: SocketAddr,
379 },
380 SessionEstablished {
381 peer_id: PeerId,
382 remote_addr: SocketAddr,
383 client_version: Arc<str>,
384 capabilities: Arc<Capabilities>,
385 version: EthVersion,
387 messages: PeerRequestSender<PeerRequest<N>>,
388 status: Arc<UnifiedStatus>,
389 direction: Direction,
390 },
391 SessionClosed {
392 peer_id: PeerId,
393 remote_addr: SocketAddr,
394 error: Option<EthStreamError>,
396 },
397 PeerAdded(PeerId),
399 PeerRemoved(PeerId),
401 IncomingPendingSessionClosed {
403 remote_addr: SocketAddr,
404 error: Option<PendingSessionHandshakeError>,
405 },
406 OutgoingPendingSessionClosed {
408 remote_addr: SocketAddr,
409 peer_id: PeerId,
410 error: Option<PendingSessionHandshakeError>,
411 },
412 OutgoingConnectionError { remote_addr: SocketAddr, peer_id: PeerId, error: io::Error },
414}
415
/// Connection state of the node, as consulted by the swarm.
///
/// While the state is [`ShuttingDown`](Self::ShuttingDown) the swarm drops incoming
/// connections and ignores newly discovered nodes.
#[derive(Debug, Default)]
pub enum NetworkConnectionState {
    /// The default state.
    #[default]
    Active,
    /// The node is shutting down.
    ShuttingDown,
    /// Hibernating.
    // NOTE(review): the effect of this state is decided outside this file — confirm.
    Hibernate,
}

impl NetworkConnectionState {
    /// Returns `true` only for [`Self::Active`].
    pub(crate) const fn is_active(&self) -> bool {
        match self {
            Self::Active => true,
            Self::ShuttingDown | Self::Hibernate => false,
        }
    }

    /// Returns `true` only for [`Self::ShuttingDown`].
    pub(crate) const fn is_shutting_down(&self) -> bool {
        match self {
            Self::ShuttingDown => true,
            Self::Active | Self::Hibernate => false,
        }
    }
}