reth_network/
swarm.rs

1use crate::{
2    listener::{ConnectionListener, ListenerEvent},
3    message::PeerMessage,
4    peers::InboundConnectionError,
5    protocol::IntoRlpxSubProtocol,
6    session::{Direction, PendingSessionHandshakeError, SessionEvent, SessionId, SessionManager},
7    state::{NetworkState, StateAction},
8};
9use futures::Stream;
10use reth_eth_wire::{
11    errors::EthStreamError, Capabilities, DisconnectReason, EthNetworkPrimitives, EthVersion,
12    NetworkPrimitives, UnifiedStatus,
13};
14use reth_network_api::{PeerRequest, PeerRequestSender};
15use reth_network_peers::PeerId;
16use std::{
17    io,
18    net::SocketAddr,
19    pin::Pin,
20    sync::Arc,
21    task::{Context, Poll},
22};
23use tracing::{debug, trace};
24
25#[cfg_attr(doc, aquamarine::aquamarine)]
26/// Contains the connectivity related state of the network.
27///
28/// A swarm emits [`SwarmEvent`]s when polled.
29///
30/// It manages the [`ConnectionListener`] and delegates new incoming connections to the
31/// [`SessionManager`]. Outgoing connections are either initiated on demand or triggered by the
32/// [`NetworkState`] and also delegated to the [`NetworkState`].
33///
34/// Following diagram displays the dataflow contained in the [`Swarm`]
35///
36/// The [`ConnectionListener`] yields incoming [`TcpStream`]s from peers that are spawned as session
37/// tasks. After a successful `RLPx` authentication, the task is ready to accept ETH requests or
38/// broadcast messages. A task listens for messages from the [`SessionManager`] which include
39/// broadcast messages like `Transactions` or internal commands, for example to disconnect the
40/// session.
41///
42/// The [`NetworkState`] keeps track of all connected and discovered peers and can initiate outgoing
43/// connections. For each active session, the [`NetworkState`] keeps a sender half of the ETH
44/// request channel for the created session and sends requests it receives from the
45/// [`StateFetcher`], which receives request objects from the client interfaces responsible for
46/// downloading headers and bodies.
47///
48/// `include_mmd!("docs/mermaid/swarm.mmd`")
49#[derive(Debug)]
50#[must_use = "Swarm does nothing unless polled"]
51pub(crate) struct Swarm<N: NetworkPrimitives = EthNetworkPrimitives> {
52    /// Listens for new incoming connections.
53    incoming: ConnectionListener,
54    /// All sessions.
55    sessions: SessionManager<N>,
56    /// Tracks the entire state of the network and handles events received from the sessions.
57    state: NetworkState<N>,
58}
59
60// === impl Swarm ===
61
62impl<N: NetworkPrimitives> Swarm<N> {
63    /// Configures a new swarm instance.
64    pub(crate) const fn new(
65        incoming: ConnectionListener,
66        sessions: SessionManager<N>,
67        state: NetworkState<N>,
68    ) -> Self {
69        Self { incoming, sessions, state }
70    }
71
72    /// Adds a protocol handler to the `RLPx` sub-protocol list.
73    pub(crate) fn add_rlpx_sub_protocol(&mut self, protocol: impl IntoRlpxSubProtocol) {
74        self.sessions_mut().add_rlpx_sub_protocol(protocol);
75    }
76
77    /// Access to the state.
78    pub(crate) const fn state(&self) -> &NetworkState<N> {
79        &self.state
80    }
81
82    /// Mutable access to the state.
83    pub(crate) const fn state_mut(&mut self) -> &mut NetworkState<N> {
84        &mut self.state
85    }
86
87    /// Access to the [`ConnectionListener`].
88    pub(crate) const fn listener(&self) -> &ConnectionListener {
89        &self.incoming
90    }
91
92    /// Access to the [`SessionManager`].
93    pub(crate) const fn sessions(&self) -> &SessionManager<N> {
94        &self.sessions
95    }
96
97    /// Mutable access to the [`SessionManager`].
98    pub(crate) const fn sessions_mut(&mut self) -> &mut SessionManager<N> {
99        &mut self.sessions
100    }
101}
102
103impl<N: NetworkPrimitives> Swarm<N> {
104    /// Triggers a new outgoing connection to the given node
105    pub(crate) fn dial_outbound(&mut self, remote_addr: SocketAddr, remote_id: PeerId) {
106        self.sessions.dial_outbound(remote_addr, remote_id)
107    }
108
109    /// Handles a polled [`SessionEvent`]
110    ///
111    /// This either updates the state or produces a new [`SwarmEvent`] that is bubbled up to the
112    /// manager.
113    fn on_session_event(&mut self, event: SessionEvent<N>) -> Option<SwarmEvent<N>> {
114        match event {
115            SessionEvent::SessionEstablished {
116                peer_id,
117                remote_addr,
118                client_version,
119                capabilities,
120                version,
121                status,
122                messages,
123                direction,
124                timeout,
125                range_info,
126            } => {
127                self.state.on_session_activated(
128                    peer_id,
129                    capabilities.clone(),
130                    status.clone(),
131                    messages.clone(),
132                    timeout,
133                    range_info,
134                );
135                Some(SwarmEvent::SessionEstablished {
136                    peer_id,
137                    remote_addr,
138                    client_version,
139                    capabilities,
140                    version,
141                    messages,
142                    status,
143                    direction,
144                })
145            }
146            SessionEvent::AlreadyConnected { peer_id, remote_addr, direction } => {
147                trace!(target: "net", ?peer_id, ?remote_addr, ?direction, "already connected");
148                self.state.peers_mut().on_already_connected(direction);
149                None
150            }
151            SessionEvent::ValidMessage { peer_id, message } => {
152                Some(SwarmEvent::ValidMessage { peer_id, message })
153            }
154            SessionEvent::IncomingPendingSessionClosed { remote_addr, error } => {
155                Some(SwarmEvent::IncomingPendingSessionClosed { remote_addr, error })
156            }
157            SessionEvent::OutgoingPendingSessionClosed { remote_addr, peer_id, error } => {
158                Some(SwarmEvent::OutgoingPendingSessionClosed { remote_addr, peer_id, error })
159            }
160            SessionEvent::Disconnected { peer_id, remote_addr } => {
161                self.state.on_session_closed(peer_id);
162                Some(SwarmEvent::SessionClosed { peer_id, remote_addr, error: None })
163            }
164            SessionEvent::SessionClosedOnConnectionError { peer_id, remote_addr, error } => {
165                self.state.on_session_closed(peer_id);
166                Some(SwarmEvent::SessionClosed { peer_id, remote_addr, error: Some(error) })
167            }
168            SessionEvent::OutgoingConnectionError { remote_addr, peer_id, error } => {
169                Some(SwarmEvent::OutgoingConnectionError { peer_id, remote_addr, error })
170            }
171            SessionEvent::BadMessage { peer_id } => Some(SwarmEvent::BadMessage { peer_id }),
172            SessionEvent::ProtocolBreach { peer_id } => {
173                Some(SwarmEvent::ProtocolBreach { peer_id })
174            }
175        }
176    }
177
178    /// Callback for events produced by [`ConnectionListener`].
179    ///
180    /// Depending on the event, this will produce a new [`SwarmEvent`].
181    fn on_connection(&mut self, event: ListenerEvent) -> Option<SwarmEvent<N>> {
182        match event {
183            ListenerEvent::Error(err) => return Some(SwarmEvent::TcpListenerError(err)),
184            ListenerEvent::ListenerClosed { local_address: address } => {
185                return Some(SwarmEvent::TcpListenerClosed { remote_addr: address })
186            }
187            ListenerEvent::Incoming { stream, remote_addr } => {
188                // Reject incoming connection if node is shutting down.
189                if self.is_shutting_down() {
190                    return None
191                }
192                // ensure we can handle an incoming connection from this address
193                if let Err(err) =
194                    self.state_mut().peers_mut().on_incoming_pending_session(remote_addr.ip())
195                {
196                    match err {
197                        InboundConnectionError::IpBanned => {
198                            trace!(target: "net", ?remote_addr, "The incoming ip address is in the ban list");
199                        }
200                        InboundConnectionError::ExceedsCapacity => {
201                            trace!(target: "net", ?remote_addr, "No capacity for incoming connection");
202                            self.sessions.try_disconnect_incoming_connection(
203                                stream,
204                                DisconnectReason::TooManyPeers,
205                            );
206                        }
207                    }
208                    return None
209                }
210
211                match self.sessions.on_incoming(stream, remote_addr) {
212                    Ok(session_id) => {
213                        trace!(target: "net", ?remote_addr, "Incoming connection");
214                        return Some(SwarmEvent::IncomingTcpConnection { session_id, remote_addr })
215                    }
216                    Err(err) => {
217                        trace!(target: "net", %err, "Incoming connection rejected, capacity already reached.");
218                        self.state_mut()
219                            .peers_mut()
220                            .on_incoming_pending_session_rejected_internally();
221                    }
222                }
223            }
224        }
225        None
226    }
227
228    /// Hook for actions pulled from the state
229    fn on_state_action(&mut self, event: StateAction<N>) -> Option<SwarmEvent<N>> {
230        match event {
231            StateAction::Connect { remote_addr, peer_id } => {
232                self.dial_outbound(remote_addr, peer_id);
233                return Some(SwarmEvent::OutgoingTcpConnection { remote_addr, peer_id })
234            }
235            StateAction::Disconnect { peer_id, reason } => {
236                self.sessions.disconnect(peer_id, reason);
237            }
238            StateAction::NewBlock { peer_id, block: msg } => {
239                let msg = PeerMessage::NewBlock(msg);
240                self.sessions.send_message(&peer_id, msg);
241            }
242            StateAction::NewBlockHashes { peer_id, hashes } => {
243                let msg = PeerMessage::NewBlockHashes(hashes);
244                self.sessions.send_message(&peer_id, msg);
245            }
246            StateAction::PeerAdded(peer_id) => return Some(SwarmEvent::PeerAdded(peer_id)),
247            StateAction::PeerRemoved(peer_id) => return Some(SwarmEvent::PeerRemoved(peer_id)),
248            StateAction::DiscoveredNode { peer_id, addr, fork_id } => {
249                // Don't try to connect to peer if node is shutting down
250                if self.is_shutting_down() {
251                    return None
252                }
253                // Insert peer only if no fork id or a valid fork id
254                if fork_id.map_or_else(|| true, |f| self.sessions.is_valid_fork_id(f)) {
255                    self.state_mut().peers_mut().add_peer(peer_id, addr, fork_id);
256                }
257            }
258            StateAction::DiscoveredEnrForkId { peer_id, fork_id } => {
259                if self.sessions.is_valid_fork_id(fork_id) {
260                    self.state_mut().peers_mut().set_discovered_fork_id(peer_id, fork_id);
261                } else {
262                    debug!(target: "net", ?peer_id, remote_fork_id=?fork_id, our_fork_id=?self.sessions.fork_id(), "fork id mismatch, removing peer");
263                    self.state_mut().peers_mut().remove_peer(peer_id);
264                }
265            }
266        }
267        None
268    }
269
270    /// Set network connection state to `ShuttingDown`
271    pub(crate) const fn on_shutdown_requested(&mut self) {
272        self.state_mut().peers_mut().on_shutdown();
273    }
274
275    /// Checks if the node's network connection state is '`ShuttingDown`'
276    #[inline]
277    pub(crate) const fn is_shutting_down(&self) -> bool {
278        self.state().peers().connection_state().is_shutting_down()
279    }
280
281    /// Set network connection state to `Hibernate` or `Active`
282    pub(crate) const fn on_network_state_change(&mut self, network_state: NetworkConnectionState) {
283        self.state_mut().peers_mut().on_network_state_change(network_state);
284    }
285}
286
287impl<N: NetworkPrimitives> Stream for Swarm<N> {
288    type Item = SwarmEvent<N>;
289
290    /// This advances all components.
291    ///
292    /// Processes, delegates (internal) commands received from the
293    /// [`NetworkManager`](crate::NetworkManager), then polls the [`SessionManager`] which
294    /// yields messages produced by individual peer sessions that are then handled. Least
295    /// priority are incoming connections that are handled and delegated to
296    /// the [`SessionManager`] to turn them into a session.
297    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
298        let this = self.get_mut();
299
300        // This loop advances the network's state prioritizing local work [NetworkState] over work
301        // coming in from the network [SessionManager], [ConnectionListener]
302        // Existing connections are prioritized over new __incoming__ connections
303        loop {
304            while let Poll::Ready(action) = this.state.poll(cx) {
305                if let Some(event) = this.on_state_action(action) {
306                    return Poll::Ready(Some(event))
307                }
308            }
309
310            // poll all sessions
311            match this.sessions.poll(cx) {
312                Poll::Pending => {}
313                Poll::Ready(event) => {
314                    if let Some(event) = this.on_session_event(event) {
315                        return Poll::Ready(Some(event))
316                    }
317                    continue
318                }
319            }
320
321            // poll listener for incoming connections
322            match Pin::new(&mut this.incoming).poll(cx) {
323                Poll::Pending => {}
324                Poll::Ready(event) => {
325                    if let Some(event) = this.on_connection(event) {
326                        return Poll::Ready(Some(event))
327                    }
328                    continue
329                }
330            }
331
332            return Poll::Pending
333        }
334    }
335}
336
337/// All events created or delegated by the [`Swarm`] that represents changes to the state of the
338/// network.
339pub(crate) enum SwarmEvent<N: NetworkPrimitives = EthNetworkPrimitives> {
340    /// Events related to the actual network protocol.
341    ValidMessage {
342        /// The peer that sent the message
343        peer_id: PeerId,
344        /// Message received from the peer
345        message: PeerMessage<N>,
346    },
347    /// Received a bad message from the peer.
348    BadMessage {
349        /// Identifier of the remote peer.
350        peer_id: PeerId,
351    },
352    /// Remote peer is considered in protocol violation
353    ProtocolBreach {
354        /// Identifier of the remote peer.
355        peer_id: PeerId,
356    },
357    /// The underlying tcp listener closed.
358    TcpListenerClosed {
359        /// Address of the closed listener.
360        remote_addr: SocketAddr,
361    },
362    /// The underlying tcp listener encountered an error that we bubble up.
363    TcpListenerError(io::Error),
364    /// Received an incoming tcp connection.
365    ///
366    /// This represents the first step in the session authentication process. The swarm will
367    /// produce subsequent events once the stream has been authenticated, or was rejected.
368    IncomingTcpConnection {
369        /// The internal session identifier under which this connection is currently tracked.
370        session_id: SessionId,
371        /// Address of the remote peer.
372        remote_addr: SocketAddr,
373    },
374    /// An outbound connection is initiated.
375    OutgoingTcpConnection {
376        /// Address of the remote peer.
377        peer_id: PeerId,
378        remote_addr: SocketAddr,
379    },
380    SessionEstablished {
381        peer_id: PeerId,
382        remote_addr: SocketAddr,
383        client_version: Arc<str>,
384        capabilities: Arc<Capabilities>,
385        /// negotiated eth version
386        version: EthVersion,
387        messages: PeerRequestSender<PeerRequest<N>>,
388        status: Arc<UnifiedStatus>,
389        direction: Direction,
390    },
391    SessionClosed {
392        peer_id: PeerId,
393        remote_addr: SocketAddr,
394        /// Whether the session was closed due to an error
395        error: Option<EthStreamError>,
396    },
397    /// Admin rpc: new peer added
398    PeerAdded(PeerId),
399    /// Admin rpc: peer removed
400    PeerRemoved(PeerId),
401    /// Closed an incoming pending session during authentication.
402    IncomingPendingSessionClosed {
403        remote_addr: SocketAddr,
404        error: Option<PendingSessionHandshakeError>,
405    },
406    /// Closed an outgoing pending session during authentication.
407    OutgoingPendingSessionClosed {
408        remote_addr: SocketAddr,
409        peer_id: PeerId,
410        error: Option<PendingSessionHandshakeError>,
411    },
412    /// Failed to establish a tcp stream to the given address/node
413    OutgoingConnectionError { remote_addr: SocketAddr, peer_id: PeerId, error: io::Error },
414}
415
/// Represents the state of the connection of the node.
///
/// If shutting down, new connections won't be established.
/// When in hibernation mode, the node will not initiate new outbound connections. This is
/// beneficial for sync stages that do not require a network connection.
///
/// The type is a plain, fieldless enum and therefore implements the common value traits
/// (`Clone`, `Copy`, `PartialEq`, `Eq`) so states can be copied and compared directly.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum NetworkConnectionState {
    /// Node is active, new outbound connections will be established.
    #[default]
    Active,
    /// Node is shutting down, no new outbound connections will be established.
    ShuttingDown,
    /// Hibernate Network connection, no new outbound connections will be established.
    Hibernate,
}

impl NetworkConnectionState {
    /// Returns `true` only for [`Self::Active`].
    pub(crate) const fn is_active(&self) -> bool {
        matches!(self, Self::Active)
    }

    /// Returns `true` only for [`Self::ShuttingDown`].
    pub(crate) const fn is_shutting_down(&self) -> bool {
        matches!(self, Self::ShuttingDown)
    }
}
441}