reth_network/state.rs

1//! Keeps track of the state of the network.
2
3use crate::{
4    cache::LruCache,
5    discovery::Discovery,
6    fetch::{BlockResponseOutcome, FetchAction, StateFetcher},
7    message::{BlockRequest, NewBlockMessage, PeerResponse, PeerResponseResult},
8    peers::{PeerAction, PeersManager},
9    session::BlockRangeInfo,
10    FetchClient,
11};
12use alloy_consensus::BlockHeader;
13use alloy_primitives::B256;
14use rand::seq::SliceRandom;
15use reth_eth_wire::{
16    BlockHashNumber, Capabilities, DisconnectReason, EthNetworkPrimitives, GetReceipts70,
17    NetworkPrimitives, NewBlockHashes, NewBlockPayload, UnifiedStatus,
18};
19use reth_ethereum_forks::ForkId;
20use reth_network_api::{DiscoveredEvent, DiscoveryEvent, PeerRequest, PeerRequestSender};
21use reth_network_p2p::receipts::client::ReceiptsResponse;
22use reth_network_peers::PeerId;
23use reth_network_types::{PeerAddr, PeerKind};
24use reth_primitives_traits::Block;
25use std::{
26    collections::{HashMap, VecDeque},
27    fmt,
28    net::{IpAddr, SocketAddr},
29    ops::Deref,
30    sync::{
31        atomic::{AtomicU64, AtomicUsize},
32        Arc,
33    },
34    task::{Context, Poll},
35};
36use tokio::sync::oneshot;
37use tracing::{debug, trace};
38
/// Cache limit of blocks to keep track of for a single peer.
///
/// Bounds the per-peer LRU cache of seen block hashes in [`ActivePeer`].
const PEER_BLOCK_CACHE_LIMIT: u32 = 512;
41
/// Wrapper type for the [`BlockNumReader`] trait.
///
/// Newtype around the boxed trait object so we can provide `Debug` and `Deref` impls.
pub(crate) struct BlockNumReader(Box<dyn reth_storage_api::BlockNumReader>);
44
45impl BlockNumReader {
46    /// Create a new instance with the given reader.
47    pub fn new(reader: impl reth_storage_api::BlockNumReader + 'static) -> Self {
48        Self(Box::new(reader))
49    }
50}
51
52impl fmt::Debug for BlockNumReader {
53    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
54        f.debug_struct("BlockNumReader").field("inner", &"<dyn BlockNumReader>").finish()
55    }
56}
57
impl Deref for BlockNumReader {
    type Target = Box<dyn reth_storage_api::BlockNumReader>;

    /// Allows calling the inner reader's trait methods directly on the wrapper.
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
65
/// The [`NetworkState`] keeps track of the state of all peers in the network.
///
/// This includes:
///   - [`Discovery`]: manages the discovery protocol, essentially a stream of discovery updates
///   - [`PeersManager`]: keeps track of connected peers and issues new outgoing connections
///     depending on the configured capacity.
///   - [`StateFetcher`]: streams download requests (received from outside via channel) which are
///     then sent to the session of the peer.
///
/// This type is also responsible for responding to received requests.
#[derive(Debug)]
pub struct NetworkState<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// All active peers and their state.
    active_peers: HashMap<PeerId, ActivePeer<N>>,
    /// Manages connections to peers.
    peers_manager: PeersManager,
    /// Buffered messages until polled.
    queued_messages: VecDeque<StateAction<N>>,
    /// The client type that can interact with the chain.
    ///
    /// This type is used to fetch the block number after we established a session and received the
    /// [`UnifiedStatus`] block hash.
    client: BlockNumReader,
    /// Network discovery.
    discovery: Discovery,
    /// The type that handles requests.
    ///
    /// The fetcher streams `RLPx` related requests on a per-peer basis to this type. This type
    /// will then queue in the request and notify the fetcher once the result has been
    /// received.
    state_fetcher: StateFetcher<N>,
}
98
impl<N: NetworkPrimitives> NetworkState<N> {
    /// Create a new state instance with the given params
    pub(crate) fn new(
        client: BlockNumReader,
        discovery: Discovery,
        peers_manager: PeersManager,
        num_active_peers: Arc<AtomicUsize>,
    ) -> Self {
        let state_fetcher = StateFetcher::new(peers_manager.handle(), num_active_peers);
        Self {
            active_peers: Default::default(),
            peers_manager,
            queued_messages: Default::default(),
            client,
            discovery,
            state_fetcher,
        }
    }

