// reth_network/fetch/mod.rs
1//! Fetch data from the network.
2
3mod client;
4
5pub use client::FetchClient;
6
7use crate::{message::BlockRequest, session::BlockRangeInfo};
8use alloy_primitives::B256;
9use futures::StreamExt;
10use reth_eth_wire::{
11    Capabilities, EthNetworkPrimitives, GetBlockBodies, GetBlockHeaders, GetReceipts,
12    NetworkPrimitives,
13};
14use reth_network_api::test_utils::PeersHandle;
15use reth_network_p2p::{
16    error::{EthResponseValidator, PeerRequestResult, RequestError, RequestResult},
17    headers::client::HeadersRequest,
18    priority::Priority,
19    receipts::client::ReceiptsResponse,
20};
21use reth_network_peers::PeerId;
22use reth_network_types::ReputationChangeKind;
23use std::{
24    collections::{HashMap, VecDeque},
25    ops::RangeInclusive,
26    sync::{
27        atomic::{AtomicU64, AtomicUsize, Ordering},
28        Arc,
29    },
30    task::{Context, Poll},
31};
32use tokio::sync::{mpsc, mpsc::UnboundedSender, oneshot};
33use tokio_stream::wrappers::UnboundedReceiverStream;
34
/// An inflight `GetBlockHeaders` request; the original [`HeadersRequest`] is retained so the
/// response can be validated against it.
type InflightHeadersRequest<H> = Request<HeadersRequest, PeerRequestResult<Vec<H>>>;
/// An inflight `GetBlockBodies` request; the original request payload is not retained.
type InflightBodiesRequest<B> = Request<(), PeerRequestResult<Vec<B>>>;
/// An inflight `GetReceipts` request; the original request payload is not retained.
type InflightReceiptsRequest<R> = Request<(), PeerRequestResult<ReceiptsResponse<R>>>;
38
/// Manages data fetching operations.
///
/// This type is hooked into the staged sync pipeline and delegates download request to available
/// peers and sends the response once ready.
///
/// This type maintains a list of connected peers that are available for requests.
#[derive(Debug)]
pub struct StateFetcher<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// Currently active [`GetBlockHeaders`] requests, keyed by the peer serving them.
    inflight_headers_requests: HashMap<PeerId, InflightHeadersRequest<N::BlockHeader>>,
    /// Currently active [`GetBlockBodies`] requests, keyed by the peer serving them.
    inflight_bodies_requests: HashMap<PeerId, InflightBodiesRequest<N::BlockBody>>,
    /// Currently active `GetReceipts` requests, keyed by the peer serving them.
    inflight_receipts_requests: HashMap<PeerId, InflightReceiptsRequest<N::Receipt>>,
    /// The list of _available_ peers for requests.
    peers: HashMap<PeerId, Peer>,
    /// The handle to the peers manager
    peers_handle: PeersHandle,
    /// Number of active peer sessions the node's currently handling.
    num_active_peers: Arc<AtomicUsize>,
    /// Requests queued for processing; high-priority requests are inserted ahead of normal ones.
    queued_requests: VecDeque<DownloadRequest<N>>,
    /// Receiver for new incoming download requests
    download_requests_rx: UnboundedReceiverStream<DownloadRequest<N>>,
    /// Sender for download requests, used to detach a [`FetchClient`]
    download_requests_tx: UnboundedSender<DownloadRequest<N>>,
}
66
// === impl StateFetcher ===

impl<N: NetworkPrimitives> StateFetcher<N> {
    /// Creates a new fetcher with no connected peers and the internal channel that
    /// [`FetchClient`]s use to submit [`DownloadRequest`]s.
    pub(crate) fn new(peers_handle: PeersHandle, num_active_peers: Arc<AtomicUsize>) -> Self {
        let (download_requests_tx, download_requests_rx) = mpsc::unbounded_channel();
        Self {
            inflight_headers_requests: Default::default(),
            inflight_bodies_requests: Default::default(),
            inflight_receipts_requests: Default::default(),
            peers: Default::default(),
            peers_handle,
            num_active_peers,
            queued_requests: Default::default(),
            download_requests_rx: UnboundedReceiverStream::new(download_requests_rx),
            download_requests_tx,
        }
    }

    /// Invoked when connected to a new peer.
    ///
    /// The peer starts out [`PeerState::Idle`] and is immediately eligible for requests.
    pub(crate) fn new_active_peer(
        &mut self,
        peer_id: PeerId,
        best_hash: B256,
        best_number: u64,
        capabilities: Arc<Capabilities>,
        timeout: Arc<AtomicU64>,
        range_info: Option<BlockRangeInfo>,
    ) {
        self.peers.insert(
            peer_id,
            Peer {
                state: PeerState::Idle,
                best_hash,
                best_number,
                capabilities,
                timeout,
                last_response_likely_bad: false,
                range_info,
            },
        );
    }

    /// Removes the peer from the peer list, after which it is no longer available for future
    /// requests.
    ///
    /// Invoked when an active session was closed.
    ///
    /// This cancels also inflight request and sends an error to the receiver.
    pub(crate) fn on_session_closed(&mut self, peer: &PeerId) {
        self.peers.remove(peer);
        // notify all pending receivers so they don't wait on a response forever
        if let Some(req) = self.inflight_headers_requests.remove(peer) {
            let _ = req.response.send(Err(RequestError::ConnectionDropped));
        }
        if let Some(req) = self.inflight_bodies_requests.remove(peer) {
            let _ = req.response.send(Err(RequestError::ConnectionDropped));
        }
        if let Some(req) = self.inflight_receipts_requests.remove(peer) {
            let _ = req.response.send(Err(RequestError::ConnectionDropped));
        }
    }

    /// Updates the block information for the peer.
    ///
    /// Returns `true` if this a newer block (strictly greater than the tracked best number).
    pub(crate) fn update_peer_block(&mut self, peer_id: &PeerId, hash: B256, number: u64) -> bool {
        if let Some(peer) = self.peers.get_mut(peer_id) &&
            number > peer.best_number
        {
            peer.best_hash = hash;
            peer.best_number = number;
            return true
        }
        false
    }

    /// Invoked when an active session is about to be disconnected.
    pub(crate) fn on_pending_disconnect(&mut self, peer_id: &PeerId) {
        // mark as closing so no new requests are dispatched to this peer
        if let Some(peer) = self.peers.get_mut(peer_id) {
            peer.state = PeerState::Closing;
        }
    }

