reth_network/
swarm.rs

1use crate::{
2    listener::{ConnectionListener, ListenerEvent},
3    message::PeerMessage,
4    peers::InboundConnectionError,
5    protocol::IntoRlpxSubProtocol,
6    session::{Direction, PendingSessionHandshakeError, SessionEvent, SessionId, SessionManager},
7    state::{NetworkState, StateAction},
8};
9use futures::Stream;
10use reth_eth_wire::{
11    errors::EthStreamError, Capabilities, DisconnectReason, EthNetworkPrimitives, EthVersion,
12    NetworkPrimitives, Status,
13};
14use reth_network_api::{PeerRequest, PeerRequestSender};
15use reth_network_peers::PeerId;
16use std::{
17    io,
18    net::SocketAddr,
19    pin::Pin,
20    sync::Arc,
21    task::{Context, Poll},
22};
23use tracing::trace;
24
25#[cfg_attr(doc, aquamarine::aquamarine)]
26/// Contains the connectivity related state of the network.
27///
28/// A swarm emits [`SwarmEvent`]s when polled.
29///
30/// It manages the [`ConnectionListener`] and delegates new incoming connections to the
31/// [`SessionManager`]. Outgoing connections are either initiated on demand or triggered by the
32/// [`NetworkState`] and also delegated to the [`NetworkState`].
33///
34/// Following diagram displays the dataflow contained in the [`Swarm`]
35///
36/// The [`ConnectionListener`] yields incoming [`TcpStream`]s from peers that are spawned as session
37/// tasks. After a successful `RLPx` authentication, the task is ready to accept ETH requests or
38/// broadcast messages. A task listens for messages from the [`SessionManager`] which include
39/// broadcast messages like `Transactions` or internal commands, for example to disconnect the
40/// session.
41///
42/// The [`NetworkState`] keeps track of all connected and discovered peers and can initiate outgoing
43/// connections. For each active session, the [`NetworkState`] keeps a sender half of the ETH
44/// request channel for the created session and sends requests it receives from the
45/// [`StateFetcher`], which receives request objects from the client interfaces responsible for
46/// downloading headers and bodies.
47///
48/// `include_mmd!("docs/mermaid/swarm.mmd`")
49#[derive(Debug)]
50#[must_use = "Swarm does nothing unless polled"]
51pub(crate) struct Swarm<N: NetworkPrimitives = EthNetworkPrimitives> {
52    /// Listens for new incoming connections.
53    incoming: ConnectionListener,
54    /// All sessions.
55    sessions: SessionManager<N>,
56    /// Tracks the entire state of the network and handles events received from the sessions.
57    state: NetworkState<N>,
58}
59
60// === impl Swarm ===
61
62impl<N: NetworkPrimitives> Swarm<N> {
63    /// Configures a new swarm instance.
64    pub(crate) const fn new(
65        incoming: ConnectionListener,
66        sessions: SessionManager<N>,
67        state: NetworkState<N>,
68    ) -> Self {
69        Self { incoming, sessions, state }
70    }
71
72    /// Adds a protocol handler to the `RLPx` sub-protocol list.
73    pub(crate) fn add_rlpx_sub_protocol(&mut self, protocol: impl IntoRlpxSubProtocol) {
74        self.sessions_mut().add_rlpx_sub_protocol(protocol);
75    }
76
77    /// Access to the state.
78    pub(crate) const fn state(&self) -> &NetworkState<N> {
79        &self.state
80    }
81
82    /// Mutable access to the state.
83    pub(crate) const fn state_mut(&mut self) -> &mut NetworkState<N> {
84        &mut self.state
85    }
86
87    /// Access to the [`ConnectionListener`].
88    pub(crate) const fn listener(&self) -> &ConnectionListener {
89        &self.incoming
90    }
91
92    /// Access to the [`SessionManager`].
93    pub(crate) const fn sessions(&self) -> &SessionManager<N> {
94        &self.sessions
95    }
96
97    /// Mutable access to the [`SessionManager`].
98    pub(crate) const fn sessions_mut(&mut self) -> &mut SessionManager<N> {
99        &mut self.sessions
100    }
101}
102
103impl<N: NetworkPrimitives> Swarm<N> {
104    /// Triggers a new outgoing connection to the given node
105    pub(crate) fn dial_outbound(&mut self, remote_addr: SocketAddr, remote_id: PeerId) {
106        self.sessions.dial_outbound(remote_addr, remote_id)
107    }
108
109    /// Handles a polled [`SessionEvent`]
110    ///
111    /// This either updates the state or produces a new [`SwarmEvent`] that is bubbled up to the
112    /// manager.
113    fn on_session_event(&mut self, event: SessionEvent<N>) -> Option<SwarmEvent<N>> {
114        match event {
115            SessionEvent::SessionEstablished {
116                peer_id,
117                remote_addr,
118                client_version,
119                capabilities,
120                version,
121                status,
122                messages,
123                direction,
124                timeout,
125            } => {
126                self.state.on_session_activated(
127                    peer_id,
128                    capabilities.clone(),
129                    status.clone(),
130                    messages.clone(),
131                    timeout,
132                );
133                Some(SwarmEvent::SessionEstablished {
134                    peer_id,
135                    remote_addr,
136                    client_version,
137                    capabilities,
138                    version,
139                    messages,
140                    status,
141                    direction,
142                })
143            }
144            SessionEvent::AlreadyConnected { peer_id, remote_addr, direction } => {
145                trace!(target: "net", ?peer_id, ?remote_addr, ?direction, "already connected");
146                self.state.peers_mut().on_already_connected(direction);
147                None
148            }
149            SessionEvent::ValidMessage { peer_id, message } => {
150                Some(SwarmEvent::ValidMessage { peer_id, message })
151            }
152            SessionEvent::IncomingPendingSessionClosed { remote_addr, error } => {
153                Some(SwarmEvent::IncomingPendingSessionClosed { remote_addr, error })
154            }
155            SessionEvent::OutgoingPendingSessionClosed { remote_addr, peer_id, error } => {
156                Some(SwarmEvent::OutgoingPendingSessionClosed { remote_addr, peer_id, error })
157            }
158            SessionEvent::Disconnected { peer_id, remote_addr } => {
159                self.state.on_session_closed(peer_id);
160                Some(SwarmEvent::SessionClosed { peer_id, remote_addr, error: None })
161            }
162            SessionEvent::SessionClosedOnConnectionError { peer_id, remote_addr, error } => {
163                self.state.on_session_closed(peer_id);
164                Some(SwarmEvent::SessionClosed { peer_id, remote_addr, error: Some(error) })
165            }
166            SessionEvent::OutgoingConnectionError { remote_addr, peer_id, error } => {
167                Some(SwarmEvent::OutgoingConnectionError { peer_id, remote_addr, error })
168            }
169            SessionEvent::BadMessage { peer_id } => Some(SwarmEvent::BadMessage { peer_id }),
170            SessionEvent::ProtocolBreach { peer_id } => {
171                Some(SwarmEvent::ProtocolBreach { peer_id })
172            }
173        }
174    }
175
176    /// Callback for events produced by [`ConnectionListener`].
177    ///
178    /// Depending on the event, this will produce a new [`SwarmEvent`].
179    fn on_connection(&mut self, event: ListenerEvent) -> Option<SwarmEvent<N>> {
180        match event {
181            ListenerEvent::Error(err) => return Some(SwarmEvent::TcpListenerError(err)),
182            ListenerEvent::ListenerClosed { local_address: address } => {
183                return Some(SwarmEvent::TcpListenerClosed { remote_addr: address })
184            }
185            ListenerEvent::Incoming { stream, remote_addr } => {
186                // Reject incoming connection if node is shutting down.
187                if self.is_shutting_down() {
188                    return None
189                }
190                // ensure we can handle an incoming connection from this address
191                if let Err(err) =
192                    self.state_mut().peers_mut().on_incoming_pending_session(remote_addr.ip())
193                {
194                    match err {
195                        InboundConnectionError::IpBanned => {
196                            trace!(target: "net", ?remote_addr, "The incoming ip address is in the ban list");
197                        }
198                        InboundConnectionError::ExceedsCapacity => {
199                            trace!(target: "net", ?remote_addr, "No capacity for incoming connection");
200                            self.sessions.try_disconnect_incoming_connection(
201                                stream,
202                                DisconnectReason::TooManyPeers,
203                            );
204                        }
205                    }
206                    return None
207                }
208
209                match self.sessions.on_incoming(stream, remote_addr) {
210                    Ok(session_id) => {
211                        trace!(target: "net", ?remote_addr, "Incoming connection");
212                        return Some(SwarmEvent::IncomingTcpConnection { session_id, remote_addr })
213                    }
214                    Err(err) => {
215                        trace!(target: "net", %err, "Incoming connection rejected, capacity already reached.");
216                        self.state_mut()
217                            .peers_mut()
218                            .on_incoming_pending_session_rejected_internally();
219                    }
220                }
221            }
222        }
223        None
224    }
225
226    /// Hook for actions pulled from the state
227    fn on_state_action(&mut self, event: StateAction<N>) -> Option<SwarmEvent<N>> {
228        match event {
229            StateAction::Connect { remote_addr, peer_id } => {
230                self.dial_outbound(remote_addr, peer_id);
231                return Some(SwarmEvent::OutgoingTcpConnection { remote_addr, peer_id })
232            }
233            StateAction::Disconnect { peer_id, reason } => {
234                self.sessions.disconnect(peer_id, reason);
235            }
236            StateAction::NewBlock { peer_id, block: msg } => {
237                let msg = PeerMessage::NewBlock(msg);
238                self.sessions.send_message(&peer_id, msg);
239            }
240            StateAction::NewBlockHashes { peer_id, hashes } => {
241                let msg = PeerMessage::NewBlockHashes(hashes);
242                self.sessions.send_message(&peer_id, msg);
243            }
244            StateAction::PeerAdded(peer_id) => return Some(SwarmEvent::PeerAdded(peer_id)),
245            StateAction::PeerRemoved(peer_id) => return Some(SwarmEvent::PeerRemoved(peer_id)),
246            StateAction::DiscoveredNode { peer_id, addr, fork_id } => {
247                // Don't try to connect to peer if node is shutting down
248                if self.is_shutting_down() {
249                    return None
250                }
251                // Insert peer only if no fork id or a valid fork id
252                if fork_id.map_or_else(|| true, |f| self.sessions.is_valid_fork_id(f)) {
253                    self.state_mut().peers_mut().add_peer(peer_id, addr, fork_id);
254                }
255            }
256            StateAction::DiscoveredEnrForkId { peer_id, fork_id } => {
257                if self.sessions.is_valid_fork_id(fork_id) {
258                    self.state_mut().peers_mut().set_discovered_fork_id(peer_id, fork_id);
259                } else {
260                    self.state_mut().peers_mut().remove_peer(peer_id);
261                }
262            }
263        }
264        None
265    }
266
267    /// Set network connection state to `ShuttingDown`
268    pub(crate) const fn on_shutdown_requested(&mut self) {
269        self.state_mut().peers_mut().on_shutdown();
270    }
271
272    /// Checks if the node's network connection state is '`ShuttingDown`'
273    #[inline]
274    pub(crate) const fn is_shutting_down(&self) -> bool {
275        self.state().peers().connection_state().is_shutting_down()
276    }
277
278    /// Set network connection state to `Hibernate` or `Active`
279    pub(crate) const fn on_network_state_change(&mut self, network_state: NetworkConnectionState) {
280        self.state_mut().peers_mut().on_network_state_change(network_state);
281    }
282}
283
284impl<N: NetworkPrimitives> Stream for Swarm<N> {
285    type Item = SwarmEvent<N>;
286
287    /// This advances all components.
288    ///
289    /// Processes, delegates (internal) commands received from the
290    /// [`NetworkManager`](crate::NetworkManager), then polls the [`SessionManager`] which
291    /// yields messages produced by individual peer sessions that are then handled. Least
292    /// priority are incoming connections that are handled and delegated to
293    /// the [`SessionManager`] to turn them into a session.
294    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
295        let this = self.get_mut();
296
297        // This loop advances the network's state prioritizing local work [NetworkState] over work
298        // coming in from the network [SessionManager], [ConnectionListener]
299        // Existing connections are prioritized over new __incoming__ connections
300        loop {
301            while let Poll::Ready(action) = this.state.poll(cx) {
302                if let Some(event) = this.on_state_action(action) {
303                    return Poll::Ready(Some(event))
304                }
305            }
306
307            // poll all sessions
308            match this.sessions.poll(cx) {
309                Poll::Pending => {}
310                Poll::Ready(event) => {
311                    if let Some(event) = this.on_session_event(event) {
312                        return Poll::Ready(Some(event))
313                    }
314                    continue
315                }
316            }
317
318            // poll listener for incoming connections
319            match Pin::new(&mut this.incoming).poll(cx) {
320                Poll::Pending => {}
321                Poll::Ready(event) => {
322                    if let Some(event) = this.on_connection(event) {
323                        return Poll::Ready(Some(event))
324                    }
325                    continue
326                }
327            }
328
329            return Poll::Pending
330        }
331    }
332}
333
334/// All events created or delegated by the [`Swarm`] that represents changes to the state of the
335/// network.
336pub(crate) enum SwarmEvent<N: NetworkPrimitives = EthNetworkPrimitives> {
337    /// Events related to the actual network protocol.
338    ValidMessage {
339        /// The peer that sent the message
340        peer_id: PeerId,
341        /// Message received from the peer
342        message: PeerMessage<N>,
343    },
344    /// Received a bad message from the peer.
345    BadMessage {
346        /// Identifier of the remote peer.
347        peer_id: PeerId,
348    },
349    /// Remote peer is considered in protocol violation
350    ProtocolBreach {
351        /// Identifier of the remote peer.
352        peer_id: PeerId,
353    },
354    /// The underlying tcp listener closed.
355    TcpListenerClosed {
356        /// Address of the closed listener.
357        remote_addr: SocketAddr,
358    },
359    /// The underlying tcp listener encountered an error that we bubble up.
360    TcpListenerError(io::Error),
361    /// Received an incoming tcp connection.
362    ///
363    /// This represents the first step in the session authentication process. The swarm will
364    /// produce subsequent events once the stream has been authenticated, or was rejected.
365    IncomingTcpConnection {
366        /// The internal session identifier under which this connection is currently tracked.
367        session_id: SessionId,
368        /// Address of the remote peer.
369        remote_addr: SocketAddr,
370    },
371    /// An outbound connection is initiated.
372    OutgoingTcpConnection {
373        /// Address of the remote peer.
374        peer_id: PeerId,
375        remote_addr: SocketAddr,
376    },
377    SessionEstablished {
378        peer_id: PeerId,
379        remote_addr: SocketAddr,
380        client_version: Arc<str>,
381        capabilities: Arc<Capabilities>,
382        /// negotiated eth version
383        version: EthVersion,
384        messages: PeerRequestSender<PeerRequest<N>>,
385        status: Arc<Status>,
386        direction: Direction,
387    },
388    SessionClosed {
389        peer_id: PeerId,
390        remote_addr: SocketAddr,
391        /// Whether the session was closed due to an error
392        error: Option<EthStreamError>,
393    },
394    /// Admin rpc: new peer added
395    PeerAdded(PeerId),
396    /// Admin rpc: peer removed
397    PeerRemoved(PeerId),
398    /// Closed an incoming pending session during authentication.
399    IncomingPendingSessionClosed {
400        remote_addr: SocketAddr,
401        error: Option<PendingSessionHandshakeError>,
402    },
403    /// Closed an outgoing pending session during authentication.
404    OutgoingPendingSessionClosed {
405        remote_addr: SocketAddr,
406        peer_id: PeerId,
407        error: Option<PendingSessionHandshakeError>,
408    },
409    /// Failed to establish a tcp stream to the given address/node
410    OutgoingConnectionError { remote_addr: SocketAddr, peer_id: PeerId, error: io::Error },
411}
412
/// Represents the state of the connection of the node. If shutting down,
/// new connections won't be established.
/// When in hibernation mode, the node will not initiate new outbound connections. This is
/// beneficial for sync stages that do not require a network connection.
///
/// The type is a plain tag without payload, so it is cheap to copy and can be compared directly.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum NetworkConnectionState {
    /// Node is active, new outbound connections will be established.
    #[default]
    Active,
    /// Node is shutting down, no new connections will be established.
    ShuttingDown,
    /// Hibernate Network connection, no new outbound connections will be established.
    Hibernate,
}

impl NetworkConnectionState {
    /// Returns `true` only for [`NetworkConnectionState::Active`].
    pub(crate) const fn is_active(&self) -> bool {
        matches!(self, Self::Active)
    }

    /// Returns `true` only for [`NetworkConnectionState::ShuttingDown`].
    pub(crate) const fn is_shutting_down(&self) -> bool {
        matches!(self, Self::ShuttingDown)
    }
}