    /// Returns mutable access to the [`PeersManager`]
    pub(crate) const fn peers_mut(&mut self) -> &mut PeersManager {
        &mut self.peers_manager
    }

    /// Returns mutable access to the [`Discovery`]
    pub(crate) const fn discovery_mut(&mut self) -> &mut Discovery {
        &mut self.discovery
    }

    /// Returns access to the [`PeersManager`]
    pub(crate) const fn peers(&self) -> &PeersManager {
        &self.peers_manager
    }

    /// Returns a new [`FetchClient`]
    pub(crate) fn fetch_client(&self) -> FetchClient<N> {
        self.state_fetcher.client()
    }

    /// How many peers we're currently connected to.
    pub fn num_active_peers(&self) -> usize {
        self.active_peers.len()
    }

    /// Event hook for an activated session for the peer.
    ///
    /// Registers the peer with the fetcher and starts tracking it as an active peer, including
    /// the best block hash reported in its [`UnifiedStatus`].
    pub(crate) fn on_session_activated(
        &mut self,
        peer: PeerId,
        capabilities: Arc<Capabilities>,
        status: Arc<UnifiedStatus>,
        request_tx: PeerRequestSender<PeerRequest<N>>,
        timeout: Arc<AtomicU64>,
        range_info: Option<BlockRangeInfo>,
    ) {
        debug_assert!(!self.active_peers.contains_key(&peer), "Already connected; not possible");

        // find the corresponding block number
        // falls back to 0 when the hash is unknown to the local client
        let block_number =
            self.client.block_number(status.blockhash).ok().flatten().unwrap_or_default();
        self.state_fetcher.new_active_peer(
            peer,
            status.blockhash,
            block_number,
            Arc::clone(&capabilities),
            timeout,
            range_info,
        );

        self.active_peers.insert(
            peer,
            ActivePeer {
                best_hash: status.blockhash,
                capabilities,
                request_tx,
                pending_response: None,
                blocks: LruCache::new(PEER_BLOCK_CACHE_LIMIT),
            },
        );
    }

    /// Event hook for a disconnected session for the given peer.
    ///
    /// This will remove the peer from the available set of peers and close all inflight requests.
    pub(crate) fn on_session_closed(&mut self, peer: PeerId) {
        self.active_peers.remove(&peer);
        self.state_fetcher.on_session_closed(&peer);
    }

    /// Starts propagating the new block to peers that haven't reported the block yet.
    ///
    /// This is supposed to be invoked after the block was validated.
    ///
    /// > It then sends the block to a small fraction of connected peers (usually the square root of
    /// > the total number of peers) using the `NewBlock` message.
    ///
    /// See also <https://github.com/ethereum/devp2p/blob/master/caps/eth.md>
    pub(crate) fn announce_new_block(&mut self, msg: NewBlockMessage<N::NewBlockPayload>) {
        // send a `NewBlock` message to a fraction of the connected peers (square root of the total
        // number of peers)
        let num_propagate = (self.active_peers.len() as f64).sqrt() as u64 + 1;

        let number = msg.block.block().header().number();
        let mut count = 0;

        // Shuffle to propagate to a random sample of peers on every block announcement
        let mut peers: Vec<_> = self.active_peers.iter_mut().collect();
        peers.shuffle(&mut rand::rng());

        for (peer_id, peer) in peers {
            if peer.blocks.contains(&msg.hash) {
                // skip peers which already reported the block
                continue
            }

            // Queue a `NewBlock` message for the peer
            if count < num_propagate {
                self.queued_messages
                    .push_back(StateAction::NewBlock { peer_id: *peer_id, block: msg.clone() });

                // update peer block info
                if self.state_fetcher.update_peer_block(peer_id, msg.hash, number) {
                    peer.best_hash = msg.hash;
                }

                // mark the block as seen by the peer
                peer.blocks.insert(msg.hash);

                count += 1;
            }

            if count >= num_propagate {
                break
            }
        }
    }