    /// Returns the _next_ idle peer that's ready to accept a request,
    /// prioritizing those with the lowest timeout/latency and those that recently responded with
    /// adequate data. Additionally, if full blocks are required this prioritizes peers that have
    /// full history available
    fn next_best_peer(&self, requirement: BestPeerRequirements) -> Option<PeerId> {
        let mut idle = self.peers.iter().filter(|(_, peer)| peer.state.is_idle());

        // start with the first idle peer; bail out if there is none
        let mut best_peer = idle.next()?;

        for maybe_better in idle {
            // replace best peer if our current best peer sent us a bad response last time
            if best_peer.1.last_response_likely_bad && !maybe_better.1.last_response_likely_bad {
                best_peer = maybe_better;
                continue
            }

            // replace best peer if this peer meets the requirements better
            if maybe_better.1.is_better(best_peer.1, &requirement) {
                best_peer = maybe_better;
                continue
            }

            // replace best peer if this peer has better rtt and both have same range quality
            if maybe_better.1.timeout() < best_peer.1.timeout() &&
                !maybe_better.1.last_response_likely_bad
            {
                best_peer = maybe_better;
            }
        }

        Some(*best_peer.0)
    }

    /// Returns the next action to return
    fn poll_action(&mut self) -> PollAction {
        // nothing to dispatch if no requests are queued
        if self.queued_requests.is_empty() {
            return PollAction::NoRequests
        }

        // pop the next request; it is pushed back below if no peer can serve it
        let request = self.queued_requests.pop_front().expect("not empty");
        let Some(peer_id) = self.next_best_peer(request.best_peer_requirements()) else {
            // need to put back the request
            self.queued_requests.push_front(request);
            return PollAction::NoPeersAvailable
        };

        let request = self.prepare_block_request(peer_id, request);

        PollAction::Ready(FetchAction::BlockRequest { peer_id, request })
    }

    /// Advances the state of the syncer: dispatches queued requests to idle peers and drains
    /// newly submitted download requests into the queue.
    pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<FetchAction> {
        // drain buffered actions first
        loop {
            let no_peers_available = match self.poll_action() {
                PollAction::Ready(action) => return Poll::Ready(action),
                PollAction::NoRequests => false,
                PollAction::NoPeersAvailable => true,
            };

            loop {
                // poll incoming requests
                match self.download_requests_rx.poll_next_unpin(cx) {
                    Poll::Ready(Some(request)) => match request.get_priority() {
                        Priority::High => {
                            // find the first normal request and queue before, add this request to
                            // the back of the high-priority queue
                            // NOTE(review): when no normal-priority request is queued this
                            // inserts at index 0 (the *front* of the high-priority section),
                            // which disagrees with the comment above — confirm intended order.
                            let pos = self
                                .queued_requests
                                .iter()
                                .position(|req| req.is_normal_priority())
                                .unwrap_or(0);
                            self.queued_requests.insert(pos, request);
                        }
                        Priority::Normal => {
                            self.queued_requests.push_back(request);
                        }
                    },
                    Poll::Ready(None) => {
                        // the fetcher holds a sender clone, so the channel can't close
                        unreachable!("channel can't close")
                    }
                    Poll::Pending => break,
                }
            }

            if self.queued_requests.is_empty() || no_peers_available {
                return Poll::Pending
            }
        }
    }

    /// Handles a new request to a peer.
    ///
    /// Caution: this assumes the peer exists and is idle
    fn prepare_block_request(&mut self, peer_id: PeerId, req: DownloadRequest<N>) -> BlockRequest {
        // update the peer's state
        if let Some(peer) = self.peers.get_mut(&peer_id) {
            peer.state = req.peer_state();
        }

        match req {
            DownloadRequest::GetBlockHeaders { request, response, .. } => {
                // keep the original request so the response can be validated against it
                let inflight = Request { request: request.clone(), response };
                self.inflight_headers_requests.insert(peer_id, inflight);
                let HeadersRequest { start, limit, direction } = request;
                BlockRequest::GetBlockHeaders(GetBlockHeaders {
                    start_block: start,
                    limit,
                    skip: 0,
                    direction,
                })
            }
            DownloadRequest::GetBlockBodies { request, response, .. } => {
                let inflight = Request { request: (), response };
                self.inflight_bodies_requests.insert(peer_id, inflight);
                BlockRequest::GetBlockBodies(GetBlockBodies(request))
            }
            DownloadRequest::GetReceipts { request, response, .. } => {
                let inflight = Request { request: (), response };
                self.inflight_receipts_requests.insert(peer_id, inflight);
                BlockRequest::GetReceipts(GetReceipts(request))
            }
        }
    }

    /// Returns a new followup request for the peer.
    ///
    /// Caution: this expects that the peer is _not_ closed.
    fn followup_request(&mut self, peer_id: PeerId) -> Option<BlockResponseOutcome> {
        let req = self.queued_requests.pop_front()?;
        let req = self.prepare_block_request(peer_id, req);
        Some(BlockResponseOutcome::Request(peer_id, req))
    }

    /// Called on a `GetBlockHeaders` response from a peer.
    ///
    /// This delegates the response and returns a [`BlockResponseOutcome`] to either queue in a
    /// direct followup request or get the peer reported if the response was a
    /// [`EthResponseValidator::reputation_change_err`]
    pub(crate) fn on_block_headers_response(
        &mut self,
        peer_id: PeerId,
        res: RequestResult<Vec<N::BlockHeader>>,
    ) -> Option<BlockResponseOutcome> {
        let is_error = res.is_err();
        let maybe_reputation_change = res.reputation_change_err();

        let resp = self.inflight_headers_requests.remove(&peer_id);

        // validate the response against the originally issued request
        let is_likely_bad_response =
            resp.as_ref().is_some_and(|r| res.is_likely_bad_headers_response(&r.request));

        if let Some(resp) = resp {
            // delegate the response
            let _ = resp.response.send(res.map(|h| (peer_id, h).into()));
        }

        if let Some(peer) = self.peers.get_mut(&peer_id) {
            // update the peer's response state
            peer.last_response_likely_bad = is_likely_bad_response;

            // If the peer is still ready to accept new requests, we try to send a followup
            // request immediately.
            if peer.state.on_request_finished() && !is_error && !is_likely_bad_response {
                return self.followup_request(peer_id)
            }
        }

        // if the response was an `Err` worth reporting the peer for then we return a `BadResponse`
        // outcome
        maybe_reputation_change
            .map(|reputation_change| BlockResponseOutcome::BadResponse(peer_id, reputation_change))
    }

    /// Called on a `GetBlockBodies` response from a peer
    pub(crate) fn on_block_bodies_response(
        &mut self,
        peer_id: PeerId,
        res: RequestResult<Vec<N::BlockBody>>,
    ) -> Option<BlockResponseOutcome> {
        // an empty (or error) bodies response de-ranks the peer for subsequent selection
        let is_likely_bad_response = res.as_ref().map_or(true, |bodies| bodies.is_empty());

        if let Some(resp) = self.inflight_bodies_requests.remove(&peer_id) {
            let _ = resp.response.send(res.map(|b| (peer_id, b).into()));
        }
        if let Some(peer) = self.peers.get_mut(&peer_id) {
            // update the peer's response state
            peer.last_response_likely_bad = is_likely_bad_response;

            if peer.state.on_request_finished() && !is_likely_bad_response {
                return self.followup_request(peer_id)
            }
        }
        None
    }

