reth_network/
state.rs

1//! Keeps track of the state of the network.
2
3use crate::{
4    cache::LruCache,
5    discovery::Discovery,
6    fetch::{BlockResponseOutcome, FetchAction, StateFetcher},
7    message::{BlockRequest, NewBlockMessage, PeerResponse, PeerResponseResult},
8    peers::{PeerAction, PeersManager},
9    session::BlockRangeInfo,
10    FetchClient,
11};
12use alloy_consensus::BlockHeader;
13use alloy_primitives::B256;
14use rand::seq::SliceRandom;
15use reth_eth_wire::{
16    BlockHashNumber, Capabilities, DisconnectReason, EthNetworkPrimitives, NetworkPrimitives,
17    NewBlockHashes, NewBlockPayload, UnifiedStatus,
18};
19use reth_ethereum_forks::ForkId;
20use reth_network_api::{DiscoveredEvent, DiscoveryEvent, PeerRequest, PeerRequestSender};
21use reth_network_peers::PeerId;
22use reth_network_types::{PeerAddr, PeerKind};
23use reth_primitives_traits::Block;
24use std::{
25    collections::{HashMap, VecDeque},
26    fmt,
27    net::{IpAddr, SocketAddr},
28    ops::Deref,
29    sync::{
30        atomic::{AtomicU64, AtomicUsize},
31        Arc,
32    },
33    task::{Context, Poll},
34};
35use tokio::sync::oneshot;
36use tracing::{debug, trace};
37
/// Cache limit of blocks to keep track of for a single peer.
///
/// Passed as the limit when the per-peer `blocks` [`LruCache`] of an [`ActivePeer`] is created.
const PEER_BLOCK_CACHE_LIMIT: u32 = 512;
40
/// Wrapper type around a boxed [`reth_storage_api::BlockNumReader`] trait object.
pub(crate) struct BlockNumReader(Box<dyn reth_storage_api::BlockNumReader>);
43
44impl BlockNumReader {
45    /// Create a new instance with the given reader.
46    pub fn new(reader: impl reth_storage_api::BlockNumReader + 'static) -> Self {
47        Self(Box::new(reader))
48    }
49}
50
51impl fmt::Debug for BlockNumReader {
52    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
53        f.debug_struct("BlockNumReader").field("inner", &"<dyn BlockNumReader>").finish()
54    }
55}
56
57impl Deref for BlockNumReader {
58    type Target = Box<dyn reth_storage_api::BlockNumReader>;
59
60    fn deref(&self) -> &Self::Target {
61        &self.0
62    }
63}
64
/// The [`NetworkState`] keeps track of the state of all peers in the network.
///
/// This includes:
///   - [`Discovery`]: manages the discovery protocol, essentially a stream of discovery updates
///   - [`PeersManager`]: keeps track of connected peers and issues new outgoing connections
///     depending on the configured capacity.
///   - [`StateFetcher`]: streams download requests (received from outside via channel) which are
///     then sent to the session of the peer.
///
/// This type is also responsible for responding to received requests.
#[derive(Debug)]
pub struct NetworkState<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// All active peers and their state.
    active_peers: HashMap<PeerId, ActivePeer<N>>,
    /// Manages connections to peers.
    peers_manager: PeersManager,
    /// Buffered messages until polled.
    queued_messages: VecDeque<StateAction<N>>,
    /// The client type that can interact with the chain.
    ///
    /// This type is used to fetch the block number after we established a session and received the
    /// [`UnifiedStatus`] block hash.
    client: BlockNumReader,
    /// Network discovery.
    discovery: Discovery,
    /// The type that handles requests.
    ///
    /// The fetcher streams `RLPx` related requests on a per-peer basis to this type. This type
    /// will then queue in the request and notify the fetcher once the result has been
    /// received.
    state_fetcher: StateFetcher<N>,
}
97
98impl<N: NetworkPrimitives> NetworkState<N> {
99    /// Create a new state instance with the given params
100    pub(crate) fn new(
101        client: BlockNumReader,
102        discovery: Discovery,
103        peers_manager: PeersManager,
104        num_active_peers: Arc<AtomicUsize>,
105    ) -> Self {
106        let state_fetcher = StateFetcher::new(peers_manager.handle(), num_active_peers);
107        Self {
108            active_peers: Default::default(),
109            peers_manager,
110            queued_messages: Default::default(),
111            client,
112            discovery,
113            state_fetcher,
114        }
115    }
116
117    /// Returns mutable access to the [`PeersManager`]
118    pub(crate) const fn peers_mut(&mut self) -> &mut PeersManager {
119        &mut self.peers_manager
120    }
121
122    /// Returns mutable access to the [`Discovery`]
123    pub(crate) const fn discovery_mut(&mut self) -> &mut Discovery {
124        &mut self.discovery
125    }
126
127    /// Returns access to the [`PeersManager`]
128    pub(crate) const fn peers(&self) -> &PeersManager {
129        &self.peers_manager
130    }
131
132    /// Returns a new [`FetchClient`]
133    pub(crate) fn fetch_client(&self) -> FetchClient<N> {
134        self.state_fetcher.client()
135    }
136
137    /// How many peers we're currently connected to.
138    pub fn num_active_peers(&self) -> usize {
139        self.active_peers.len()
140    }
141
142    /// Event hook for an activated session for the peer.
143    ///
144    /// Returns `Ok` if the session is valid, returns an `Err` if the session is not accepted and
145    /// should be rejected.
146    pub(crate) fn on_session_activated(
147        &mut self,
148        peer: PeerId,
149        capabilities: Arc<Capabilities>,
150        status: Arc<UnifiedStatus>,
151        request_tx: PeerRequestSender<PeerRequest<N>>,
152        timeout: Arc<AtomicU64>,
153        range_info: Option<BlockRangeInfo>,
154    ) {
155        debug_assert!(!self.active_peers.contains_key(&peer), "Already connected; not possible");
156
157        // find the corresponding block number
158        let block_number =
159            self.client.block_number(status.blockhash).ok().flatten().unwrap_or_default();
160        self.state_fetcher.new_active_peer(
161            peer,
162            status.blockhash,
163            block_number,
164            Arc::clone(&capabilities),
165            timeout,
166            range_info,
167        );
168
169        self.active_peers.insert(
170            peer,
171            ActivePeer {
172                best_hash: status.blockhash,
173                capabilities,
174                request_tx,
175                pending_response: None,
176                blocks: LruCache::new(PEER_BLOCK_CACHE_LIMIT),
177            },
178        );
179    }
180
181    /// Event hook for a disconnected session for the given peer.
182    ///
183    /// This will remove the peer from the available set of peers and close all inflight requests.
184    pub(crate) fn on_session_closed(&mut self, peer: PeerId) {
185        self.active_peers.remove(&peer);
186        self.state_fetcher.on_session_closed(&peer);
187    }
188
189    /// Starts propagating the new block to peers that haven't reported the block yet.
190    ///
191    /// This is supposed to be invoked after the block was validated.
192    ///
193    /// > It then sends the block to a small fraction of connected peers (usually the square root of
194    /// > the total number of peers) using the `NewBlock` message.
195    ///
196    /// See also <https://github.com/ethereum/devp2p/blob/master/caps/eth.md>
197    pub(crate) fn announce_new_block(&mut self, msg: NewBlockMessage<N::NewBlockPayload>) {
198        // send a `NewBlock` message to a fraction of the connected peers (square root of the total
199        // number of peers)
200        let num_propagate = (self.active_peers.len() as f64).sqrt() as u64 + 1;
201
202        let number = msg.block.block().header().number();
203        let mut count = 0;
204
205        // Shuffle to propagate to a random sample of peers on every block announcement
206        let mut peers: Vec<_> = self.active_peers.iter_mut().collect();
207        peers.shuffle(&mut rand::rng());
208
209        for (peer_id, peer) in peers {
210            if peer.blocks.contains(&msg.hash) {
211                // skip peers which already reported the block
212                continue
213            }
214
215            // Queue a `NewBlock` message for the peer
216            if count < num_propagate {
217                self.queued_messages
218                    .push_back(StateAction::NewBlock { peer_id: *peer_id, block: msg.clone() });
219
220                // update peer block info
221                if self.state_fetcher.update_peer_block(peer_id, msg.hash, number) {
222                    peer.best_hash = msg.hash;
223                }
224
225                // mark the block as seen by the peer
226                peer.blocks.insert(msg.hash);
227
228                count += 1;
229            }
230
231            if count >= num_propagate {
232                break
233            }
234        }
235    }
236
237    /// Completes the block propagation process started in [`NetworkState::announce_new_block()`]
238    /// but sending `NewBlockHash` broadcast to all peers that haven't seen it yet.
239    pub(crate) fn announce_new_block_hash(&mut self, msg: NewBlockMessage<N::NewBlockPayload>) {
240        let number = msg.block.block().header().number();
241        let hashes = NewBlockHashes(vec![BlockHashNumber { hash: msg.hash, number }]);
242        for (peer_id, peer) in &mut self.active_peers {
243            if peer.blocks.contains(&msg.hash) {
244                // skip peers which already reported the block
245                continue
246            }
247
248            if self.state_fetcher.update_peer_block(peer_id, msg.hash, number) {
249                peer.best_hash = msg.hash;
250            }
251
252            self.queued_messages.push_back(StateAction::NewBlockHashes {
253                peer_id: *peer_id,
254                hashes: hashes.clone(),
255            });
256        }
257    }
258
259    /// Updates the block information for the peer.
260    pub(crate) fn update_peer_block(&mut self, peer_id: &PeerId, hash: B256, number: u64) {
261        if let Some(peer) = self.active_peers.get_mut(peer_id) {
262            peer.best_hash = hash;
263        }
264        self.state_fetcher.update_peer_block(peer_id, hash, number);
265    }
266
267    /// Invoked when a new [`ForkId`] is activated.
268    pub(crate) fn update_fork_id(&self, fork_id: ForkId) {
269        self.discovery.update_fork_id(fork_id)
270    }
271
272    /// Invoked after a `NewBlock` message was received by the peer.
273    ///
274    /// This will keep track of blocks we know a peer has
275    pub(crate) fn on_new_block(&mut self, peer_id: PeerId, hash: B256) {
276        // Mark the blocks as seen
277        if let Some(peer) = self.active_peers.get_mut(&peer_id) {
278            peer.blocks.insert(hash);
279        }
280    }
281
282    /// Invoked for a `NewBlockHashes` broadcast message.
283    pub(crate) fn on_new_block_hashes(&mut self, peer_id: PeerId, hashes: Vec<BlockHashNumber>) {
284        // Mark the blocks as seen
285        if let Some(peer) = self.active_peers.get_mut(&peer_id) {
286            peer.blocks.extend(hashes.into_iter().map(|b| b.hash));
287        }
288    }
289
290    /// Bans the [`IpAddr`] in the discovery service.
291    pub(crate) fn ban_ip_discovery(&self, ip: IpAddr) {
292        trace!(target: "net", ?ip, "Banning discovery");
293        self.discovery.ban_ip(ip)
294    }
295
296    /// Bans the [`PeerId`] and [`IpAddr`] in the discovery service.
297    pub(crate) fn ban_discovery(&self, peer_id: PeerId, ip: IpAddr) {
298        trace!(target: "net", ?peer_id, ?ip, "Banning discovery");
299        self.discovery.ban(peer_id, ip)
300    }
301
302    /// Marks the given peer as trusted.
303    pub(crate) fn add_trusted_peer_id(&mut self, peer_id: PeerId) {
304        self.peers_manager.add_trusted_peer_id(peer_id)
305    }
306
307    /// Adds a peer and its address with the given kind to the peerset.
308    pub(crate) fn add_peer_kind(&mut self, peer_id: PeerId, kind: PeerKind, addr: PeerAddr) {
309        self.peers_manager.add_peer_kind(peer_id, kind, addr, None)
310    }
311
312    /// Connects a peer and its address with the given kind
313    pub(crate) fn add_and_connect(&mut self, peer_id: PeerId, kind: PeerKind, addr: PeerAddr) {
314        self.peers_manager.add_and_connect_kind(peer_id, kind, addr, None)
315    }
316
317    /// Removes a peer and its address with the given kind from the peerset.
318    pub(crate) fn remove_peer_kind(&mut self, peer_id: PeerId, kind: PeerKind) {
319        match kind {
320            PeerKind::Basic | PeerKind::Static => self.peers_manager.remove_peer(peer_id),
321            PeerKind::Trusted => self.peers_manager.remove_peer_from_trusted_set(peer_id),
322        }
323    }
324
325    /// Event hook for events received from the discovery service.
326    fn on_discovery_event(&mut self, event: DiscoveryEvent) {
327        match event {
328            DiscoveryEvent::NewNode(DiscoveredEvent::EventQueued { peer_id, addr, fork_id }) => {
329                self.queued_messages.push_back(StateAction::DiscoveredNode {
330                    peer_id,
331                    addr,
332                    fork_id,
333                });
334            }
335            DiscoveryEvent::EnrForkId(peer_id, fork_id) => {
336                self.queued_messages
337                    .push_back(StateAction::DiscoveredEnrForkId { peer_id, fork_id });
338            }
339        }
340    }
341
342    /// Event hook for new actions derived from the peer management set.
343    fn on_peer_action(&mut self, action: PeerAction) {
344        match action {
345            PeerAction::Connect { peer_id, remote_addr } => {
346                self.queued_messages.push_back(StateAction::Connect { peer_id, remote_addr });
347            }
348            PeerAction::Disconnect { peer_id, reason } => {
349                self.state_fetcher.on_pending_disconnect(&peer_id);
350                self.queued_messages.push_back(StateAction::Disconnect { peer_id, reason });
351            }
352            PeerAction::DisconnectBannedIncoming { peer_id } |
353            PeerAction::DisconnectUntrustedIncoming { peer_id } => {
354                self.state_fetcher.on_pending_disconnect(&peer_id);
355                self.queued_messages.push_back(StateAction::Disconnect { peer_id, reason: None });
356            }
357            PeerAction::DiscoveryBanPeerId { peer_id, ip_addr } => {
358                self.ban_discovery(peer_id, ip_addr)
359            }
360            PeerAction::DiscoveryBanIp { ip_addr } => self.ban_ip_discovery(ip_addr),
361            PeerAction::PeerAdded(peer_id) => {
362                self.queued_messages.push_back(StateAction::PeerAdded(peer_id))
363            }
364            PeerAction::PeerRemoved(peer_id) => {
365                self.queued_messages.push_back(StateAction::PeerRemoved(peer_id))
366            }
367            PeerAction::BanPeer { .. } | PeerAction::UnBanPeer { .. } => {}
368        }
369    }
370
371    /// Sends The message to the peer's session and queues in a response.
372    ///
373    /// Caution: this will replace an already pending response. It's the responsibility of the
374    /// caller to select the peer.
375    fn handle_block_request(&mut self, peer: PeerId, request: BlockRequest) {
376        if let Some(ref mut peer) = self.active_peers.get_mut(&peer) {
377            let (request, response) = match request {
378                BlockRequest::GetBlockHeaders(request) => {
379                    let (response, rx) = oneshot::channel();
380                    let request = PeerRequest::GetBlockHeaders { request, response };
381                    let response = PeerResponse::BlockHeaders { response: rx };
382                    (request, response)
383                }
384                BlockRequest::GetBlockBodies(request) => {
385                    let (response, rx) = oneshot::channel();
386                    let request = PeerRequest::GetBlockBodies { request, response };
387                    let response = PeerResponse::BlockBodies { response: rx };
388                    (request, response)
389                }
390            };
391            let _ = peer.request_tx.to_session_tx.try_send(request);
392            peer.pending_response = Some(response);
393        }
394    }
395
396    /// Handle the outcome of processed response, for example directly queue another request.
397    fn on_block_response_outcome(&mut self, outcome: BlockResponseOutcome) {
398        match outcome {
399            BlockResponseOutcome::Request(peer, request) => {
400                self.handle_block_request(peer, request);
401            }
402            BlockResponseOutcome::BadResponse(peer, reputation_change) => {
403                self.peers_manager.apply_reputation_change(&peer, reputation_change);
404            }
405        }
406    }
407
408    /// Invoked when received a response from a connected peer.
409    ///
410    /// Delegates the response result to the fetcher which may return an outcome specific
411    /// instruction that needs to be handled in [`Self::on_block_response_outcome`]. This could be
412    /// a follow-up request or an instruction to slash the peer's reputation.
413    fn on_eth_response(&mut self, peer: PeerId, resp: PeerResponseResult<N>) {
414        let outcome = match resp {
415            PeerResponseResult::BlockHeaders(res) => {
416                self.state_fetcher.on_block_headers_response(peer, res)
417            }
418            PeerResponseResult::BlockBodies(res) => {
419                self.state_fetcher.on_block_bodies_response(peer, res)
420            }
421            _ => None,
422        };
423
424        if let Some(outcome) = outcome {
425            self.on_block_response_outcome(outcome);
426        }
427    }
428
    /// Advances the state.
    ///
    /// Returns `Poll::Ready` with the oldest queued [`StateAction`] if there is one. Otherwise
    /// one pass polls, in this order: the discovery service, the state fetcher, the pending
    /// response of every active peer, and the peers manager. If that pass queued any message the
    /// loop starts over and returns it; if the queue is still empty `Poll::Pending` is returned.
    pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<StateAction<N>> {
        loop {
            // drain buffered messages
            if let Some(message) = self.queued_messages.pop_front() {
                return Poll::Ready(message)
            }

            // drain all ready discovery events into the message queue
            while let Poll::Ready(discovery) = self.discovery.poll(cx) {
                self.on_discovery_event(discovery);
            }

            // forward all ready fetch requests to the selected peers
            while let Poll::Ready(action) = self.state_fetcher.poll(cx) {
                match action {
                    FetchAction::BlockRequest { peer_id, request } => {
                        self.handle_block_request(peer_id, request)
                    }
                }
            }

            // keep polling peer responses until a pass yields no new response, because handling a
            // response can install a follow-up request on a peer
            loop {
                // need to buffer results here to make borrow checker happy
                let mut closed_sessions = Vec::new();
                let mut received_responses = Vec::new();

                // poll all connected peers for responses
                for (id, peer) in &mut self.active_peers {
                    // take the response out of the slot; it is put back below if still pending
                    let Some(mut response) = peer.pending_response.take() else { continue };
                    match response.poll(cx) {
                        Poll::Ready(res) => {
                            // check if the error is due to a closed channel to the session
                            if res.err().is_some_and(|err| err.is_channel_closed()) {
                                debug!(
                                    target: "net",
                                    ?id,
                                    "Request canceled, response channel from session closed."
                                );
                                // if the channel is closed, this means the peer session is also
                                // closed, in which case we can invoke the
                                // [Self::on_session_closed]
                                // immediately, preventing followup requests and propagate the
                                // connection dropped error
                                closed_sessions.push(*id);
                            } else {
                                received_responses.push((*id, res));
                            }
                        }
                        Poll::Pending => {
                            // not ready yet, store again.
                            peer.pending_response = Some(response);
                        }
                    };
                }

                for peer in closed_sessions {
                    self.on_session_closed(peer)
                }

                if received_responses.is_empty() {
                    break;
                }

                for (peer_id, resp) in received_responses {
                    self.on_eth_response(peer_id, resp);
                }
            }

            // poll peer manager
            while let Poll::Ready(action) = self.peers_manager.poll(cx) {
                self.on_peer_action(action);
            }

            // We need to poll again in case we have received any responses because they may have
            // triggered follow-up requests.
            if self.queued_messages.is_empty() {
                return Poll::Pending
            }
        }
    }
508}
509
/// Tracks the state of a Peer with an active Session.
///
/// For example known blocks, so we can decide what to announce.
#[derive(Debug)]
pub(crate) struct ActivePeer<N: NetworkPrimitives> {
    /// Best block of the peer.
    pub(crate) best_hash: B256,
    /// The capabilities of the remote peer.
    #[expect(dead_code)]
    pub(crate) capabilities: Arc<Capabilities>,
    /// A communication channel directly to the session task.
    pub(crate) request_tx: PeerRequestSender<PeerRequest<N>>,
    /// The response receiver for a currently active request to that peer.
    ///
    /// `None` while no request is in flight.
    pub(crate) pending_response: Option<PeerResponse<N>>,
    /// Blocks we know the peer has.
    pub(crate) blocks: LruCache<B256>,
}
527
/// Message variants triggered by the [`NetworkState`]
#[derive(Debug)]
pub(crate) enum StateAction<N: NetworkPrimitives> {
    /// Dispatch a `NewBlock` message to the peer
    NewBlock {
        /// Target of the message
        peer_id: PeerId,
        /// The `NewBlock` message
        block: NewBlockMessage<N::NewBlockPayload>,
    },
    /// Dispatch a `NewBlockHashes` message to the peer
    NewBlockHashes {
        /// Target of the message
        peer_id: PeerId,
        /// `NewBlockHashes` message to send to the peer.
        hashes: NewBlockHashes,
    },
    /// Create a new connection to the given node.
    Connect { remote_addr: SocketAddr, peer_id: PeerId },
    /// Disconnect an existing connection
    Disconnect {
        /// The peer whose connection should be closed
        peer_id: PeerId,
        /// Why the disconnect was initiated
        reason: Option<DisconnectReason>,
    },
    /// Retrieved a [`ForkId`] from the peer via ENR request, See <https://eips.ethereum.org/EIPS/eip-868>
    DiscoveredEnrForkId {
        /// The peer that reported the [`ForkId`]
        peer_id: PeerId,
        /// The reported [`ForkId`] by this peer.
        fork_id: ForkId,
    },
    /// A new node was found through the discovery, possibly with a `ForkId`
    DiscoveredNode { peer_id: PeerId, addr: PeerAddr, fork_id: Option<ForkId> },
    /// A peer was added
    PeerAdded(PeerId),
    /// A peer was dropped
    PeerRemoved(PeerId),
}
565
#[cfg(test)]
mod tests {
    use crate::{
        discovery::Discovery,
        fetch::StateFetcher,
        peers::PeersManager,
        state::{BlockNumReader, NetworkState},
        PeerRequest,
    };
    use alloy_consensus::Header;
    use alloy_primitives::B256;
    use reth_eth_wire::{BlockBodies, Capabilities, Capability, EthNetworkPrimitives, EthVersion};
    use reth_ethereum_primitives::BlockBody;
    use reth_network_api::PeerRequestSender;
    use reth_network_p2p::{bodies::client::BodiesClient, error::RequestError};
    use reth_network_peers::PeerId;
    use reth_storage_api::noop::NoopProvider;
    use std::{
        future::poll_fn,
        sync::{atomic::AtomicU64, Arc},
    };
    use tokio::sync::mpsc;
    use tokio_stream::{wrappers::ReceiverStream, StreamExt};