    /// Completes the block propagation process started in [`NetworkState::announce_new_block()`]
    /// by sending a `NewBlockHash` broadcast to all peers that haven't seen it yet.
    pub(crate) fn announce_new_block_hash(&mut self, msg: NewBlockMessage<N::NewBlockPayload>) {
        let number = msg.block.block().header().number();
        let hashes = NewBlockHashes(vec![BlockHashNumber { hash: msg.hash, number }]);
        for (peer_id, peer) in &mut self.active_peers {
            if peer.blocks.contains(&msg.hash) {
                // skip peers which already reported the block
                continue
            }

            if self.state_fetcher.update_peer_block(peer_id, msg.hash, number) {
                peer.best_hash = msg.hash;
            }

            self.queued_messages.push_back(StateAction::NewBlockHashes {
                peer_id: *peer_id,
                hashes: hashes.clone(),
            });
        }
    }

    /// Updates the block information for the peer.
    pub(crate) fn update_peer_block(&mut self, peer_id: &PeerId, hash: B256, number: u64) {
        if let Some(peer) = self.active_peers.get_mut(peer_id) {
            peer.best_hash = hash;
        }
        self.state_fetcher.update_peer_block(peer_id, hash, number);
    }

    /// Invoked when a new [`ForkId`] is activated.
    pub(crate) fn update_fork_id(&self, fork_id: ForkId) {
        self.discovery.update_fork_id(fork_id)
    }

    /// Invoked after a `NewBlock` message was received by the peer.
    ///
    /// This will keep track of blocks we know a peer has
    pub(crate) fn on_new_block(&mut self, peer_id: PeerId, hash: B256) {
        // Mark the blocks as seen
        if let Some(peer) = self.active_peers.get_mut(&peer_id) {
            peer.blocks.insert(hash);
        }
    }

    /// Invoked for a `NewBlockHashes` broadcast message.
    pub(crate) fn on_new_block_hashes(&mut self, peer_id: PeerId, hashes: Vec<BlockHashNumber>) {
        // Mark the blocks as seen
        if let Some(peer) = self.active_peers.get_mut(&peer_id) {
            peer.blocks.extend(hashes.into_iter().map(|b| b.hash));
        }
    }

    /// Bans the [`IpAddr`] in the discovery service.
    pub(crate) fn ban_ip_discovery(&self, ip: IpAddr) {
        trace!(target: "net", ?ip, "Banning discovery");
        self.discovery.ban_ip(ip)
    }

    /// Bans the [`PeerId`] and [`IpAddr`] in the discovery service.
    pub(crate) fn ban_discovery(&self, peer_id: PeerId, ip: IpAddr) {
        trace!(target: "net", ?peer_id, ?ip, "Banning discovery");
        self.discovery.ban(peer_id, ip)
    }

    /// Marks the given peer as trusted.
    pub(crate) fn add_trusted_peer_id(&mut self, peer_id: PeerId) {
        self.peers_manager.add_trusted_peer_id(peer_id)
    }

    /// Adds a peer and its address with the given kind to the peerset.
    pub(crate) fn add_peer_kind(&mut self, peer_id: PeerId, kind: PeerKind, addr: PeerAddr) {
        self.peers_manager.add_peer_kind(peer_id, Some(kind), addr, None)
    }

