Skip to main content

reth_network/
swarm.rs

1use crate::{
2    listener::{ConnectionListener, ListenerEvent},
3    message::PeerMessage,
4    peers::{InboundConnectionError, PeersManager},
5    protocol::IntoRlpxSubProtocol,
6    session::{Direction, PendingSessionHandshakeError, SessionEvent, SessionId, SessionManager},
7    state::{NetworkState, StateAction},
8};
9use futures::Stream;
10use reth_eth_wire::{
11    errors::EthStreamError, Capabilities, DisconnectReason, EthNetworkPrimitives, EthVersion,
12    NetworkPrimitives, UnifiedStatus,
13};
14use reth_network_api::{PeerRequest, PeerRequestSender};
15use reth_network_peers::PeerId;
16use std::{
17    io,
18    net::SocketAddr,
19    pin::Pin,
20    sync::Arc,
21    task::{Context, Poll},
22};
23use tracing::trace;
24
#[cfg_attr(doc, aquamarine::aquamarine)]
/// Contains the connectivity related state of the network.
///
/// A swarm emits [`SwarmEvent`]s when polled.
///
/// It manages the [`ConnectionListener`] and delegates new incoming connections to the
/// [`SessionManager`]. Outgoing connections are either initiated on demand or triggered by the
/// [`NetworkState`]; in both cases the dial itself is delegated to the [`SessionManager`].
///
/// The diagram at the end of this section displays the dataflow contained in the [`Swarm`].
///
/// The [`ConnectionListener`] yields incoming [`TcpStream`]s from peers that are spawned as session
/// tasks. After a successful `RLPx` authentication, the task is ready to accept ETH requests or
/// broadcast messages. A task listens for messages from the [`SessionManager`] which include
/// broadcast messages like `Transactions` or internal commands, for example to disconnect the
/// session.
///
/// The [`NetworkState`] keeps track of all connected and discovered peers and can initiate outgoing
/// connections. For each active session, the [`NetworkState`] keeps a sender half of the ETH
/// request channel for the created session and sends requests it receives from the
/// [`StateFetcher`], which receives request objects from the client interfaces responsible for
/// downloading headers and bodies.
///
/// include_mmd!("docs/mermaid/swarm.mmd")
#[derive(Debug)]
#[must_use = "Swarm does nothing unless polled"]
pub(crate) struct Swarm<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// Listens for new incoming connections.
    incoming: ConnectionListener,
    /// All sessions, pending and active; also used to dial outgoing connections.
    sessions: SessionManager<N>,
    /// Tracks the entire state of the network and handles events received from the sessions.
    state: NetworkState<N>,
}
59
60// === impl Swarm ===
61
62impl<N: NetworkPrimitives> Swarm<N> {
63    /// Configures a new swarm instance.
64    pub(crate) const fn new(
65        incoming: ConnectionListener,
66        sessions: SessionManager<N>,
67        state: NetworkState<N>,
68    ) -> Self {
69        Self { incoming, sessions, state }
70    }
71
72    /// Adds a protocol handler to the `RLPx` sub-protocol list.
73    pub(crate) fn add_rlpx_sub_protocol(&mut self, protocol: impl IntoRlpxSubProtocol) {
74        self.sessions_mut().add_rlpx_sub_protocol(protocol);
75    }
76
77    /// Access to the state.
78    pub(crate) const fn state(&self) -> &NetworkState<N> {
79        &self.state
80    }
81
82    /// Mutable access to the state.
83    pub(crate) const fn state_mut(&mut self) -> &mut NetworkState<N> {
84        &mut self.state
85    }
86
87    /// Access to the [`ConnectionListener`].
88    pub(crate) const fn listener(&self) -> &ConnectionListener {
89        &self.incoming
90    }
91
92    /// Access to the [`SessionManager`].
93    pub(crate) const fn sessions(&self) -> &SessionManager<N> {
94        &self.sessions
95    }
96
97    /// Mutable access to the [`SessionManager`].
98    pub(crate) const fn sessions_mut(&mut self) -> &mut SessionManager<N> {
99        &mut self.sessions
100    }
101
102    /// Access to the [`PeersManager`].
103    pub(crate) const fn peers(&self) -> &PeersManager {
104        self.state.peers()
105    }
106
107    /// Mutable access to the [`PeersManager`].
108    pub(crate) const fn peers_mut(&mut self) -> &mut PeersManager {
109        self.state.peers_mut()
110    }
111}
112
113impl<N: NetworkPrimitives> Swarm<N> {
114    /// Triggers a new outgoing connection to the given node
115    pub(crate) fn dial_outbound(&mut self, remote_addr: SocketAddr, remote_id: PeerId) {
116        self.sessions.dial_outbound(remote_addr, remote_id)
117    }
118
119    /// Handles a polled [`SessionEvent`]
120    ///
121    /// This either updates the state or produces a new [`SwarmEvent`] that is bubbled up to the
122    /// manager.
123    fn on_session_event(&mut self, event: SessionEvent<N>) -> Option<SwarmEvent<N>> {
124        match event {
125            SessionEvent::SessionEstablished {
126                peer_id,
127                remote_addr,
128                client_version,
129                capabilities,
130                version,
131                status,
132                messages,
133                direction,
134                timeout,
135                range_info,
136            } => {
137                self.state.on_session_activated(
138                    peer_id,
139                    capabilities.clone(),
140                    status.clone(),
141                    messages.clone(),
142                    timeout,
143                    range_info,
144                );
145                Some(SwarmEvent::SessionEstablished {
146                    peer_id,
147                    remote_addr,
148                    client_version,
149                    capabilities,
150                    version,
151                    messages,
152                    status,
153                    direction,
154                })
155            }
156            SessionEvent::AlreadyConnected { peer_id, remote_addr, direction } => {
157                trace!(target: "net", ?peer_id, ?remote_addr, ?direction, "already connected");
158                self.state.peers_mut().on_already_connected(direction);
159                None
160            }
161            SessionEvent::ValidMessage { peer_id, message } => {
162                Some(SwarmEvent::ValidMessage { peer_id, message })
163            }
164            SessionEvent::IncomingPendingSessionClosed { remote_addr, error } => {
165                Some(SwarmEvent::IncomingPendingSessionClosed { remote_addr, error })
166            }
167            SessionEvent::OutgoingPendingSessionClosed { remote_addr, peer_id, error } => {
168                Some(SwarmEvent::OutgoingPendingSessionClosed { remote_addr, peer_id, error })
169            }
170            SessionEvent::Disconnected { peer_id, remote_addr } => {
171                self.state.on_session_closed(peer_id);
172                Some(SwarmEvent::SessionClosed { peer_id, remote_addr, error: None })
173            }
174            SessionEvent::SessionClosedOnConnectionError { peer_id, remote_addr, error } => {
175                self.state.on_session_closed(peer_id);
176                Some(SwarmEvent::SessionClosed { peer_id, remote_addr, error: Some(error) })
177            }
178            SessionEvent::OutgoingConnectionError { remote_addr, peer_id, error } => {
179                Some(SwarmEvent::OutgoingConnectionError { peer_id, remote_addr, error })
180            }
181            SessionEvent::BadMessage { peer_id } => Some(SwarmEvent::BadMessage { peer_id }),
182            SessionEvent::ProtocolBreach { peer_id } => {
183                Some(SwarmEvent::ProtocolBreach { peer_id })
184            }
185        }
186    }
187
188    /// Callback for events produced by [`ConnectionListener`].
189    ///
190    /// Depending on the event, this will produce a new [`SwarmEvent`].
191    fn on_connection(&mut self, event: ListenerEvent) -> Option<SwarmEvent<N>> {
192        match event {
193            ListenerEvent::Error(err) => return Some(SwarmEvent::TcpListenerError(err)),
194            ListenerEvent::ListenerClosed { local_address: address } => {
195                return Some(SwarmEvent::TcpListenerClosed { remote_addr: address })
196            }
197            ListenerEvent::Incoming { stream, remote_addr } => {
198                // Reject incoming connection if node is shutting down.
199                if self.is_shutting_down() {
200                    return None
201                }
202                // ensure we can handle an incoming connection from this address
203                if let Err(err) = self.peers_mut().on_incoming_pending_session(remote_addr.ip()) {
204                    match err {
205                        InboundConnectionError::IpBanned => {
206                            trace!(target: "net", ?remote_addr, "The incoming ip address is in the ban list");
207                        }
208                        InboundConnectionError::ExceedsCapacity => {
209                            trace!(target: "net", ?remote_addr, "No capacity for incoming connection");
210                            self.sessions.try_disconnect_incoming_connection(
211                                stream,
212                                DisconnectReason::TooManyPeers,
213                            );
214                        }
215                    }
216                    return None
217                }
218
219                match self.sessions.on_incoming(stream, remote_addr) {
220                    Ok(session_id) => {
221                        trace!(target: "net", ?remote_addr, "Incoming connection");
222                        return Some(SwarmEvent::IncomingTcpConnection { session_id, remote_addr })
223                    }
224                    Err(err) => {
225                        trace!(target: "net", %err, "Incoming connection rejected, capacity already reached.");
226                        self.state_mut()
227                            .peers_mut()
228                            .on_incoming_pending_session_rejected_internally();
229                    }
230                }
231            }
232        }
233        None
234    }
235
236    /// Hook for actions pulled from the state
237    fn on_state_action(&mut self, event: StateAction<N>) -> Option<SwarmEvent<N>> {
238        match event {
239            StateAction::Connect { remote_addr, peer_id } => {
240                self.dial_outbound(remote_addr, peer_id);
241                return Some(SwarmEvent::OutgoingTcpConnection { remote_addr, peer_id })
242            }
243            StateAction::Disconnect { peer_id, reason } => {
244                self.sessions.disconnect(peer_id, reason);
245            }
246            StateAction::NewBlock { peer_id, block: msg } => {
247                let msg = PeerMessage::NewBlock(msg);
248                self.sessions.send_message(&peer_id, msg);
249            }
250            StateAction::NewBlockHashes { peer_id, hashes } => {
251                let msg = PeerMessage::NewBlockHashes(hashes);
252                self.sessions.send_message(&peer_id, msg);
253            }
254            StateAction::PeerAdded(peer_id) => return Some(SwarmEvent::PeerAdded(peer_id)),
255            StateAction::PeerRemoved(peer_id) => return Some(SwarmEvent::PeerRemoved(peer_id)),
256            StateAction::DiscoveredNode { peer_id, addr, fork_id } => {
257                if self.is_shutting_down() {
258                    return None
259                }
260
261                // When `enforce_enr_fork_id` is enabled, peers discovered without a confirmed
262                // fork ID (via EIP-868 ENR) are deferred — they'll only be added once a
263                // `DiscoveredEnrForkId` event arrives with a validated fork ID.
264                //
265                // When disabled (default), peers without a fork ID are admitted immediately.
266                // Peers that *do* carry a fork ID are always validated against ours.
267                let enforce = self.peers().enforce_enr_fork_id();
268                let allow = match fork_id {
269                    Some(f) => self.sessions.is_valid_fork_id(f),
270                    None => !enforce,
271                };
272                if allow {
273                    self.peers_mut().add_peer(peer_id, addr, fork_id);
274                }
275            }
276            StateAction::DiscoveredEnrForkId { peer_id, addr, fork_id } => {
277                if self.sessions.is_valid_fork_id(fork_id) {
278                    self.peers_mut().add_peer(peer_id, addr, Some(fork_id));
279                } else {
280                    trace!(target: "net", ?peer_id, remote_fork_id=?fork_id, our_fork_id=?self.sessions.fork_id(), "fork id mismatch, removing peer");
281                    self.peers_mut().remove_peer(peer_id);
282                }
283            }
284        }
285        None
286    }
287
288    /// Set network connection state to `ShuttingDown`
289    pub(crate) const fn on_shutdown_requested(&mut self) {
290        self.peers_mut().on_shutdown();
291    }
292
293    /// Checks if the node's network connection state is '`ShuttingDown`'
294    #[inline]
295    pub(crate) const fn is_shutting_down(&self) -> bool {
296        self.peers().connection_state().is_shutting_down()
297    }
298
299    /// Set network connection state to `Hibernate` or `Active`
300    pub(crate) const fn on_network_state_change(&mut self, network_state: NetworkConnectionState) {
301        self.peers_mut().on_network_state_change(network_state);
302    }
303}
304
305impl<N: NetworkPrimitives> Stream for Swarm<N> {
306    type Item = SwarmEvent<N>;
307
308    /// This advances all components.
309    ///
310    /// Processes, delegates (internal) commands received from the
311    /// [`NetworkManager`](crate::NetworkManager), then polls the [`SessionManager`] which
312    /// yields messages produced by individual peer sessions that are then handled. Least
313    /// priority are incoming connections that are handled and delegated to
314    /// the [`SessionManager`] to turn them into a session.
315    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
316        let this = self.get_mut();
317
318        // This loop advances the network's state prioritizing local work [NetworkState] over work
319        // coming in from the network [SessionManager], [ConnectionListener]
320        // Existing connections are prioritized over new __incoming__ connections
321        loop {
322            while let Poll::Ready(action) = this.state.poll(cx) {
323                if let Some(event) = this.on_state_action(action) {
324                    return Poll::Ready(Some(event))
325                }
326            }
327
328            // poll all sessions
329            match this.sessions.poll(cx) {
330                Poll::Pending => {}
331                Poll::Ready(event) => {
332                    if let Some(event) = this.on_session_event(event) {
333                        return Poll::Ready(Some(event))
334                    }
335                    continue
336                }
337            }
338
339            // poll listener for incoming connections
340            match Pin::new(&mut this.incoming).poll(cx) {
341                Poll::Pending => {}
342                Poll::Ready(event) => {
343                    if let Some(event) = this.on_connection(event) {
344                        return Poll::Ready(Some(event))
345                    }
346                    continue
347                }
348            }
349
350            return Poll::Pending
351        }
352    }
353}
354
/// All events created or delegated by the [`Swarm`] that represents changes to the state of the
/// network.
pub(crate) enum SwarmEvent<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// Events related to the actual network protocol.
    ValidMessage {
        /// The peer that sent the message
        peer_id: PeerId,
        /// Message received from the peer
        message: PeerMessage<N>,
    },
    /// Received a bad message from the peer.
    BadMessage {
        /// Identifier of the remote peer.
        peer_id: PeerId,
    },
    /// Remote peer is considered in protocol violation
    ProtocolBreach {
        /// Identifier of the remote peer.
        peer_id: PeerId,
    },
    /// The underlying tcp listener closed.
    TcpListenerClosed {
        /// Address of the closed listener.
        ///
        /// Despite the field name, the swarm fills this with the listener's own local address.
        remote_addr: SocketAddr,
    },
    /// The underlying tcp listener encountered an error that we bubble up.
    TcpListenerError(io::Error),
    /// Received an incoming tcp connection.
    ///
    /// This represents the first step in the session authentication process. The swarm will
    /// produce subsequent events once the stream has been authenticated, or was rejected.
    IncomingTcpConnection {
        /// The internal session identifier under which this connection is currently tracked.
        session_id: SessionId,
        /// Address of the remote peer.
        remote_addr: SocketAddr,
    },
    /// An outbound connection is initiated.
    OutgoingTcpConnection {
        /// Identifier of the remote peer that is being dialed.
        peer_id: PeerId,
        /// Address of the remote peer.
        remote_addr: SocketAddr,
    },
    /// A session with a peer was established; the swarm has already registered it with the
    /// network state before emitting this event.
    SessionEstablished {
        /// Identifier of the remote peer.
        peer_id: PeerId,
        /// Address of the remote peer.
        remote_addr: SocketAddr,
        /// Client version of the remote peer (presumably as announced during the handshake).
        client_version: Arc<str>,
        /// Capabilities of the session.
        capabilities: Arc<Capabilities>,
        /// negotiated eth version
        version: EthVersion,
        /// Sender half of the request channel of the session.
        messages: PeerRequestSender<PeerRequest<N>>,
        /// The [`UnifiedStatus`] associated with the session.
        status: Arc<UnifiedStatus>,
        /// The [`Direction`] of the session.
        direction: Direction,
    },
    /// An established session was closed, either by a regular disconnect or because of a
    /// connection error.
    SessionClosed {
        /// Identifier of the remote peer.
        peer_id: PeerId,
        /// Address of the remote peer.
        remote_addr: SocketAddr,
        /// The error that closed the session, `None` for a regular disconnect.
        error: Option<EthStreamError>,
    },
    /// Admin rpc: new peer added
    PeerAdded(PeerId),
    /// Admin rpc: peer removed
    PeerRemoved(PeerId),
    /// Closed an incoming pending session during authentication.
    IncomingPendingSessionClosed {
        /// Address of the remote peer.
        remote_addr: SocketAddr,
        /// The handshake error reported for the pending session, if any.
        error: Option<PendingSessionHandshakeError>,
    },
    /// Closed an outgoing pending session during authentication.
    OutgoingPendingSessionClosed {
        /// Address of the remote peer.
        remote_addr: SocketAddr,
        /// Identifier of the remote peer.
        peer_id: PeerId,
        /// The handshake error reported for the pending session, if any.
        error: Option<PendingSessionHandshakeError>,
    },
    /// Failed to establish a tcp stream to the given address/node
    OutgoingConnectionError { remote_addr: SocketAddr, peer_id: PeerId, error: io::Error },
}
433
/// Represents the state of the connection of the node.
///
/// If shutting down, new connections won't be established.
/// When in hibernation mode, the node will not initiate new outbound connections. This is
/// beneficial for sync stages that do not require a network connection.
///
/// The type is a plain fieldless enum, so it is cheap to copy and can be compared directly.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum NetworkConnectionState {
    /// Node is active, new outbound connections will be established.
    #[default]
    Active,
    /// Node is shutting down, no new outbound connections will be established. The swarm also
    /// rejects new incoming connections and ignores newly discovered nodes in this state.
    ShuttingDown,
    /// Hibernate Network connection, no new outbound connections will be established.
    Hibernate,
}

impl NetworkConnectionState {
    /// Returns `true` if the node is [`Active`](Self::Active).
    pub(crate) const fn is_active(&self) -> bool {
        matches!(self, Self::Active)
    }

    /// Returns `true` if the node is [`ShuttingDown`](Self::ShuttingDown).
    pub(crate) const fn is_shutting_down(&self) -> bool {
        matches!(self, Self::ShuttingDown)
    }
}
459}