    /// Returns a testing instance of the [`NetworkState`].
    ///
    /// Like [`NetworkState::new`], the fetcher's handle is taken from the very same
    /// [`PeersManager`] that is stored in the state, so both stay connected.
    fn state() -> NetworkState<EthNetworkPrimitives> {
        let peers_manager = PeersManager::default();
        let handle = peers_manager.handle();
        NetworkState {
            active_peers: Default::default(),
            peers_manager,
            queued_messages: Default::default(),
            client: BlockNumReader(Box::new(NoopProvider::default())),
            discovery: Discovery::noop(),
            state_fetcher: StateFetcher::new(handle, Default::default()),
        }
    }

    /// Capabilities announcing only `eth/67`.
    fn capabilities() -> Arc<Capabilities> {
        Arc::new(vec![Capability::from(EthVersion::Eth67)].into())
    }

    // tests that ongoing requests are answered with connection dropped if the session that
    // received the request drops the request object.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_dropped_active_session() {
        let mut state = state();
        let client = state.fetch_client();

        let peer_id = PeerId::random();
        let (tx, session_rx) = mpsc::channel(1);
        let peer_tx = PeerRequestSender::new(peer_id, tx);

        state.on_session_activated(
            peer_id,
            capabilities(),
            Arc::default(),
            peer_tx,
            Arc::new(AtomicU64::new(1)),
            None,
        );

        assert!(state.active_peers.contains_key(&peer_id));

        let body = BlockBody { ommers: vec![Header::default()], ..Default::default() };

        let body_response = body.clone();

        // this mimics an active session that receives the requests from the state
        tokio::task::spawn(async move {
            let mut stream = ReceiverStream::new(session_rx);
            let resp = stream.next().await.unwrap();
            match resp {
                PeerRequest::GetBlockBodies { response, .. } => {
                    response.send(Ok(BlockBodies(vec![body_response]))).unwrap();
                }
                _ => unreachable!(),
            }

            // wait for the next request, then drop
            let _resp = stream.next().await.unwrap();
        });

        // spawn the state as future
        tokio::task::spawn(async move {
            loop {
                poll_fn(|cx| state.poll(cx)).await;
            }
        });

        // send requests to the state via the client
        let (peer, bodies) = client.get_block_bodies(vec![B256::random()]).await.unwrap().split();
        assert_eq!(peer, peer_id);
        assert_eq!(bodies, vec![body]);

        // the second request is dropped by the mimicked session without an answer
        let resp = client.get_block_bodies(vec![B256::random()]).await;
        assert!(resp.is_err());
        assert_eq!(resp.unwrap_err(), RequestError::ConnectionDropped);
    }
}