    /// Connects a peer and its address with the given kind
    pub(crate) fn add_and_connect(&mut self, peer_id: PeerId, kind: PeerKind, addr: PeerAddr) {
        self.peers_manager.add_and_connect_kind(peer_id, kind, addr, None)
    }

    /// Removes a peer and its address with the given kind from the peerset.
    pub(crate) fn remove_peer_kind(&mut self, peer_id: PeerId, kind: PeerKind) {
        match kind {
            PeerKind::Basic | PeerKind::Static => self.peers_manager.remove_peer(peer_id),
            PeerKind::Trusted => self.peers_manager.remove_peer_from_trusted_set(peer_id),
        }
    }

    /// Event hook for events received from the discovery service.
    fn on_discovery_event(&mut self, event: DiscoveryEvent) {
        match event {
            DiscoveryEvent::NewNode(DiscoveredEvent::EventQueued { peer_id, addr, fork_id }) => {
                self.queued_messages.push_back(StateAction::DiscoveredNode {
                    peer_id,
                    addr,
                    fork_id,
                });
            }
            DiscoveryEvent::EnrForkId(record, fork_id) => {
                let peer_id = record.id;
                let tcp_addr = record.tcp_addr();
                // a zero TCP port means the record is not dialable; ignore it
                if tcp_addr.port() == 0 {
                    return
                }
                let udp_addr = record.udp_addr();
                let addr = PeerAddr::new(tcp_addr, Some(udp_addr));
                self.queued_messages.push_back(StateAction::DiscoveredEnrForkId {
                    peer_id,
                    addr,
                    fork_id,
                });
            }
        }
    }

    /// Event hook for new actions derived from the peer management set.
    fn on_peer_action(&mut self, action: PeerAction) {
        match action {
            PeerAction::Connect { peer_id, remote_addr } => {
                self.queued_messages.push_back(StateAction::Connect { peer_id, remote_addr });
            }
            PeerAction::Disconnect { peer_id, reason } => {
                self.state_fetcher.on_pending_disconnect(&peer_id);
                self.queued_messages.push_back(StateAction::Disconnect { peer_id, reason });
            }
            PeerAction::DisconnectBannedIncoming { peer_id } |
            PeerAction::DisconnectUntrustedIncoming { peer_id } => {
                self.state_fetcher.on_pending_disconnect(&peer_id);
                self.queued_messages.push_back(StateAction::Disconnect { peer_id, reason: None });
            }
            PeerAction::DiscoveryBanPeerId { peer_id, ip_addr } => {
                self.ban_discovery(peer_id, ip_addr)
            }
            PeerAction::DiscoveryBanIp { ip_addr } => self.ban_ip_discovery(ip_addr),
            PeerAction::PeerAdded(peer_id) => {
                self.queued_messages.push_back(StateAction::PeerAdded(peer_id))
            }
            PeerAction::PeerRemoved(peer_id) => {
                self.queued_messages.push_back(StateAction::PeerRemoved(peer_id))
            }
            PeerAction::BanPeer { .. } | PeerAction::UnBanPeer { .. } => {}
        }
    }

