1use crate::{
2 listener::{ConnectionListener, ListenerEvent},
3 message::PeerMessage,
4 peers::{InboundConnectionError, PeersManager},
5 protocol::IntoRlpxSubProtocol,
6 session::{Direction, PendingSessionHandshakeError, SessionEvent, SessionId, SessionManager},
7 state::{NetworkState, StateAction},
8};
9use futures::Stream;
10use reth_eth_wire::{
11 errors::EthStreamError, Capabilities, DisconnectReason, EthNetworkPrimitives, EthVersion,
12 NetworkPrimitives, UnifiedStatus,
13};
14use reth_network_api::{PeerRequest, PeerRequestSender};
15use reth_network_peers::PeerId;
16use std::{
17 io,
18 net::SocketAddr,
19 pin::Pin,
20 sync::Arc,
21 task::{Context, Poll},
22};
23use tracing::trace;
24
25#[cfg_attr(doc, aquamarine::aquamarine)]
26#[derive(Debug)]
50#[must_use = "Swarm does nothing unless polled"]
51pub(crate) struct Swarm<N: NetworkPrimitives = EthNetworkPrimitives> {
52 incoming: ConnectionListener,
54 sessions: SessionManager<N>,
56 state: NetworkState<N>,
58}
59
60impl<N: NetworkPrimitives> Swarm<N> {
63 pub(crate) const fn new(
65 incoming: ConnectionListener,
66 sessions: SessionManager<N>,
67 state: NetworkState<N>,
68 ) -> Self {
69 Self { incoming, sessions, state }
70 }
71
72 pub(crate) fn add_rlpx_sub_protocol(&mut self, protocol: impl IntoRlpxSubProtocol) {
74 self.sessions_mut().add_rlpx_sub_protocol(protocol);
75 }
76
77 pub(crate) const fn state(&self) -> &NetworkState<N> {
79 &self.state
80 }
81
82 pub(crate) const fn state_mut(&mut self) -> &mut NetworkState<N> {
84 &mut self.state
85 }
86
87 pub(crate) const fn listener(&self) -> &ConnectionListener {
89 &self.incoming
90 }
91
92 pub(crate) const fn sessions(&self) -> &SessionManager<N> {
94 &self.sessions
95 }
96
97 pub(crate) const fn sessions_mut(&mut self) -> &mut SessionManager<N> {
99 &mut self.sessions
100 }
101
102 pub(crate) const fn peers(&self) -> &PeersManager {
104 self.state.peers()
105 }
106
107 pub(crate) const fn peers_mut(&mut self) -> &mut PeersManager {
109 self.state.peers_mut()
110 }
111}
112
113impl<N: NetworkPrimitives> Swarm<N> {
114 pub(crate) fn dial_outbound(&mut self, remote_addr: SocketAddr, remote_id: PeerId) {
116 self.sessions.dial_outbound(remote_addr, remote_id)
117 }
118
119 fn on_session_event(&mut self, event: SessionEvent<N>) -> Option<SwarmEvent<N>> {
124 match event {
125 SessionEvent::SessionEstablished {
126 peer_id,
127 remote_addr,
128 client_version,
129 capabilities,
130 version,
131 status,
132 messages,
133 direction,
134 timeout,
135 range_info,
136 } => {
137 self.state.on_session_activated(
138 peer_id,
139 capabilities.clone(),
140 status.clone(),
141 messages.clone(),
142 timeout,
143 range_info,
144 );
145 Some(SwarmEvent::SessionEstablished {
146 peer_id,
147 remote_addr,
148 client_version,
149 capabilities,
150 version,
151 messages,
152 status,
153 direction,
154 })
155 }
156 SessionEvent::AlreadyConnected { peer_id, remote_addr, direction } => {
157 trace!(target: "net", ?peer_id, ?remote_addr, ?direction, "already connected");
158 self.state.peers_mut().on_already_connected(direction);
159 None
160 }
161 SessionEvent::ValidMessage { peer_id, message } => {
162 Some(SwarmEvent::ValidMessage { peer_id, message })
163 }
164 SessionEvent::IncomingPendingSessionClosed { remote_addr, error } => {
165 Some(SwarmEvent::IncomingPendingSessionClosed { remote_addr, error })
166 }
167 SessionEvent::OutgoingPendingSessionClosed { remote_addr, peer_id, error } => {
168 Some(SwarmEvent::OutgoingPendingSessionClosed { remote_addr, peer_id, error })
169 }
170 SessionEvent::Disconnected { peer_id, remote_addr } => {
171 self.state.on_session_closed(peer_id);
172 Some(SwarmEvent::SessionClosed { peer_id, remote_addr, error: None })
173 }
174 SessionEvent::SessionClosedOnConnectionError { peer_id, remote_addr, error } => {
175 self.state.on_session_closed(peer_id);
176 Some(SwarmEvent::SessionClosed { peer_id, remote_addr, error: Some(error) })
177 }
178 SessionEvent::OutgoingConnectionError { remote_addr, peer_id, error } => {
179 Some(SwarmEvent::OutgoingConnectionError { peer_id, remote_addr, error })
180 }
181 SessionEvent::BadMessage { peer_id } => Some(SwarmEvent::BadMessage { peer_id }),
182 SessionEvent::ProtocolBreach { peer_id } => {
183 Some(SwarmEvent::ProtocolBreach { peer_id })
184 }
185 }
186 }
187
188 fn on_connection(&mut self, event: ListenerEvent) -> Option<SwarmEvent<N>> {
192 match event {
193 ListenerEvent::Error(err) => return Some(SwarmEvent::TcpListenerError(err)),
194 ListenerEvent::ListenerClosed { local_address: address } => {
195 return Some(SwarmEvent::TcpListenerClosed { remote_addr: address })
196 }
197 ListenerEvent::Incoming { stream, remote_addr } => {
198 if self.is_shutting_down() {
200 return None
201 }
202 if let Err(err) = self.peers_mut().on_incoming_pending_session(remote_addr.ip()) {
204 match err {
205 InboundConnectionError::IpBanned => {
206 trace!(target: "net", ?remote_addr, "The incoming ip address is in the ban list");
207 }
208 InboundConnectionError::ExceedsCapacity => {
209 trace!(target: "net", ?remote_addr, "No capacity for incoming connection");
210 self.sessions.try_disconnect_incoming_connection(
211 stream,
212 DisconnectReason::TooManyPeers,
213 );
214 }
215 }
216 return None
217 }
218
219 match self.sessions.on_incoming(stream, remote_addr) {
220 Ok(session_id) => {
221 trace!(target: "net", ?remote_addr, "Incoming connection");
222 return Some(SwarmEvent::IncomingTcpConnection { session_id, remote_addr })
223 }
224 Err(err) => {
225 trace!(target: "net", %err, "Incoming connection rejected, capacity already reached.");
226 self.state_mut()
227 .peers_mut()
228 .on_incoming_pending_session_rejected_internally();
229 }
230 }
231 }
232 }
233 None
234 }
235
236 fn on_state_action(&mut self, event: StateAction<N>) -> Option<SwarmEvent<N>> {
238 match event {
239 StateAction::Connect { remote_addr, peer_id } => {
240 self.dial_outbound(remote_addr, peer_id);
241 return Some(SwarmEvent::OutgoingTcpConnection { remote_addr, peer_id })
242 }
243 StateAction::Disconnect { peer_id, reason } => {
244 self.sessions.disconnect(peer_id, reason);
245 }
246 StateAction::NewBlock { peer_id, block: msg } => {
247 let msg = PeerMessage::NewBlock(msg);
248 self.sessions.send_message(&peer_id, msg);
249 }
250 StateAction::NewBlockHashes { peer_id, hashes } => {
251 let msg = PeerMessage::NewBlockHashes(hashes);
252 self.sessions.send_message(&peer_id, msg);
253 }
254 StateAction::PeerAdded(peer_id) => return Some(SwarmEvent::PeerAdded(peer_id)),
255 StateAction::PeerRemoved(peer_id) => return Some(SwarmEvent::PeerRemoved(peer_id)),
256 StateAction::DiscoveredNode { peer_id, addr, fork_id } => {
257 if self.is_shutting_down() {
258 return None
259 }
260
261 let enforce = self.peers().enforce_enr_fork_id();
268 let allow = match fork_id {
269 Some(f) => self.sessions.is_valid_fork_id(f),
270 None => !enforce,
271 };
272 if allow {
273 self.peers_mut().add_peer(peer_id, addr, fork_id);
274 }
275 }
276 StateAction::DiscoveredEnrForkId { peer_id, addr, fork_id } => {
277 if self.sessions.is_valid_fork_id(fork_id) {
278 self.peers_mut().add_peer(peer_id, addr, Some(fork_id));
279 } else {
280 trace!(target: "net", ?peer_id, remote_fork_id=?fork_id, our_fork_id=?self.sessions.fork_id(), "fork id mismatch, removing peer");
281 self.peers_mut().remove_peer(peer_id);
282 }
283 }
284 }
285 None
286 }
287
288 pub(crate) const fn on_shutdown_requested(&mut self) {
290 self.peers_mut().on_shutdown();
291 }
292
293 #[inline]
295 pub(crate) const fn is_shutting_down(&self) -> bool {
296 self.peers().connection_state().is_shutting_down()
297 }
298
299 pub(crate) const fn on_network_state_change(&mut self, network_state: NetworkConnectionState) {
301 self.peers_mut().on_network_state_change(network_state);
302 }
303}
304
305impl<N: NetworkPrimitives> Stream for Swarm<N> {
306 type Item = SwarmEvent<N>;
307
308 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
316 let this = self.get_mut();
317
318 loop {
322 while let Poll::Ready(action) = this.state.poll(cx) {
323 if let Some(event) = this.on_state_action(action) {
324 return Poll::Ready(Some(event))
325 }
326 }
327
328 match this.sessions.poll(cx) {
330 Poll::Pending => {}
331 Poll::Ready(event) => {
332 if let Some(event) = this.on_session_event(event) {
333 return Poll::Ready(Some(event))
334 }
335 continue
336 }
337 }
338
339 match Pin::new(&mut this.incoming).poll(cx) {
341 Poll::Pending => {}
342 Poll::Ready(event) => {
343 if let Some(event) = this.on_connection(event) {
344 return Poll::Ready(Some(event))
345 }
346 continue
347 }
348 }
349
350 return Poll::Pending
351 }
352 }
353}
354
355pub(crate) enum SwarmEvent<N: NetworkPrimitives = EthNetworkPrimitives> {
358 ValidMessage {
360 peer_id: PeerId,
362 message: PeerMessage<N>,
364 },
365 BadMessage {
367 peer_id: PeerId,
369 },
370 ProtocolBreach {
372 peer_id: PeerId,
374 },
375 TcpListenerClosed {
377 remote_addr: SocketAddr,
379 },
380 TcpListenerError(io::Error),
382 IncomingTcpConnection {
387 session_id: SessionId,
389 remote_addr: SocketAddr,
391 },
392 OutgoingTcpConnection {
394 peer_id: PeerId,
396 remote_addr: SocketAddr,
397 },
398 SessionEstablished {
399 peer_id: PeerId,
400 remote_addr: SocketAddr,
401 client_version: Arc<str>,
402 capabilities: Arc<Capabilities>,
403 version: EthVersion,
405 messages: PeerRequestSender<PeerRequest<N>>,
406 status: Arc<UnifiedStatus>,
407 direction: Direction,
408 },
409 SessionClosed {
410 peer_id: PeerId,
411 remote_addr: SocketAddr,
412 error: Option<EthStreamError>,
414 },
415 PeerAdded(PeerId),
417 PeerRemoved(PeerId),
419 IncomingPendingSessionClosed {
421 remote_addr: SocketAddr,
422 error: Option<PendingSessionHandshakeError>,
423 },
424 OutgoingPendingSessionClosed {
426 remote_addr: SocketAddr,
427 peer_id: PeerId,
428 error: Option<PendingSessionHandshakeError>,
429 },
430 OutgoingConnectionError { remote_addr: SocketAddr, peer_id: PeerId, error: io::Error },
432}
433
434#[derive(Debug, Default)]
439pub enum NetworkConnectionState {
440 #[default]
442 Active,
443 ShuttingDown,
445 Hibernate,
447}
448
449impl NetworkConnectionState {
450 pub(crate) const fn is_active(&self) -> bool {
452 matches!(self, Self::Active)
453 }
454
455 pub(crate) const fn is_shutting_down(&self) -> bool {
457 matches!(self, Self::ShuttingDown)
458 }
459}