    /// Called on a `GetReceipts` response from a peer.
    ///
    /// All receipt variants (legacy with bloom, eth/69, eth/70) are expected to be normalized
    /// to [`ReceiptsResponse`] by the caller before invoking this method.
    pub(crate) fn on_receipts_response(
        &mut self,
        peer_id: PeerId,
        res: RequestResult<ReceiptsResponse<N::Receipt>>,
    ) -> Option<BlockResponseOutcome> {
        // an empty (or error) receipts response de-ranks the peer for subsequent selection
        let is_likely_bad_response = res.as_ref().map_or(true, |resp| resp.receipts.is_empty());

        if let Some(resp) = self.inflight_receipts_requests.remove(&peer_id) {
            let _ = resp.response.send(res.map(|r| (peer_id, r).into()));
        }
        if let Some(peer) = self.peers.get_mut(&peer_id) {
            peer.last_response_likely_bad = is_likely_bad_response;

            if peer.state.on_request_finished() && !is_likely_bad_response {
                return self.followup_request(peer_id)
            }
        }
        None
    }

    /// Returns a new [`FetchClient`] that can send requests to this type.
    pub(crate) fn client(&self) -> FetchClient<N> {
        FetchClient {
            request_tx: self.download_requests_tx.clone(),
            peers_handle: self.peers_handle.clone(),
            num_active_peers: Arc::clone(&self.num_active_peers),
        }
    }
}
380
/// The outcome of [`StateFetcher::poll_action`]
enum PollAction {
    /// A request was matched with an idle peer and is ready to dispatch.
    Ready(FetchAction),
    /// No requests are currently queued.
    NoRequests,
    /// Requests are queued but no idle peer can serve them right now.
    NoPeersAvailable,
}
387
/// Represents a connected peer
#[derive(Debug)]
struct Peer {
    /// The state this peer currently resides in.
    state: PeerState,
    /// Best known hash that the peer has
    best_hash: B256,
    /// Tracks the best number of the peer.
    best_number: u64,
    /// Capabilities announced by the peer.
    // NOTE(review): currently unread anywhere in this module (hence the allow) — presumably
    // retained for future capability-based routing; confirm before removing.
    #[allow(dead_code)]
    capabilities: Arc<Capabilities>,
    /// Tracks the current timeout value we use for the peer.
    timeout: Arc<AtomicU64>,
    /// Tracks whether the peer has recently responded with a likely bad response.
    ///
    /// This is used to de-rank the peer if there are other peers available.
    /// This exists because empty responses may not be penalized (e.g. when blocks near the tip are
    /// downloaded), but we still want to avoid requesting from the same peer again if it has the
    /// lowest timeout.
    last_response_likely_bad: bool,
    /// Tracks the range info for the peer.
    range_info: Option<BlockRangeInfo>,
}
412
413impl Peer {
414    fn timeout(&self) -> u64 {
415        self.timeout.load(Ordering::Relaxed)
416    }
417
418    /// Returns the earliest block number available from the peer.
419    fn earliest(&self) -> u64 {
420        self.range_info.as_ref().map_or(0, |info| info.earliest())
421    }
422
423    /// Returns true if the peer has the full history available.
424    fn has_full_history(&self) -> bool {
425        self.earliest() == 0
426    }
427
428    fn range(&self) -> Option<RangeInclusive<u64>> {
429        self.range_info.as_ref().map(|info| info.range())
430    }
431
432    /// Returns true if this peer has a better range than the other peer for serving the requested
433    /// range.
434    ///
435    /// A peer has a "better range" if:
436    /// 1. It can fully cover the requested range while the other cannot
437    /// 2. None can fully cover the range, but this peer has lower start value
438    /// 3. If a peer doesn't announce a range we assume it has full history, but check the other's
439    ///    range and treat that as better if it can cover the range
440    fn has_better_range(&self, other: &Self, range: &RangeInclusive<u64>) -> bool {
441        let self_range = self.range();
442        let other_range = other.range();
443
444        match (self_range, other_range) {
445            (Some(self_r), Some(other_r)) => {
446                // Check if each peer can fully cover the requested range
447                let self_covers = self_r.contains(range.start()) && self_r.contains(range.end());
448                let other_covers = other_r.contains(range.start()) && other_r.contains(range.end());
449
450                #[allow(clippy::match_same_arms)]
451                match (self_covers, other_covers) {
452                    (true, false) => true,  // Only self covers the range
453                    (false, true) => false, // Only other covers the range
454                    (true, true) => false,  // Both cover
455                    (false, false) => {
456                        // neither covers - prefer if peer has lower (better) start range
457                        self_r.start() < other_r.start()
458                    }
459                }
460            }
461            (Some(self_r), None) => {
462                // Self has range info, other doesn't (treated as full history with unknown latest)
463                // Self is better only if it covers the range
464                self_r.contains(range.start()) && self_r.contains(range.end())
465            }
466            (None, Some(other_r)) => {
467                // Self has no range info (full history), other has range info
468                // Self is better only if other doesn't cover the range
469                !(other_r.contains(range.start()) && other_r.contains(range.end()))
470            }
471            (None, None) => false, // Neither has range info - no one is better
472        }
473    }
474
475    /// Returns true if this peer is better than the other peer based on the given requirements.
476    fn is_better(&self, other: &Self, requirement: &BestPeerRequirements) -> bool {
477        match requirement {
478            BestPeerRequirements::None => false,
479            BestPeerRequirements::FullBlockRange(range) => self.has_better_range(other, range),
480            BestPeerRequirements::FullBlock => self.has_full_history() && !other.has_full_history(),
481        }
482    }
483}
484
/// Tracks the state of an individual peer
#[derive(Debug)]
enum PeerState {
    /// Peer is currently not handling requests and is available.
    Idle,
    /// Peer is handling a `GetBlockHeaders` request.
    GetBlockHeaders,
    /// Peer is handling a `GetBlockBodies` request.
    GetBlockBodies,
    /// Peer is handling a `GetReceipts` request.
    GetReceipts,
    /// Peer session is about to close; no further requests are dispatched to it.
    Closing,
}
499
500// === impl PeerState ===
501
502impl PeerState {
503    /// Returns true if the peer is currently idle.
504    const fn is_idle(&self) -> bool {
505        matches!(self, Self::Idle)
506    }
507
508    /// Resets the state on a received response.
509    ///
510    /// If the state was already marked as `Closing` do nothing.
511    ///
512    /// Returns `true` if the peer is ready for another request.
513    const fn on_request_finished(&mut self) -> bool {
514        if !matches!(self, Self::Closing) {
515            *self = Self::Idle;
516            return true
517        }
518        false
519    }
520}
521
/// A request that waits for a response from the network, so it can send it back through the
/// response channel.
#[derive(Debug)]
struct Request<Req, Resp> {
    /// The issued request object
    // TODO: this can be attached to the response in error case
    request: Req,
    /// One-shot channel the result is delivered on; send errors are ignored by callers.
    response: oneshot::Sender<Resp>,
}
531
/// Requests that can be sent to the Syncer from a [`FetchClient`]
#[derive(Debug)]
#[allow(clippy::enum_variant_names)]
pub(crate) enum DownloadRequest<N: NetworkPrimitives> {
    /// Download the requested headers and send response through channel
    GetBlockHeaders {
        /// The headers request spec (start block, limit, direction).
        request: HeadersRequest,
        /// Channel the result is returned on.
        response: oneshot::Sender<PeerRequestResult<Vec<N::BlockHeader>>>,
        /// Scheduling priority of this request.
        priority: Priority,
    },
    /// Download the requested bodies and send response through channel
    GetBlockBodies {
        /// Hashes of the blocks whose bodies are requested.
        request: Vec<B256>,
        /// Channel the result is returned on.
        response: oneshot::Sender<PeerRequestResult<Vec<N::BlockBody>>>,
        /// Scheduling priority of this request.
        priority: Priority,
        /// Optional block-number range used to prefer peers that can cover it.
        range_hint: Option<RangeInclusive<u64>>,
    },
    /// Download receipts for the given block hashes and send response through channel
    GetReceipts {
        /// Hashes of the blocks whose receipts are requested.
        request: Vec<B256>,
        /// Channel the result is returned on.
        response: oneshot::Sender<PeerRequestResult<ReceiptsResponse<N::Receipt>>>,
        /// Scheduling priority of this request.
        priority: Priority,
    },
}
556
557// === impl DownloadRequest ===
558
559impl<N: NetworkPrimitives> DownloadRequest<N> {
560    /// Returns the corresponding state for a peer that handles the request.
561    const fn peer_state(&self) -> PeerState {
562        match self {
563            Self::GetBlockHeaders { .. } => PeerState::GetBlockHeaders,
564            Self::GetBlockBodies { .. } => PeerState::GetBlockBodies,
565            Self::GetReceipts { .. } => PeerState::GetReceipts,
566        }
567    }
568
569    /// Returns the requested priority of this request
570    const fn get_priority(&self) -> &Priority {
571        match self {
572            Self::GetBlockHeaders { priority, .. } |
573            Self::GetBlockBodies { priority, .. } |
574            Self::GetReceipts { priority, .. } => priority,
575        }
576    }
577
578    /// Returns `true` if this request is normal priority.
579    const fn is_normal_priority(&self) -> bool {
580        self.get_priority().is_normal()
581    }
582
583    /// Returns the best peer requirements for this request.
584    fn best_peer_requirements(&self) -> BestPeerRequirements {
585        match self {
586            Self::GetBlockHeaders { .. } => BestPeerRequirements::None,
587            Self::GetBlockBodies { range_hint, .. } => {
588                if let Some(range) = range_hint {
589                    BestPeerRequirements::FullBlockRange(range.clone())
590                } else {
591                    BestPeerRequirements::FullBlock
592                }
593            }
594            Self::GetReceipts { .. } => BestPeerRequirements::FullBlock,
595        }
596    }
597}
598
/// An action the syncer can emit.
pub(crate) enum FetchAction {
    /// Dispatch an eth request to the given peer.
    BlockRequest {
        /// The targeted recipient for the request
        peer_id: PeerId,
        /// The request to send
        request: BlockRequest,
    },
}
609
/// Outcome of a processed response.
///
/// Returned after processing a response.
#[derive(Debug, PartialEq, Eq)]
pub(crate) enum BlockResponseOutcome {
    /// Continue with another request to the peer.
    Request(PeerId, BlockRequest),
    /// How to handle a bad response and the reputation change to apply, if any.
    BadResponse(PeerId, ReputationChangeKind),
}
620
/// Additional requirements for how to rank peers during selection.
enum BestPeerRequirements {
    /// No additional requirements
    None,
    /// Peer must have this block range available.
    FullBlockRange(RangeInclusive<u64>),
    /// Peer must have full range, i.e. history starting at genesis.
    FullBlock,
}
630
631#[cfg(test)]
632mod tests {
633    use super::*;
634    use crate::{peers::PeersManager, PeersConfig};
635    use alloy_consensus::Header;
636    use alloy_primitives::B512;
637    use std::future::poll_fn;
638
    #[tokio::test(flavor = "multi_thread")]
    async fn test_poll_fetcher() {
        let manager = PeersManager::new(PeersConfig::default());
        let mut fetcher =
            StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());

        poll_fn(move |cx| {
            // no queued requests -> pending
            assert!(fetcher.poll(cx).is_pending());
            let (tx, _rx) = oneshot::channel();
            fetcher.queued_requests.push_back(DownloadRequest::GetBlockBodies {
                request: vec![],
                response: tx,
                priority: Priority::default(),
                range_hint: None,
            });
            // a queued request but no connected peers -> still pending
            assert!(fetcher.poll(cx).is_pending());

            Poll::Ready(())
        })
        .await;
    }