    /// Sends The message to the peer's session and queues in a response.
    ///
    /// Caution: this will replace an already pending response. It's the responsibility of the
    /// caller to select the peer.
    fn handle_block_request(&mut self, peer_id: PeerId, request: BlockRequest) {
        if let Some(ref mut peer) = self.active_peers.get_mut(&peer_id) {
            let (request, response) = match request {
                BlockRequest::GetBlockHeaders(request) => {
                    let (response, rx) = oneshot::channel();
                    let request = PeerRequest::GetBlockHeaders { request, response };
                    let response = PeerResponse::BlockHeaders { response: rx };
                    (request, response)
                }
                BlockRequest::GetBlockBodies(request) => {
                    let (response, rx) = oneshot::channel();
                    let request = PeerRequest::GetBlockBodies { request, response };
                    let response = PeerResponse::BlockBodies { response: rx };
                    (request, response)
                }
                // Receipts requests are version-dependent: pick the highest eth protocol
                // message variant the peer supports.
                BlockRequest::GetReceipts(request) => {
                    if peer.capabilities.supports_eth_v70() {
                        let (response, rx) = oneshot::channel();
                        let request = PeerRequest::GetReceipts70 {
                            request: GetReceipts70 {
                                first_block_receipt_index: 0,
                                block_hashes: request.0,
                            },
                            response,
                        };
                        let response = PeerResponse::Receipts70 { response: rx };
                        (request, response)
                    } else if peer.capabilities.supports_eth_v69() {
                        let (response, rx) = oneshot::channel();
                        let request = PeerRequest::GetReceipts69 { request, response };
                        let response = PeerResponse::Receipts69 { response: rx };
                        (request, response)
                    } else {
                        let (response, rx) = oneshot::channel();
                        let request = PeerRequest::GetReceipts { request, response };
                        let response = PeerResponse::Receipts { response: rx };
                        (request, response)
                    }
                }
            };
            // best-effort send to the session task; a send error is intentionally ignored here
            let _ = peer.request_tx.to_session_tx.try_send(request);
            peer.pending_response = Some(response);
        }
    }

    /// Handle the outcome of processed response, for example directly queue another request.
    fn on_block_response_outcome(&mut self, outcome: BlockResponseOutcome) {
        match outcome {
            BlockResponseOutcome::Request(peer, request) => {
                self.handle_block_request(peer, request);
            }
            BlockResponseOutcome::BadResponse(peer, reputation_change) => {
                self.peers_manager.apply_reputation_change(&peer, reputation_change);
            }
        }
    }

    /// Invoked when received a response from a connected peer.
    ///
    /// Delegates the response result to the fetcher which may return an outcome specific
    /// instruction that needs to be handled in [`Self::on_block_response_outcome`]. This could be
    /// a follow-up request or an instruction to slash the peer's reputation.
    fn on_eth_response(&mut self, peer: PeerId, resp: PeerResponseResult<N>) {
        let outcome = match resp {
            PeerResponseResult::BlockHeaders(res) => {
                self.state_fetcher.on_block_headers_response(peer, res)
            }
            PeerResponseResult::BlockBodies(res) => {
                self.state_fetcher.on_block_bodies_response(peer, res)
            }
            PeerResponseResult::Receipts(res) => {
                // Legacy eth/66-68: strip bloom filters and wrap in ReceiptsResponse
                let normalized = res.map(|blocks| {
                    let receipts = blocks
                        .into_iter()
                        .map(|block_receipts| {
                            block_receipts.into_iter().map(|rwb| rwb.receipt).collect()
                        })
                        .collect();
                    ReceiptsResponse::new(receipts)
                });
                self.state_fetcher.on_receipts_response(peer, normalized)
            }
            PeerResponseResult::Receipts69(res) => {
                let normalized = res.map(ReceiptsResponse::new);
                self.state_fetcher.on_receipts_response(peer, normalized)
            }
            PeerResponseResult::Receipts70(res) => {
                let normalized = res.map(ReceiptsResponse::from);
                self.state_fetcher.on_receipts_response(peer, normalized)
            }
            // no follow-up action required for other response types
            _ => None,
        };

        if let Some(outcome) = outcome {
            self.on_block_response_outcome(outcome);
        }
    }

