reth_network/
state.rs

1//! Keeps track of the state of the network.
2
3use crate::{
4    cache::LruCache,
5    discovery::Discovery,
6    fetch::{BlockResponseOutcome, FetchAction, StateFetcher},
7    message::{BlockRequest, NewBlockMessage, PeerResponse, PeerResponseResult},
8    peers::{PeerAction, PeersManager},
9    FetchClient,
10};
11use alloy_consensus::BlockHeader;
12use alloy_primitives::B256;
13use rand::seq::SliceRandom;
14use reth_eth_wire::{
15    BlockHashNumber, Capabilities, DisconnectReason, EthNetworkPrimitives, NetworkPrimitives,
16    NewBlockHashes, Status,
17};
18use reth_ethereum_forks::ForkId;
19use reth_network_api::{DiscoveredEvent, DiscoveryEvent, PeerRequest, PeerRequestSender};
20use reth_network_peers::PeerId;
21use reth_network_types::{PeerAddr, PeerKind};
22use reth_primitives_traits::Block;
23use std::{
24    collections::{HashMap, VecDeque},
25    fmt,
26    net::{IpAddr, SocketAddr},
27    ops::Deref,
28    sync::{
29        atomic::{AtomicU64, AtomicUsize},
30        Arc,
31    },
32    task::{Context, Poll},
33};
34use tokio::sync::oneshot;
35use tracing::{debug, trace};
36
/// Upper bound on the number of block hashes remembered per peer in its seen-blocks cache.
const PEER_BLOCK_CACHE_LIMIT: u32 = 512;
39
40/// Wrapper type for the [`BlockNumReader`] trait.
41pub(crate) struct BlockNumReader(Box<dyn reth_storage_api::BlockNumReader>);
42
43impl BlockNumReader {
44    /// Create a new instance with the given reader.
45    pub fn new(reader: impl reth_storage_api::BlockNumReader + 'static) -> Self {
46        Self(Box::new(reader))
47    }
48}
49
50impl fmt::Debug for BlockNumReader {
51    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
52        f.debug_struct("BlockNumReader").field("inner", &"<dyn BlockNumReader>").finish()
53    }
54}
55
56impl Deref for BlockNumReader {
57    type Target = Box<dyn reth_storage_api::BlockNumReader>;
58
59    fn deref(&self) -> &Self::Target {
60        &self.0
61    }
62}
63
64/// The [`NetworkState`] keeps track of the state of all peers in the network.
65///
66/// This includes:
67///   - [`Discovery`]: manages the discovery protocol, essentially a stream of discovery updates
68///   - [`PeersManager`]: keeps track of connected peers and issues new outgoing connections
69///     depending on the configured capacity.
70///   - [`StateFetcher`]: streams download request (received from outside via channel) which are
71///     then send to the session of the peer.
72///
73/// This type is also responsible for responding for received request.
74#[derive(Debug)]
75pub struct NetworkState<N: NetworkPrimitives = EthNetworkPrimitives> {
76    /// All active peers and their state.
77    active_peers: HashMap<PeerId, ActivePeer<N>>,
78    /// Manages connections to peers.
79    peers_manager: PeersManager,
80    /// Buffered messages until polled.
81    queued_messages: VecDeque<StateAction<N>>,
82    /// The client type that can interact with the chain.
83    ///
84    /// This type is used to fetch the block number after we established a session and received the
85    /// [Status] block hash.
86    client: BlockNumReader,
87    /// Network discovery.
88    discovery: Discovery,
89    /// The type that handles requests.
90    ///
91    /// The fetcher streams `RLPx` related requests on a per-peer basis to this type. This type
92    /// will then queue in the request and notify the fetcher once the result has been
93    /// received.
94    state_fetcher: StateFetcher<N>,
95}
96
97impl<N: NetworkPrimitives> NetworkState<N> {
98    /// Create a new state instance with the given params
99    pub(crate) fn new(
100        client: BlockNumReader,
101        discovery: Discovery,
102        peers_manager: PeersManager,
103        num_active_peers: Arc<AtomicUsize>,
104    ) -> Self {
105        let state_fetcher = StateFetcher::new(peers_manager.handle(), num_active_peers);
106        Self {
107            active_peers: Default::default(),
108            peers_manager,
109            queued_messages: Default::default(),
110            client,
111            discovery,
112            state_fetcher,
113        }
114    }
115
116    /// Returns mutable access to the [`PeersManager`]
117    pub(crate) const fn peers_mut(&mut self) -> &mut PeersManager {
118        &mut self.peers_manager
119    }
120
121    /// Returns mutable access to the [`Discovery`]
122    pub(crate) const fn discovery_mut(&mut self) -> &mut Discovery {
123        &mut self.discovery
124    }
125
126    /// Returns access to the [`PeersManager`]
127    pub(crate) const fn peers(&self) -> &PeersManager {
128        &self.peers_manager
129    }
130
131    /// Returns a new [`FetchClient`]
132    pub(crate) fn fetch_client(&self) -> FetchClient<N> {
133        self.state_fetcher.client()
134    }
135
136    /// How many peers we're currently connected to.
137    pub fn num_active_peers(&self) -> usize {
138        self.active_peers.len()
139    }
140
141    /// Event hook for an activated session for the peer.
142    ///
143    /// Returns `Ok` if the session is valid, returns an `Err` if the session is not accepted and
144    /// should be rejected.
145    pub(crate) fn on_session_activated(
146        &mut self,
147        peer: PeerId,
148        capabilities: Arc<Capabilities>,
149        status: Arc<Status>,
150        request_tx: PeerRequestSender<PeerRequest<N>>,
151        timeout: Arc<AtomicU64>,
152    ) {
153        debug_assert!(!self.active_peers.contains_key(&peer), "Already connected; not possible");
154
155        // find the corresponding block number
156        let block_number =
157            self.client.block_number(status.blockhash).ok().flatten().unwrap_or_default();
158        self.state_fetcher.new_active_peer(peer, status.blockhash, block_number, timeout);
159
160        self.active_peers.insert(
161            peer,
162            ActivePeer {
163                best_hash: status.blockhash,
164                capabilities,
165                request_tx,
166                pending_response: None,
167                blocks: LruCache::new(PEER_BLOCK_CACHE_LIMIT),
168            },
169        );
170    }
171
172    /// Event hook for a disconnected session for the given peer.
173    ///
174    /// This will remove the peer from the available set of peers and close all inflight requests.
175    pub(crate) fn on_session_closed(&mut self, peer: PeerId) {
176        self.active_peers.remove(&peer);
177        self.state_fetcher.on_session_closed(&peer);
178    }
179
180    /// Starts propagating the new block to peers that haven't reported the block yet.
181    ///
182    /// This is supposed to be invoked after the block was validated.
183    ///
184    /// > It then sends the block to a small fraction of connected peers (usually the square root of
185    /// > the total number of peers) using the `NewBlock` message.
186    ///
187    /// See also <https://github.com/ethereum/devp2p/blob/master/caps/eth.md>
188    pub(crate) fn announce_new_block(&mut self, msg: NewBlockMessage<N::Block>) {
189        // send a `NewBlock` message to a fraction of the connected peers (square root of the total
190        // number of peers)
191        let num_propagate = (self.active_peers.len() as f64).sqrt() as u64 + 1;
192
193        let number = msg.block.block.header().number();
194        let mut count = 0;
195
196        // Shuffle to propagate to a random sample of peers on every block announcement
197        let mut peers: Vec<_> = self.active_peers.iter_mut().collect();
198        peers.shuffle(&mut rand::rng());
199
200        for (peer_id, peer) in peers {
201            if peer.blocks.contains(&msg.hash) {
202                // skip peers which already reported the block
203                continue
204            }
205
206            // Queue a `NewBlock` message for the peer
207            if count < num_propagate {
208                self.queued_messages
209                    .push_back(StateAction::NewBlock { peer_id: *peer_id, block: msg.clone() });
210
211                // update peer block info
212                if self.state_fetcher.update_peer_block(peer_id, msg.hash, number) {
213                    peer.best_hash = msg.hash;
214                }
215
216                // mark the block as seen by the peer
217                peer.blocks.insert(msg.hash);
218
219                count += 1;
220            }
221
222            if count >= num_propagate {
223                break
224            }
225        }
226    }
227
228    /// Completes the block propagation process started in [`NetworkState::announce_new_block()`]
229    /// but sending `NewBlockHash` broadcast to all peers that haven't seen it yet.
230    pub(crate) fn announce_new_block_hash(&mut self, msg: NewBlockMessage<N::Block>) {
231        let number = msg.block.block.header().number();
232        let hashes = NewBlockHashes(vec![BlockHashNumber { hash: msg.hash, number }]);
233        for (peer_id, peer) in &mut self.active_peers {
234            if peer.blocks.contains(&msg.hash) {
235                // skip peers which already reported the block
236                continue
237            }
238
239            if self.state_fetcher.update_peer_block(peer_id, msg.hash, number) {
240                peer.best_hash = msg.hash;
241            }
242
243            self.queued_messages.push_back(StateAction::NewBlockHashes {
244                peer_id: *peer_id,
245                hashes: hashes.clone(),
246            });
247        }
248    }
249
250    /// Updates the block information for the peer.
251    pub(crate) fn update_peer_block(&mut self, peer_id: &PeerId, hash: B256, number: u64) {
252        if let Some(peer) = self.active_peers.get_mut(peer_id) {
253            peer.best_hash = hash;
254        }
255        self.state_fetcher.update_peer_block(peer_id, hash, number);
256    }
257
258    /// Invoked when a new [`ForkId`] is activated.
259    pub(crate) fn update_fork_id(&self, fork_id: ForkId) {
260        self.discovery.update_fork_id(fork_id)
261    }
262
263    /// Invoked after a `NewBlock` message was received by the peer.
264    ///
265    /// This will keep track of blocks we know a peer has
266    pub(crate) fn on_new_block(&mut self, peer_id: PeerId, hash: B256) {
267        // Mark the blocks as seen
268        if let Some(peer) = self.active_peers.get_mut(&peer_id) {
269            peer.blocks.insert(hash);
270        }
271    }
272
273    /// Invoked for a `NewBlockHashes` broadcast message.
274    pub(crate) fn on_new_block_hashes(&mut self, peer_id: PeerId, hashes: Vec<BlockHashNumber>) {
275        // Mark the blocks as seen
276        if let Some(peer) = self.active_peers.get_mut(&peer_id) {
277            peer.blocks.extend(hashes.into_iter().map(|b| b.hash));
278        }
279    }
280
281    /// Bans the [`IpAddr`] in the discovery service.
282    pub(crate) fn ban_ip_discovery(&self, ip: IpAddr) {
283        trace!(target: "net", ?ip, "Banning discovery");
284        self.discovery.ban_ip(ip)
285    }
286
287    /// Bans the [`PeerId`] and [`IpAddr`] in the discovery service.
288    pub(crate) fn ban_discovery(&self, peer_id: PeerId, ip: IpAddr) {
289        trace!(target: "net", ?peer_id, ?ip, "Banning discovery");
290        self.discovery.ban(peer_id, ip)
291    }
292
293    /// Marks the given peer as trusted.
294    pub(crate) fn add_trusted_peer_id(&mut self, peer_id: PeerId) {
295        self.peers_manager.add_trusted_peer_id(peer_id)
296    }
297
298    /// Adds a peer and its address with the given kind to the peerset.
299    pub(crate) fn add_peer_kind(&mut self, peer_id: PeerId, kind: PeerKind, addr: PeerAddr) {
300        self.peers_manager.add_peer_kind(peer_id, kind, addr, None)
301    }
302
303    /// Connects a peer and its address with the given kind
304    pub(crate) fn add_and_connect(&mut self, peer_id: PeerId, kind: PeerKind, addr: PeerAddr) {
305        self.peers_manager.add_and_connect_kind(peer_id, kind, addr, None)
306    }
307
308    /// Removes a peer and its address with the given kind from the peerset.
309    pub(crate) fn remove_peer_kind(&mut self, peer_id: PeerId, kind: PeerKind) {
310        match kind {
311            PeerKind::Basic | PeerKind::Static => self.peers_manager.remove_peer(peer_id),
312            PeerKind::Trusted => self.peers_manager.remove_peer_from_trusted_set(peer_id),
313        }
314    }
315
316    /// Event hook for events received from the discovery service.
317    fn on_discovery_event(&mut self, event: DiscoveryEvent) {
318        match event {
319            DiscoveryEvent::NewNode(DiscoveredEvent::EventQueued { peer_id, addr, fork_id }) => {
320                self.queued_messages.push_back(StateAction::DiscoveredNode {
321                    peer_id,
322                    addr,
323                    fork_id,
324                });
325            }
326            DiscoveryEvent::EnrForkId(peer_id, fork_id) => {
327                self.queued_messages
328                    .push_back(StateAction::DiscoveredEnrForkId { peer_id, fork_id });
329            }
330        }
331    }
332
333    /// Event hook for new actions derived from the peer management set.
334    fn on_peer_action(&mut self, action: PeerAction) {
335        match action {
336            PeerAction::Connect { peer_id, remote_addr } => {
337                self.queued_messages.push_back(StateAction::Connect { peer_id, remote_addr });
338            }
339            PeerAction::Disconnect { peer_id, reason } => {
340                self.state_fetcher.on_pending_disconnect(&peer_id);
341                self.queued_messages.push_back(StateAction::Disconnect { peer_id, reason });
342            }
343            PeerAction::DisconnectBannedIncoming { peer_id } |
344            PeerAction::DisconnectUntrustedIncoming { peer_id } => {
345                self.state_fetcher.on_pending_disconnect(&peer_id);
346                self.queued_messages.push_back(StateAction::Disconnect { peer_id, reason: None });
347            }
348            PeerAction::DiscoveryBanPeerId { peer_id, ip_addr } => {
349                self.ban_discovery(peer_id, ip_addr)
350            }
351            PeerAction::DiscoveryBanIp { ip_addr } => self.ban_ip_discovery(ip_addr),
352            PeerAction::PeerAdded(peer_id) => {
353                self.queued_messages.push_back(StateAction::PeerAdded(peer_id))
354            }
355            PeerAction::PeerRemoved(peer_id) => {
356                self.queued_messages.push_back(StateAction::PeerRemoved(peer_id))
357            }
358            PeerAction::BanPeer { .. } | PeerAction::UnBanPeer { .. } => {}
359        }
360    }
361
362    /// Sends The message to the peer's session and queues in a response.
363    ///
364    /// Caution: this will replace an already pending response. It's the responsibility of the
365    /// caller to select the peer.
366    fn handle_block_request(&mut self, peer: PeerId, request: BlockRequest) {
367        if let Some(ref mut peer) = self.active_peers.get_mut(&peer) {
368            let (request, response) = match request {
369                BlockRequest::GetBlockHeaders(request) => {
370                    let (response, rx) = oneshot::channel();
371                    let request = PeerRequest::GetBlockHeaders { request, response };
372                    let response = PeerResponse::BlockHeaders { response: rx };
373                    (request, response)
374                }
375                BlockRequest::GetBlockBodies(request) => {
376                    let (response, rx) = oneshot::channel();
377                    let request = PeerRequest::GetBlockBodies { request, response };
378                    let response = PeerResponse::BlockBodies { response: rx };
379                    (request, response)
380                }
381            };
382            let _ = peer.request_tx.to_session_tx.try_send(request);
383            peer.pending_response = Some(response);
384        }
385    }
386
387    /// Handle the outcome of processed response, for example directly queue another request.
388    fn on_block_response_outcome(&mut self, outcome: BlockResponseOutcome) {
389        match outcome {
390            BlockResponseOutcome::Request(peer, request) => {
391                self.handle_block_request(peer, request);
392            }
393            BlockResponseOutcome::BadResponse(peer, reputation_change) => {
394                self.peers_manager.apply_reputation_change(&peer, reputation_change);
395            }
396        }
397    }
398
399    /// Invoked when received a response from a connected peer.
400    ///
401    /// Delegates the response result to the fetcher which may return an outcome specific
402    /// instruction that needs to be handled in [`Self::on_block_response_outcome`]. This could be
403    /// a follow-up request or an instruction to slash the peer's reputation.
404    fn on_eth_response(&mut self, peer: PeerId, resp: PeerResponseResult<N>) {
405        let outcome = match resp {
406            PeerResponseResult::BlockHeaders(res) => {
407                self.state_fetcher.on_block_headers_response(peer, res)
408            }
409            PeerResponseResult::BlockBodies(res) => {
410                self.state_fetcher.on_block_bodies_response(peer, res)
411            }
412            _ => None,
413        };
414
415        if let Some(outcome) = outcome {
416            self.on_block_response_outcome(outcome);
417        }
418    }
419
420    /// Advances the state
421    pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<StateAction<N>> {
422        loop {
423            // drain buffered messages
424            if let Some(message) = self.queued_messages.pop_front() {
425                return Poll::Ready(message)
426            }
427
428            while let Poll::Ready(discovery) = self.discovery.poll(cx) {
429                self.on_discovery_event(discovery);
430            }
431
432            while let Poll::Ready(action) = self.state_fetcher.poll(cx) {
433                match action {
434                    FetchAction::BlockRequest { peer_id, request } => {
435                        self.handle_block_request(peer_id, request)
436                    }
437                }
438            }
439
440            loop {
441                // need to buffer results here to make borrow checker happy
442                let mut closed_sessions = Vec::new();
443                let mut received_responses = Vec::new();
444
445                // poll all connected peers for responses
446                for (id, peer) in &mut self.active_peers {
447                    let Some(mut response) = peer.pending_response.take() else { continue };
448                    match response.poll(cx) {
449                        Poll::Ready(res) => {
450                            // check if the error is due to a closed channel to the session
451                            if res.err().is_some_and(|err| err.is_channel_closed()) {
452                                debug!(
453                                    target: "net",
454                                    ?id,
455                                    "Request canceled, response channel from session closed."
456                                );
457                                // if the channel is closed, this means the peer session is also
458                                // closed, in which case we can invoke the
459                                // [Self::on_closed_session]
460                                // immediately, preventing followup requests and propagate the
461                                // connection dropped error
462                                closed_sessions.push(*id);
463                            } else {
464                                received_responses.push((*id, res));
465                            }
466                        }
467                        Poll::Pending => {
468                            // not ready yet, store again.
469                            peer.pending_response = Some(response);
470                        }
471                    };
472                }
473
474                for peer in closed_sessions {
475                    self.on_session_closed(peer)
476                }
477
478                if received_responses.is_empty() {
479                    break;
480                }
481
482                for (peer_id, resp) in received_responses {
483                    self.on_eth_response(peer_id, resp);
484                }
485            }
486
487            // poll peer manager
488            while let Poll::Ready(action) = self.peers_manager.poll(cx) {
489                self.on_peer_action(action);
490            }
491
492            // We need to poll again tn case we have received any responses because they may have
493            // triggered follow-up requests.
494            if self.queued_messages.is_empty() {
495                return Poll::Pending
496            }
497        }
498    }
499}
500
501/// Tracks the state of a Peer with an active Session.
502///
503/// For example known blocks,so we can decide what to announce.
504#[derive(Debug)]
505pub(crate) struct ActivePeer<N: NetworkPrimitives> {
506    /// Best block of the peer.
507    pub(crate) best_hash: B256,
508    /// The capabilities of the remote peer.
509    #[expect(dead_code)]
510    pub(crate) capabilities: Arc<Capabilities>,
511    /// A communication channel directly to the session task.
512    pub(crate) request_tx: PeerRequestSender<PeerRequest<N>>,
513    /// The response receiver for a currently active request to that peer.
514    pub(crate) pending_response: Option<PeerResponse<N>>,
515    /// Blocks we know the peer has.
516    pub(crate) blocks: LruCache<B256>,
517}
518
519/// Message variants triggered by the [`NetworkState`]
520#[derive(Debug)]
521pub(crate) enum StateAction<N: NetworkPrimitives> {
522    /// Dispatch a `NewBlock` message to the peer
523    NewBlock {
524        /// Target of the message
525        peer_id: PeerId,
526        /// The `NewBlock` message
527        block: NewBlockMessage<N::Block>,
528    },
529    NewBlockHashes {
530        /// Target of the message
531        peer_id: PeerId,
532        /// `NewBlockHashes` message to send to the peer.
533        hashes: NewBlockHashes,
534    },
535    /// Create a new connection to the given node.
536    Connect { remote_addr: SocketAddr, peer_id: PeerId },
537    /// Disconnect an existing connection
538    Disconnect {
539        peer_id: PeerId,
540        /// Why the disconnect was initiated
541        reason: Option<DisconnectReason>,
542    },
543    /// Retrieved a [`ForkId`] from the peer via ENR request, See <https://eips.ethereum.org/EIPS/eip-868>
544    DiscoveredEnrForkId {
545        peer_id: PeerId,
546        /// The reported [`ForkId`] by this peer.
547        fork_id: ForkId,
548    },
549    /// A new node was found through the discovery, possibly with a `ForkId`
550    DiscoveredNode { peer_id: PeerId, addr: PeerAddr, fork_id: Option<ForkId> },
551    /// A peer was added
552    PeerAdded(PeerId),
553    /// A peer was dropped
554    PeerRemoved(PeerId),
555}
556
557#[cfg(test)]
558mod tests {
559    use crate::{
560        discovery::Discovery,
561        fetch::StateFetcher,
562        peers::PeersManager,
563        state::{BlockNumReader, NetworkState},
564        PeerRequest,
565    };
566    use alloy_consensus::Header;
567    use alloy_primitives::B256;
568    use reth_eth_wire::{BlockBodies, Capabilities, Capability, EthNetworkPrimitives, EthVersion};
569    use reth_ethereum_primitives::BlockBody;
570    use reth_network_api::PeerRequestSender;
571    use reth_network_p2p::{bodies::client::BodiesClient, error::RequestError};
572    use reth_network_peers::PeerId;
573    use reth_storage_api::noop::NoopProvider;
574    use std::{
575        future::poll_fn,
576        sync::{atomic::AtomicU64, Arc},
577    };
578    use tokio::sync::mpsc;
579    use tokio_stream::{wrappers::ReceiverStream, StreamExt};
580
581    /// Returns a testing instance of the [`NetworkState`].
582    fn state() -> NetworkState<EthNetworkPrimitives> {
583        let peers = PeersManager::default();
584        let handle = peers.handle();
585        NetworkState {
586            active_peers: Default::default(),
587            peers_manager: Default::default(),
588            queued_messages: Default::default(),
589            client: BlockNumReader(Box::new(NoopProvider::default())),
590            discovery: Discovery::noop(),
591            state_fetcher: StateFetcher::new(handle, Default::default()),
592        }
593    }
594
595    fn capabilities() -> Arc<Capabilities> {
596        Arc::new(vec![Capability::from(EthVersion::Eth67)].into())
597    }
598
599    // tests that ongoing requests are answered with connection dropped if the session that received
600    // that request is drops the request object.
601    #[tokio::test(flavor = "multi_thread")]
602    async fn test_dropped_active_session() {
603        let mut state = state();
604        let client = state.fetch_client();
605
606        let peer_id = PeerId::random();
607        let (tx, session_rx) = mpsc::channel(1);
608        let peer_tx = PeerRequestSender::new(peer_id, tx);
609
610        state.on_session_activated(
611            peer_id,
612            capabilities(),
613            Arc::default(),
614            peer_tx,
615            Arc::new(AtomicU64::new(1)),
616        );
617
618        assert!(state.active_peers.contains_key(&peer_id));
619
620        let body = BlockBody { ommers: vec![Header::default()], ..Default::default() };
621
622        let body_response = body.clone();
623
624        // this mimics an active session that receives the requests from the state
625        tokio::task::spawn(async move {
626            let mut stream = ReceiverStream::new(session_rx);
627            let resp = stream.next().await.unwrap();
628            match resp {
629                PeerRequest::GetBlockBodies { response, .. } => {
630                    response.send(Ok(BlockBodies(vec![body_response]))).unwrap();
631                }
632                _ => unreachable!(),
633            }
634
635            // wait for the next request, then drop
636            let _resp = stream.next().await.unwrap();
637        });
638
639        // spawn the state as future
640        tokio::task::spawn(async move {
641            loop {
642                poll_fn(|cx| state.poll(cx)).await;
643            }
644        });
645
646        // send requests to the state via the client
647        let (peer, bodies) = client.get_block_bodies(vec![B256::random()]).await.unwrap().split();
648        assert_eq!(peer, peer_id);
649        assert_eq!(bodies, vec![body]);
650
651        let resp = client.get_block_bodies(vec![B256::random()]).await;
652        assert!(resp.is_err());
653        assert_eq!(resp.unwrap_err(), RequestError::ConnectionDropped);
654    }
655}