660
661    #[tokio::test]
662    async fn test_peer_rotation() {
663        let manager = PeersManager::new(PeersConfig::default());
664        let mut fetcher =
665            StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
666        // Add a few random peers
667        let peer1 = B512::random();
668        let peer2 = B512::random();
669        let capabilities = Arc::new(Capabilities::from(vec![]));
670        fetcher.new_active_peer(
671            peer1,
672            B256::random(),
673            1,
674            Arc::clone(&capabilities),
675            Arc::new(AtomicU64::new(1)),
676            None,
677        );
678        fetcher.new_active_peer(
679            peer2,
680            B256::random(),
681            2,
682            Arc::clone(&capabilities),
683            Arc::new(AtomicU64::new(1)),
684            None,
685        );
686
687        let first_peer = fetcher.next_best_peer(BestPeerRequirements::None).unwrap();
688        assert!(first_peer == peer1 || first_peer == peer2);
689        // Pending disconnect for first_peer
690        fetcher.on_pending_disconnect(&first_peer);
691        // first_peer now isn't idle, so we should get other peer
692        let second_peer = fetcher.next_best_peer(BestPeerRequirements::None).unwrap();
693        assert!(first_peer == peer1 || first_peer == peer2);
694        assert_ne!(first_peer, second_peer);
695        // without idle peers, returns None
696        fetcher.on_pending_disconnect(&second_peer);
697        assert_eq!(fetcher.next_best_peer(BestPeerRequirements::None), None);
698    }
699
    #[tokio::test]
    async fn test_peer_prioritization() {
        let manager = PeersManager::new(PeersConfig::default());
        let mut fetcher =
            StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
        // Add a few random peers
        let peer1 = B512::random();
        let peer2 = B512::random();
        let peer3 = B512::random();

        // shared handle so the timeout can be changed mid-test
        let peer2_timeout = Arc::new(AtomicU64::new(300));

        let capabilities = Arc::new(Capabilities::from(vec![]));
        fetcher.new_active_peer(
            peer1,
            B256::random(),
            1,
            Arc::clone(&capabilities),
            Arc::new(AtomicU64::new(30)),
            None,
        );
        fetcher.new_active_peer(
            peer2,
            B256::random(),
            2,
            Arc::clone(&capabilities),
            Arc::clone(&peer2_timeout),
            None,
        );
        fetcher.new_active_peer(
            peer3,
            B256::random(),
            3,
            Arc::clone(&capabilities),
            Arc::new(AtomicU64::new(50)),
            None,
        );

        // Must always get peer1 (lowest timeout)
        // (`next_best_peer` takes `&self` and doesn't mark the peer busy, so calls repeat)
        assert_eq!(fetcher.next_best_peer(BestPeerRequirements::None), Some(peer1));
        assert_eq!(fetcher.next_best_peer(BestPeerRequirements::None), Some(peer1));
        // peer2's timeout changes below peer1's
        peer2_timeout.store(10, Ordering::Relaxed);
        // Then we get peer 2 always (now lowest)
        assert_eq!(fetcher.next_best_peer(BestPeerRequirements::None), Some(peer2));
        assert_eq!(fetcher.next_best_peer(BestPeerRequirements::None), Some(peer2));
    }