    /// Advances the state
    pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<StateAction<N>> {
        loop {
            // drain buffered messages
            if let Some(message) = self.queued_messages.pop_front() {
                return Poll::Ready(message)
            }

            while let Poll::Ready(discovery) = self.discovery.poll(cx) {
                self.on_discovery_event(discovery);
            }

            while let Poll::Ready(action) = self.state_fetcher.poll(cx) {
                match action {
                    FetchAction::BlockRequest { peer_id, request } => {
                        self.handle_block_request(peer_id, request)
                    }
                }
            }

            loop {
                // need to buffer results here to make borrow checker happy
                let mut closed_sessions = Vec::new();
                let mut received_responses = Vec::new();

                // poll all connected peers for responses
                for (id, peer) in &mut self.active_peers {
                    let Some(mut response) = peer.pending_response.take() else { continue };
                    match response.poll(cx) {
                        Poll::Ready(res) => {
                            // check if the error is due to a closed channel to the session
                            if res.err().is_some_and(|err| err.is_channel_closed()) {
                                debug!(
                                    target: "net",
                                    ?id,
                                    "Request canceled, response channel from session closed."
                                );
                                // if the channel is closed, this means the peer session is also
                                // closed, in which case we can invoke the
                                // [Self::on_closed_session]
                                // immediately, preventing followup requests and propagate the
                                // connection dropped error
                                closed_sessions.push(*id);
                            } else {
                                received_responses.push((*id, res));
                            }
                        }
                        Poll::Pending => {
                            // not ready yet, store again.
                            peer.pending_response = Some(response);
                        }
                    };
                }

                for peer in closed_sessions {
                    self.on_session_closed(peer)
                }

                if received_responses.is_empty() {
                    break;
                }

                for (peer_id, resp) in received_responses {
                    self.on_eth_response(peer_id, resp);
                }
            }

            // poll peer manager
            while let Poll::Ready(action) = self.peers_manager.poll(cx) {
                self.on_peer_action(action);
            }

            // We need to poll again in case we have received any responses because they may have
            // triggered follow-up requests.
            if self.queued_messages.is_empty() {
                return Poll::Pending
            }
        }
    }
}
565
/// Tracks the state of a Peer with an active Session.
///
/// For example known blocks, so we can decide what to announce.
#[derive(Debug)]
pub(crate) struct ActivePeer<N: NetworkPrimitives> {
    /// Best block of the peer.
    pub(crate) best_hash: B256,
    /// The capabilities of the remote peer.
    pub(crate) capabilities: Arc<Capabilities>,
    /// A communication channel directly to the session task.
    pub(crate) request_tx: PeerRequestSender<PeerRequest<N>>,
    /// The response receiver for a currently active request to that peer.
    pub(crate) pending_response: Option<PeerResponse<N>>,
    /// Blocks we know the peer has.
    pub(crate) blocks: LruCache<B256>,
}
582
/// Message variants triggered by the [`NetworkState`]
#[derive(Debug)]
pub(crate) enum StateAction<N: NetworkPrimitives> {
    /// Dispatch a `NewBlock` message to the peer
    NewBlock {
        /// Target of the message
        peer_id: PeerId,
        /// The `NewBlock` message
        block: NewBlockMessage<N::NewBlockPayload>,
    },
    /// Dispatch a `NewBlockHashes` broadcast to the peer
    NewBlockHashes {
        /// Target of the message
        peer_id: PeerId,
        /// `NewBlockHashes` message to send to the peer.
        hashes: NewBlockHashes,
    },
    /// Create a new connection to the given node.
    Connect {
        /// The remote address to dial.
        remote_addr: SocketAddr,
        /// The identifier of the node to connect to.
        peer_id: PeerId,
    },
    /// Disconnect an existing connection
    Disconnect {
        /// The peer to disconnect.
        peer_id: PeerId,
        /// Why the disconnect was initiated
        reason: Option<DisconnectReason>,
    },
    /// Retrieved a [`ForkId`] from the peer via ENR request, See <https://eips.ethereum.org/EIPS/eip-868>
    DiscoveredEnrForkId {
        /// The peer the ENR belongs to.
        peer_id: PeerId,
        /// The address of the peer.
        addr: PeerAddr,
        /// The reported [`ForkId`] by this peer.
        fork_id: ForkId,
    },
    /// A new node was found through the discovery, possibly with a `ForkId`
    DiscoveredNode { peer_id: PeerId, addr: PeerAddr, fork_id: Option<ForkId> },
    /// A peer was added
    PeerAdded(PeerId),
    /// A peer was dropped
    PeerRemoved(PeerId),
}
622
#[cfg(test)]
mod tests {
    use crate::{
        discovery::Discovery,
        fetch::StateFetcher,
        peers::PeersManager,
        state::{BlockNumReader, NetworkState},
        PeerRequest,
    };
    use alloy_consensus::Header;
    use alloy_primitives::B256;
    use reth_eth_wire::{BlockBodies, Capabilities, Capability, EthNetworkPrimitives, EthVersion};
    use reth_ethereum_primitives::BlockBody;
    use reth_network_api::PeerRequestSender;
    use reth_network_p2p::{bodies::client::BodiesClient, error::RequestError};
    use reth_network_peers::PeerId;
    use reth_storage_api::noop::NoopProvider;
    use std::{
        future::poll_fn,
        sync::{atomic::AtomicU64, Arc},
    };
    use tokio::sync::mpsc;
    use tokio_stream::{wrappers::ReceiverStream, StreamExt};