747
    #[tokio::test]
    async fn test_on_block_headers_response() {
        let manager = PeersManager::new(PeersConfig::default());
        let mut fetcher =
            StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
        let peer_id = B512::random();

        // a successful response with no matching inflight request yields no outcome
        assert_eq!(fetcher.on_block_headers_response(peer_id, Ok(vec![Header::default()])), None);

        // only errors that map to a reputation change produce a `BadResponse` outcome
        assert_eq!(
            fetcher.on_block_headers_response(peer_id, Err(RequestError::Timeout)),
            Some(BlockResponseOutcome::BadResponse(peer_id, ReputationChangeKind::Timeout))
        );
        assert_eq!(
            fetcher.on_block_headers_response(peer_id, Err(RequestError::BadResponse)),
            None
        );
        assert_eq!(
            fetcher.on_block_headers_response(peer_id, Err(RequestError::ChannelClosed)),
            None
        );
        assert_eq!(
            fetcher.on_block_headers_response(peer_id, Err(RequestError::ConnectionDropped)),
            None
        );
        assert_eq!(
            fetcher.on_block_headers_response(peer_id, Err(RequestError::UnsupportedCapability)),
            None
        );
    }
778
    #[tokio::test]
    async fn test_header_response_outcome() {
        let manager = PeersManager::new(PeersConfig::default());
        let mut fetcher =
            StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
        let peer_id = B512::random();

        // builds a matching inflight request / header pair
        let request_pair = || {
            let (tx, _rx) = oneshot::channel();
            let req = Request {
                request: HeadersRequest {
                    start: 0u64.into(),
                    limit: 1,
                    direction: Default::default(),
                },
                response: tx,
            };
            let header = Header { number: 0, ..Default::default() };
            (req, header)
        };

        fetcher.new_active_peer(
            peer_id,
            Default::default(),
            Default::default(),
            Arc::new(Capabilities::from(vec![])),
            Default::default(),
            None,
        );

        let (req, header) = request_pair();
        fetcher.inflight_headers_requests.insert(peer_id, req);

        // a good response: no outcome, and the peer transitions back to idle
        let outcome = fetcher.on_block_headers_response(peer_id, Ok(vec![header]));
        assert!(outcome.is_none());
        assert!(fetcher.peers[&peer_id].state.is_idle());

        let outcome =
            fetcher.on_block_headers_response(peer_id, Err(RequestError::Timeout)).unwrap();

        // sanity check: a timeout is a reportable error
        assert!(EthResponseValidator::reputation_change_err(&Err::<Vec<Header>, _>(
            RequestError::Timeout
        ))
        .is_some());

        match outcome {
            BlockResponseOutcome::BadResponse(peer, _) => {
                assert_eq!(peer, peer_id)
            }
            BlockResponseOutcome::Request(_, _) => {
                unreachable!()
            }
        };

        // even after a bad response the peer returns to idle
        assert!(fetcher.peers[&peer_id].state.is_idle());
    }