    /// Returns a testing instance of the [`NetworkState`].
    // NOTE(review): the fetcher handle comes from a separate `PeersManager` instance than the
    // one stored in the state — confirm this is intentional for the noop test setup.
    fn state() -> NetworkState<EthNetworkPrimitives> {
        let peers = PeersManager::default();
        let handle = peers.handle();
        NetworkState {
            active_peers: Default::default(),
            peers_manager: Default::default(),
            queued_messages: Default::default(),
            client: BlockNumReader(Box::new(NoopProvider::default())),
            discovery: Discovery::noop(),
            state_fetcher: StateFetcher::new(handle, Default::default()),
        }
    }

    /// Returns a capability set advertising only `eth/67`.
    fn capabilities() -> Arc<Capabilities> {
        Arc::new(vec![Capability::from(EthVersion::Eth67)].into())
    }

    // tests that ongoing requests are answered with connection dropped if the session that
    // received that request drops the request object.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_dropped_active_session() {
        let mut state = state();
        let client = state.fetch_client();

        let peer_id = PeerId::random();
        let (tx, session_rx) = mpsc::channel(1);
        let peer_tx = PeerRequestSender::new(peer_id, tx);

        state.on_session_activated(
            peer_id,
            capabilities(),
            Arc::default(),
            peer_tx,
            Arc::new(AtomicU64::new(1)),
            None,
        );

        assert!(state.active_peers.contains_key(&peer_id));

        let body = BlockBody { ommers: vec![Header::default()], ..Default::default() };

        let body_response = body.clone();

        // this mimics an active session that receives the requests from the state
        tokio::task::spawn(async move {
            let mut stream = ReceiverStream::new(session_rx);
            let resp = stream.next().await.unwrap();
            match resp {
                PeerRequest::GetBlockBodies { response, .. } => {
                    response.send(Ok(BlockBodies(vec![body_response]))).unwrap();
                }
                _ => unreachable!(),
            }

            // wait for the next request, then drop
            let _resp = stream.next().await.unwrap();
        });

        // spawn the state as future
        tokio::task::spawn(async move {
            loop {
                poll_fn(|cx| state.poll(cx)).await;
            }
        });

        // send requests to the state via the client
        let (peer, bodies) = client.get_block_bodies(vec![B256::random()]).await.unwrap().split();
        assert_eq!(peer, peer_id);
        assert_eq!(bodies, vec![body]);

        // the session task above dropped the second request's response sender, so the
        // request must resolve to a connection-dropped error
        let resp = client.get_block_bodies(vec![B256::random()]).await;
        assert!(resp.is_err());
        assert_eq!(resp.unwrap_err(), RequestError::ConnectionDropped);
    }
}