835
836    #[test]
837    fn test_peer_is_better_none_requirement() {
838        let peer1 = Peer {
839            state: PeerState::Idle,
840            best_hash: B256::random(),
841            best_number: 100,
842            capabilities: Arc::new(Capabilities::new(vec![])),
843            timeout: Arc::new(AtomicU64::new(10)),
844            last_response_likely_bad: false,
845            range_info: Some(BlockRangeInfo::new(0, 100, B256::random())),
846        };
847
848        let peer2 = Peer {
849            state: PeerState::Idle,
850            best_hash: B256::random(),
851            best_number: 50,
852            capabilities: Arc::new(Capabilities::new(vec![])),
853            timeout: Arc::new(AtomicU64::new(20)),
854            last_response_likely_bad: false,
855            range_info: None,
856        };
857
858        // With None requirement, is_better should always return false
859        assert!(!peer1.is_better(&peer2, &BestPeerRequirements::None));
860        assert!(!peer2.is_better(&peer1, &BestPeerRequirements::None));
861    }
862
863    #[test]
864    fn test_peer_is_better_full_block_requirement() {
865        // Peer with full history (earliest = 0)
866        let peer_full = Peer {
867            state: PeerState::Idle,
868            best_hash: B256::random(),
869            best_number: 100,
870            capabilities: Arc::new(Capabilities::new(vec![])),
871            timeout: Arc::new(AtomicU64::new(10)),
872            last_response_likely_bad: false,
873            range_info: Some(BlockRangeInfo::new(0, 100, B256::random())),
874        };
875
876        // Peer without full history (earliest = 50)
877        let peer_partial = Peer {
878            state: PeerState::Idle,
879            best_hash: B256::random(),
880            best_number: 100,
881            capabilities: Arc::new(Capabilities::new(vec![])),
882            timeout: Arc::new(AtomicU64::new(10)),
883            last_response_likely_bad: false,
884            range_info: Some(BlockRangeInfo::new(50, 100, B256::random())),
885        };
886
887        // Peer without range info (treated as full history)
888        let peer_no_range = Peer {
889            state: PeerState::Idle,
890            best_hash: B256::random(),
891            best_number: 100,
892            capabilities: Arc::new(Capabilities::new(vec![])),
893            timeout: Arc::new(AtomicU64::new(10)),
894            last_response_likely_bad: false,
895            range_info: None,
896        };
897
898        // Peer with full history is better than peer without
899        assert!(peer_full.is_better(&peer_partial, &BestPeerRequirements::FullBlock));
900        assert!(!peer_partial.is_better(&peer_full, &BestPeerRequirements::FullBlock));
901
902        // Peer without range info (full history) is better than partial
903        assert!(peer_no_range.is_better(&peer_partial, &BestPeerRequirements::FullBlock));
904        assert!(!peer_partial.is_better(&peer_no_range, &BestPeerRequirements::FullBlock));
905
906        // Both have full history - no improvement
907        assert!(!peer_full.is_better(&peer_no_range, &BestPeerRequirements::FullBlock));
908        assert!(!peer_no_range.is_better(&peer_full, &BestPeerRequirements::FullBlock));
909    }
910
911    #[test]
912    fn test_peer_is_better_full_block_range_requirement() {
913        let range = RangeInclusive::new(40, 60);
914
915        // Peer that covers the requested range
916        let peer_covers = Peer {
917            state: PeerState::Idle,
918            best_hash: B256::random(),
919            best_number: 100,
920            capabilities: Arc::new(Capabilities::new(vec![])),
921            timeout: Arc::new(AtomicU64::new(10)),
922            last_response_likely_bad: false,
923            range_info: Some(BlockRangeInfo::new(0, 100, B256::random())),
924        };
925
926        // Peer that doesn't cover the range (earliest too high)
927        let peer_no_cover = Peer {
928            state: PeerState::Idle,
929            best_hash: B256::random(),
930            best_number: 100,
931            capabilities: Arc::new(Capabilities::new(vec![])),
932            timeout: Arc::new(AtomicU64::new(10)),
933            last_response_likely_bad: false,
934            range_info: Some(BlockRangeInfo::new(70, 100, B256::random())),
935        };
936
937        // Peer that covers the requested range is better than one that doesn't
938        assert!(peer_covers
939            .is_better(&peer_no_cover, &BestPeerRequirements::FullBlockRange(range.clone())));
940        assert!(
941            !peer_no_cover.is_better(&peer_covers, &BestPeerRequirements::FullBlockRange(range))
942        );
943    }
944
945    #[test]
946    fn test_peer_is_better_both_cover_range() {
947        let range = RangeInclusive::new(30, 50);
948
949        // Peer with full history that covers the range
950        let peer_full = Peer {
951            state: PeerState::Idle,
952            best_hash: B256::random(),
953            best_number: 100,
954            capabilities: Arc::new(Capabilities::new(vec![])),
955            timeout: Arc::new(AtomicU64::new(10)),
956            last_response_likely_bad: false,
957            range_info: Some(BlockRangeInfo::new(0, 50, B256::random())),
958        };
959
960        // Peer without full history that also covers the range
961        let peer_partial = Peer {
962            state: PeerState::Idle,
963            best_hash: B256::random(),
964            best_number: 100,
965            capabilities: Arc::new(Capabilities::new(vec![])),
966            timeout: Arc::new(AtomicU64::new(10)),
967            last_response_likely_bad: false,
968            range_info: Some(BlockRangeInfo::new(30, 50, B256::random())),
969        };
970
971        // When both cover the range, prefer none
972        assert!(!peer_full
973            .is_better(&peer_partial, &BestPeerRequirements::FullBlockRange(range.clone())));
974        assert!(!peer_partial.is_better(&peer_full, &BestPeerRequirements::FullBlockRange(range)));
975    }
976
977    #[test]
978    fn test_peer_is_better_lower_start() {
979        let range = RangeInclusive::new(30, 60);
980
981        // Peer with full history that covers the range
982        let peer_full = Peer {
983            state: PeerState::Idle,
984            best_hash: B256::random(),
985            best_number: 100,
986            capabilities: Arc::new(Capabilities::new(vec![])),
987            timeout: Arc::new(AtomicU64::new(10)),
988            last_response_likely_bad: false,
989            range_info: Some(BlockRangeInfo::new(0, 50, B256::random())),
990        };
991
992        // Peer without full history that also covers the range
993        let peer_partial = Peer {
994            state: PeerState::Idle,
995            best_hash: B256::random(),
996            best_number: 100,
997            capabilities: Arc::new(Capabilities::new(vec![])),
998            timeout: Arc::new(AtomicU64::new(10)),
999            last_response_likely_bad: false,
1000            range_info: Some(BlockRangeInfo::new(30, 50, B256::random())),
1001        };
1002
1003        // When both cover the range, prefer lower start value
1004        assert!(peer_full
1005            .is_better(&peer_partial, &BestPeerRequirements::FullBlockRange(range.clone())));
1006        assert!(!peer_partial.is_better(&peer_full, &BestPeerRequirements::FullBlockRange(range)));
1007    }
1008
1009    #[test]
1010    fn test_peer_is_better_neither_covers_range() {
1011        let range = RangeInclusive::new(40, 60);
1012
1013        // Peer with full history that doesn't cover the range (latest too low)
1014        let peer_full = Peer {
1015            state: PeerState::Idle,
1016            best_hash: B256::random(),
1017            best_number: 30,
1018            capabilities: Arc::new(Capabilities::new(vec![])),
1019            timeout: Arc::new(AtomicU64::new(10)),
1020            last_response_likely_bad: false,
1021            range_info: Some(BlockRangeInfo::new(0, 30, B256::random())),
1022        };
1023
1024        // Peer without full history that also doesn't cover the range
1025        let peer_partial = Peer {
1026            state: PeerState::Idle,
1027            best_hash: B256::random(),
1028            best_number: 30,
1029            capabilities: Arc::new(Capabilities::new(vec![])),
1030            timeout: Arc::new(AtomicU64::new(10)),
1031            last_response_likely_bad: false,
1032            range_info: Some(BlockRangeInfo::new(10, 30, B256::random())),
1033        };
1034
1035        // When neither covers the range, prefer full history
1036        assert!(peer_full
1037            .is_better(&peer_partial, &BestPeerRequirements::FullBlockRange(range.clone())));
1038        assert!(!peer_partial.is_better(&peer_full, &BestPeerRequirements::FullBlockRange(range)));
1039    }
1040
1041    #[test]
1042    fn test_peer_is_better_no_range_info() {
1043        let range = RangeInclusive::new(40, 60);
1044
1045        // Peer with range info
1046        let peer_with_range = Peer {
1047            state: PeerState::Idle,
1048            best_hash: B256::random(),
1049            best_number: 100,
1050            capabilities: Arc::new(Capabilities::new(vec![])),
1051            timeout: Arc::new(AtomicU64::new(10)),
1052            last_response_likely_bad: false,
1053            range_info: Some(BlockRangeInfo::new(30, 100, B256::random())),
1054        };
1055
1056        // Peer without range info
1057        let peer_no_range = Peer {
1058            state: PeerState::Idle,
1059            best_hash: B256::random(),
1060            best_number: 100,
1061            capabilities: Arc::new(Capabilities::new(vec![])),
1062            timeout: Arc::new(AtomicU64::new(10)),
1063            last_response_likely_bad: false,
1064            range_info: None,
1065        };
1066
1067        // Peer without range info is not better (we prefer peers with known ranges)
1068        assert!(!peer_no_range
1069            .is_better(&peer_with_range, &BestPeerRequirements::FullBlockRange(range.clone())));
1070
1071        // Peer with range info is better than peer without
1072        assert!(
1073            peer_with_range.is_better(&peer_no_range, &BestPeerRequirements::FullBlockRange(range))
1074        );
1075    }
1076
1077    #[test]
1078    fn test_peer_is_better_one_peer_no_range_covers() {
1079        let range = RangeInclusive::new(40, 60);
1080
1081        // Peer with range info that covers the requested range
1082        let peer_with_range_covers = Peer {
1083            state: PeerState::Idle,
1084            best_hash: B256::random(),
1085            best_number: 100,
1086            capabilities: Arc::new(Capabilities::new(vec![])),
1087            timeout: Arc::new(AtomicU64::new(10)),
1088            last_response_likely_bad: false,
1089            range_info: Some(BlockRangeInfo::new(30, 100, B256::random())),
1090        };
1091
1092        // Peer without range info (treated as full history with unknown latest)
1093        let peer_no_range = Peer {
1094            state: PeerState::Idle,
1095            best_hash: B256::random(),
1096            best_number: 100,
1097            capabilities: Arc::new(Capabilities::new(vec![])),
1098            timeout: Arc::new(AtomicU64::new(10)),
1099            last_response_likely_bad: false,
1100            range_info: None,
1101        };
1102
1103        // Peer with range that covers is better than peer without range info
1104        assert!(peer_with_range_covers
1105            .is_better(&peer_no_range, &BestPeerRequirements::FullBlockRange(range.clone())));
1106
1107        // Peer without range info is not better when other covers
1108        assert!(!peer_no_range
1109            .is_better(&peer_with_range_covers, &BestPeerRequirements::FullBlockRange(range)));
1110    }
1111
1112    #[test]
1113    fn test_peer_is_better_one_peer_no_range_doesnt_cover() {
1114        let range = RangeInclusive::new(40, 60);
1115
1116        // Peer with range info that does NOT cover the requested range (too high)
1117        let peer_with_range_no_cover = Peer {
1118            state: PeerState::Idle,
1119            best_hash: B256::random(),
1120            best_number: 100,
1121            capabilities: Arc::new(Capabilities::new(vec![])),
1122            timeout: Arc::new(AtomicU64::new(10)),
1123            last_response_likely_bad: false,
1124            range_info: Some(BlockRangeInfo::new(70, 100, B256::random())),
1125        };
1126
1127        // Peer without range info (treated as full history)
1128        let peer_no_range = Peer {
1129            state: PeerState::Idle,
1130            best_hash: B256::random(),
1131            best_number: 100,
1132            capabilities: Arc::new(Capabilities::new(vec![])),
1133            timeout: Arc::new(AtomicU64::new(10)),
1134            last_response_likely_bad: false,
1135            range_info: None,
1136        };
1137
1138        // Peer with range that doesn't cover is not better
1139        assert!(!peer_with_range_no_cover
1140            .is_better(&peer_no_range, &BestPeerRequirements::FullBlockRange(range.clone())));
1141
1142        // Peer without range info (full history) is better when other doesn't cover
1143        assert!(peer_no_range
1144            .is_better(&peer_with_range_no_cover, &BestPeerRequirements::FullBlockRange(range)));
1145    }
1146
1147    #[test]
1148    fn test_peer_is_better_edge_cases() {
1149        // Test exact range boundaries
1150        let range = RangeInclusive::new(50, 100);
1151
1152        // Peer that exactly covers the range
1153        let peer_exact = Peer {
1154            state: PeerState::Idle,
1155            best_hash: B256::random(),
1156            best_number: 100,
1157            capabilities: Arc::new(Capabilities::new(vec![])),
1158            timeout: Arc::new(AtomicU64::new(10)),
1159            last_response_likely_bad: false,
1160            range_info: Some(BlockRangeInfo::new(50, 100, B256::random())),
1161        };
1162
1163        // Peer that's one block short at the start
1164        let peer_short_start = Peer {
1165            state: PeerState::Idle,
1166            best_hash: B256::random(),
1167            best_number: 100,
1168            capabilities: Arc::new(Capabilities::new(vec![])),
1169            timeout: Arc::new(AtomicU64::new(10)),
1170            last_response_likely_bad: false,
1171            range_info: Some(BlockRangeInfo::new(51, 100, B256::random())),
1172        };
1173
1174        // Peer that's one block short at the end
1175        let peer_short_end = Peer {
1176            state: PeerState::Idle,
1177            best_hash: B256::random(),
1178            best_number: 100,
1179            capabilities: Arc::new(Capabilities::new(vec![])),
1180            timeout: Arc::new(AtomicU64::new(10)),
1181            last_response_likely_bad: false,
1182            range_info: Some(BlockRangeInfo::new(50, 99, B256::random())),
1183        };
1184
1185        // Exact coverage is better than short coverage
1186        assert!(peer_exact
1187            .is_better(&peer_short_start, &BestPeerRequirements::FullBlockRange(range.clone())));
1188        assert!(peer_exact
1189            .is_better(&peer_short_end, &BestPeerRequirements::FullBlockRange(range.clone())));
1190
1191        // Short coverage is not better than exact coverage
1192        assert!(!peer_short_start
1193            .is_better(&peer_exact, &BestPeerRequirements::FullBlockRange(range.clone())));
1194        assert!(
1195            !peer_short_end.is_better(&peer_exact, &BestPeerRequirements::FullBlockRange(range))
1196        );
1197    }
1198
1199    /// Creates a `StateFetcher` with a single idle peer and returns both.
1200    fn fetcher_with_peer() -> (StateFetcher<EthNetworkPrimitives>, PeerId) {
1201        let manager = PeersManager::new(PeersConfig::default());
1202        let mut fetcher =
1203            StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
1204        let peer_id = B512::random();
1205
1206        fetcher.new_active_peer(
1207            peer_id,
1208            Default::default(),
1209            Default::default(),
1210            Arc::new(Capabilities::from(vec![])),
1211            Default::default(),
1212            None,
1213        );
1214        (fetcher, peer_id)
1215    }
1216
1217    /// Inserts an inflight receipts request into the fetcher and returns the
1218    /// `oneshot::Receiver` that the final response will be sent through.
1219    fn insert_inflight_receipts(
1220        fetcher: &mut StateFetcher<EthNetworkPrimitives>,
1221        peer_id: PeerId,
1222    ) -> oneshot::Receiver<PeerRequestResult<ReceiptsResponse<reth_ethereum_primitives::Receipt>>>
1223    {
1224        let (tx, rx) = oneshot::channel();
1225        fetcher.inflight_receipts_requests.insert(peer_id, Request { request: (), response: tx });
1226        fetcher.peers.get_mut(&peer_id).unwrap().state = PeerState::GetReceipts;
1227        rx
1228    }
1229
1230    // ---- Receipts: basic dispatch ----
1231
1232    #[tokio::test]
1233    async fn test_poll_dispatches_receipts_to_peer() {
1234        let (mut fetcher, peer_id) = fetcher_with_peer();
1235
1236        poll_fn(move |cx| {
1237            let (tx, _rx) = oneshot::channel();
1238            fetcher.queued_requests.push_back(DownloadRequest::GetReceipts {
1239                request: vec![B256::ZERO],
1240                response: tx,
1241                priority: Priority::default(),
1242            });
1243
1244            let Poll::Ready(FetchAction::BlockRequest { peer_id: dispatched_peer, request }) =
1245                fetcher.poll(cx)
1246            else {
1247                panic!("expected Ready(BlockRequest)");
1248            };
1249            assert_eq!(dispatched_peer, peer_id);
1250            assert!(matches!(request, BlockRequest::GetReceipts(_)));
1251
1252            // Peer should now be in GetReceipts state
1253            assert!(matches!(fetcher.peers[&peer_id].state, PeerState::GetReceipts));
1254            // Inflight request should be tracked
1255            assert!(fetcher.inflight_receipts_requests.contains_key(&peer_id));
1256
1257            Poll::Ready(())
1258        })
1259        .await;
1260    }
1261
1262    // ---- Receipts: response handling ----
1263
1264    #[tokio::test]
1265    async fn test_receipts_complete_response_resolves_and_idles_peer() {
1266        let (mut fetcher, peer_id) = fetcher_with_peer();
1267
1268        let rx = insert_inflight_receipts(&mut fetcher, peer_id);
1269
1270        let resp = ReceiptsResponse::new(vec![vec![]]);
1271        let outcome = fetcher.on_receipts_response(peer_id, Ok(resp));
1272
1273        // No queued requests, so no followup
1274        assert!(outcome.is_none());
1275        // Peer back to idle
1276        assert!(fetcher.peers[&peer_id].state.is_idle());
1277        // Inflight cleaned up
1278        assert!(!fetcher.inflight_receipts_requests.contains_key(&peer_id));
1279
1280        // Caller receives the response
1281        let result = rx.await.unwrap().unwrap();
1282        assert_eq!(result.1.receipts.len(), 1);
1283    }
1284
1285    #[tokio::test]
1286    async fn test_receipts_empty_response_marks_peer_bad() {
1287        let (mut fetcher, peer_id) = fetcher_with_peer();
1288        let _rx = insert_inflight_receipts(&mut fetcher, peer_id);
1289
1290        let resp = ReceiptsResponse::new(vec![]);
1291        let _ = fetcher.on_receipts_response(peer_id, Ok(resp));
1292
1293        assert!(fetcher.peers[&peer_id].last_response_likely_bad);
1294    }
1295
1296    #[tokio::test]
1297    async fn test_receipts_error_forwards_and_marks_peer_bad() {
1298        let (mut fetcher, peer_id) = fetcher_with_peer();
1299        let rx = insert_inflight_receipts(&mut fetcher, peer_id);
1300
1301        let _ = fetcher.on_receipts_response(peer_id, Err(RequestError::Timeout));
1302
1303        assert!(fetcher.peers[&peer_id].last_response_likely_bad);
1304        // Error is forwarded to the caller
1305        let result = rx.await.unwrap();
1306        assert_eq!(result.unwrap_err(), RequestError::Timeout);
1307    }
1308
1309    #[tokio::test]
1310    async fn test_session_closed_cancels_inflight_receipts() {
1311        let (mut fetcher, peer_id) = fetcher_with_peer();
1312        let rx = insert_inflight_receipts(&mut fetcher, peer_id);
1313
1314        fetcher.on_session_closed(&peer_id);
1315
1316        assert!(!fetcher.peers.contains_key(&peer_id));
1317        assert!(!fetcher.inflight_receipts_requests.contains_key(&peer_id));
1318
1319        let result = rx.await.unwrap();
1320        assert_eq!(result.unwrap_err(), RequestError::ConnectionDropped);
1321    }
1322
1323    #[tokio::test]
1324    async fn test_receipts_response_triggers_followup() {
1325        let (mut fetcher, peer_id) = fetcher_with_peer();
1326
1327        // Queue a bodies request as a followup candidate
1328        let (followup_tx, _followup_rx) = oneshot::channel();
1329        fetcher.queued_requests.push_back(DownloadRequest::GetBlockBodies {
1330            request: vec![B256::random()],
1331            response: followup_tx,
1332            priority: Priority::default(),
1333            range_hint: None,
1334        });
1335
1336        let _rx = insert_inflight_receipts(&mut fetcher, peer_id);
1337
1338        let resp = ReceiptsResponse::new(vec![vec![]]);
1339        let outcome = fetcher.on_receipts_response(peer_id, Ok(resp));
1340
1341        assert!(matches!(outcome, Some(BlockResponseOutcome::Request(pid, _)) if pid == peer_id));
1342    }
1343
1344    #[tokio::test]
1345    async fn test_prepare_block_request_creates_inflight_receipts() {
1346        let (mut fetcher, peer_id) = fetcher_with_peer();
1347        let hashes = vec![B256::with_last_byte(1), B256::with_last_byte(2)];
1348
1349        let (tx, _rx) = oneshot::channel();
1350        let req = DownloadRequest::GetReceipts {
1351            request: hashes.clone(),
1352            response: tx,
1353            priority: Priority::default(),
1354        };
1355
1356        let block_request = fetcher.prepare_block_request(peer_id, req);
1357
1358        // Returns a GetReceipts block request with the same hashes
1359        match block_request {
1360            BlockRequest::GetReceipts(ref get) => {
1361                assert_eq!(get.0, hashes);
1362            }
1363            other => panic!("expected GetReceipts, got {other:?}"),
1364        }
1365
1366        // Peer state transitions to GetReceipts
1367        assert!(matches!(fetcher.peers[&peer_id].state, PeerState::GetReceipts));
1368
1369        // Inflight request is tracked
1370        assert!(fetcher.inflight_receipts_requests.contains_key(&peer_id));
1371    }
1372}