reth_network/fetch/
mod.rs

1//! Fetch data from the network.
2
3mod client;
4
5pub use client::FetchClient;
6
7use crate::{message::BlockRequest, session::BlockRangeInfo};
8use alloy_primitives::B256;
9use futures::StreamExt;
10use reth_eth_wire::{EthNetworkPrimitives, GetBlockBodies, GetBlockHeaders, NetworkPrimitives};
11use reth_network_api::test_utils::PeersHandle;
12use reth_network_p2p::{
13    error::{EthResponseValidator, PeerRequestResult, RequestError, RequestResult},
14    headers::client::HeadersRequest,
15    priority::Priority,
16};
17use reth_network_peers::PeerId;
18use reth_network_types::ReputationChangeKind;
19use std::{
20    collections::{HashMap, VecDeque},
21    ops::RangeInclusive,
22    sync::{
23        atomic::{AtomicU64, AtomicUsize, Ordering},
24        Arc,
25    },
26    task::{Context, Poll},
27};
28use tokio::sync::{mpsc, mpsc::UnboundedSender, oneshot};
29use tokio_stream::wrappers::UnboundedReceiverStream;
30
31type InflightHeadersRequest<H> = Request<HeadersRequest, PeerRequestResult<Vec<H>>>;
32type InflightBodiesRequest<B> = Request<Vec<B256>, PeerRequestResult<Vec<B>>>;
33
34/// Manages data fetching operations.
35///
36/// This type is hooked into the staged sync pipeline and delegates download request to available
37/// peers and sends the response once ready.
38///
39/// This type maintains a list of connected peers that are available for requests.
40#[derive(Debug)]
41pub struct StateFetcher<N: NetworkPrimitives = EthNetworkPrimitives> {
42    /// Currently active [`GetBlockHeaders`] requests
43    inflight_headers_requests: HashMap<PeerId, InflightHeadersRequest<N::BlockHeader>>,
44    /// Currently active [`GetBlockBodies`] requests
45    inflight_bodies_requests: HashMap<PeerId, InflightBodiesRequest<N::BlockBody>>,
46    /// The list of _available_ peers for requests.
47    peers: HashMap<PeerId, Peer>,
48    /// The handle to the peers manager
49    peers_handle: PeersHandle,
50    /// Number of active peer sessions the node's currently handling.
51    num_active_peers: Arc<AtomicUsize>,
52    /// Requests queued for processing
53    queued_requests: VecDeque<DownloadRequest<N>>,
54    /// Receiver for new incoming download requests
55    download_requests_rx: UnboundedReceiverStream<DownloadRequest<N>>,
56    /// Sender for download requests, used to detach a [`FetchClient`]
57    download_requests_tx: UnboundedSender<DownloadRequest<N>>,
58}
59
60// === impl StateSyncer ===
61
62impl<N: NetworkPrimitives> StateFetcher<N> {
63    pub(crate) fn new(peers_handle: PeersHandle, num_active_peers: Arc<AtomicUsize>) -> Self {
64        let (download_requests_tx, download_requests_rx) = mpsc::unbounded_channel();
65        Self {
66            inflight_headers_requests: Default::default(),
67            inflight_bodies_requests: Default::default(),
68            peers: Default::default(),
69            peers_handle,
70            num_active_peers,
71            queued_requests: Default::default(),
72            download_requests_rx: UnboundedReceiverStream::new(download_requests_rx),
73            download_requests_tx,
74        }
75    }
76
77    /// Invoked when connected to a new peer.
78    pub(crate) fn new_active_peer(
79        &mut self,
80        peer_id: PeerId,
81        best_hash: B256,
82        best_number: u64,
83        timeout: Arc<AtomicU64>,
84        range_info: Option<BlockRangeInfo>,
85    ) {
86        self.peers.insert(
87            peer_id,
88            Peer {
89                state: PeerState::Idle,
90                best_hash,
91                best_number,
92                timeout,
93                last_response_likely_bad: false,
94                range_info,
95            },
96        );
97    }
98
99    /// Removes the peer from the peer list, after which it is no longer available for future
100    /// requests.
101    ///
102    /// Invoked when an active session was closed.
103    ///
104    /// This cancels also inflight request and sends an error to the receiver.
105    pub(crate) fn on_session_closed(&mut self, peer: &PeerId) {
106        self.peers.remove(peer);
107        if let Some(req) = self.inflight_headers_requests.remove(peer) {
108            let _ = req.response.send(Err(RequestError::ConnectionDropped));
109        }
110        if let Some(req) = self.inflight_bodies_requests.remove(peer) {
111            let _ = req.response.send(Err(RequestError::ConnectionDropped));
112        }
113    }
114
115    /// Updates the block information for the peer.
116    ///
117    /// Returns `true` if this a newer block
118    pub(crate) fn update_peer_block(&mut self, peer_id: &PeerId, hash: B256, number: u64) -> bool {
119        if let Some(peer) = self.peers.get_mut(peer_id) {
120            if number > peer.best_number {
121                peer.best_hash = hash;
122                peer.best_number = number;
123                return true
124            }
125        }
126        false
127    }
128
129    /// Invoked when an active session is about to be disconnected.
130    pub(crate) fn on_pending_disconnect(&mut self, peer_id: &PeerId) {
131        if let Some(peer) = self.peers.get_mut(peer_id) {
132            peer.state = PeerState::Closing;
133        }
134    }
135
136    /// Returns the _next_ idle peer that's ready to accept a request,
137    /// prioritizing those with the lowest timeout/latency and those that recently responded with
138    /// adequate data.
139    fn next_best_peer(&self) -> Option<PeerId> {
140        let mut idle = self.peers.iter().filter(|(_, peer)| peer.state.is_idle());
141
142        let mut best_peer = idle.next()?;
143
144        for maybe_better in idle {
145            // replace best peer if our current best peer sent us a bad response last time
146            if best_peer.1.last_response_likely_bad && !maybe_better.1.last_response_likely_bad {
147                best_peer = maybe_better;
148                continue
149            }
150
151            // replace best peer if this peer has better rtt
152            if maybe_better.1.timeout() < best_peer.1.timeout() &&
153                !maybe_better.1.last_response_likely_bad
154            {
155                best_peer = maybe_better;
156            }
157        }
158
159        Some(*best_peer.0)
160    }
161
162    /// Returns the next action to return
163    fn poll_action(&mut self) -> PollAction {
164        // we only check and not pop here since we don't know yet whether a peer is available.
165        if self.queued_requests.is_empty() {
166            return PollAction::NoRequests
167        }
168
169        let Some(peer_id) = self.next_best_peer() else { return PollAction::NoPeersAvailable };
170
171        let request = self.queued_requests.pop_front().expect("not empty");
172        let request = self.prepare_block_request(peer_id, request);
173
174        PollAction::Ready(FetchAction::BlockRequest { peer_id, request })
175    }
176
177    /// Advance the state the syncer
178    pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<FetchAction> {
179        // drain buffered actions first
180        loop {
181            let no_peers_available = match self.poll_action() {
182                PollAction::Ready(action) => return Poll::Ready(action),
183                PollAction::NoRequests => false,
184                PollAction::NoPeersAvailable => true,
185            };
186
187            loop {
188                // poll incoming requests
189                match self.download_requests_rx.poll_next_unpin(cx) {
190                    Poll::Ready(Some(request)) => match request.get_priority() {
191                        Priority::High => {
192                            // find the first normal request and queue before, add this request to
193                            // the back of the high-priority queue
194                            let pos = self
195                                .queued_requests
196                                .iter()
197                                .position(|req| req.is_normal_priority())
198                                .unwrap_or(0);
199                            self.queued_requests.insert(pos, request);
200                        }
201                        Priority::Normal => {
202                            self.queued_requests.push_back(request);
203                        }
204                    },
205                    Poll::Ready(None) => {
206                        unreachable!("channel can't close")
207                    }
208                    Poll::Pending => break,
209                }
210            }
211
212            if self.queued_requests.is_empty() || no_peers_available {
213                return Poll::Pending
214            }
215        }
216    }
217
218    /// Handles a new request to a peer.
219    ///
220    /// Caution: this assumes the peer exists and is idle
221    fn prepare_block_request(&mut self, peer_id: PeerId, req: DownloadRequest<N>) -> BlockRequest {
222        // update the peer's state
223        if let Some(peer) = self.peers.get_mut(&peer_id) {
224            peer.state = req.peer_state();
225        }
226
227        match req {
228            DownloadRequest::GetBlockHeaders { request, response, .. } => {
229                let inflight = Request { request: request.clone(), response };
230                self.inflight_headers_requests.insert(peer_id, inflight);
231                let HeadersRequest { start, limit, direction } = request;
232                BlockRequest::GetBlockHeaders(GetBlockHeaders {
233                    start_block: start,
234                    limit,
235                    skip: 0,
236                    direction,
237                })
238            }
239            DownloadRequest::GetBlockBodies { request, response, .. } => {
240                let inflight = Request { request: request.clone(), response };
241                self.inflight_bodies_requests.insert(peer_id, inflight);
242                BlockRequest::GetBlockBodies(GetBlockBodies(request))
243            }
244        }
245    }
246
247    /// Returns a new followup request for the peer.
248    ///
249    /// Caution: this expects that the peer is _not_ closed.
250    fn followup_request(&mut self, peer_id: PeerId) -> Option<BlockResponseOutcome> {
251        let req = self.queued_requests.pop_front()?;
252        let req = self.prepare_block_request(peer_id, req);
253        Some(BlockResponseOutcome::Request(peer_id, req))
254    }
255
256    /// Called on a `GetBlockHeaders` response from a peer.
257    ///
258    /// This delegates the response and returns a [`BlockResponseOutcome`] to either queue in a
259    /// direct followup request or get the peer reported if the response was a
260    /// [`EthResponseValidator::reputation_change_err`]
261    pub(crate) fn on_block_headers_response(
262        &mut self,
263        peer_id: PeerId,
264        res: RequestResult<Vec<N::BlockHeader>>,
265    ) -> Option<BlockResponseOutcome> {
266        let is_error = res.is_err();
267        let maybe_reputation_change = res.reputation_change_err();
268
269        let resp = self.inflight_headers_requests.remove(&peer_id);
270
271        let is_likely_bad_response =
272            resp.as_ref().is_some_and(|r| res.is_likely_bad_headers_response(&r.request));
273
274        if let Some(resp) = resp {
275            // delegate the response
276            let _ = resp.response.send(res.map(|h| (peer_id, h).into()));
277        }
278
279        if let Some(peer) = self.peers.get_mut(&peer_id) {
280            // update the peer's response state
281            peer.last_response_likely_bad = is_likely_bad_response;
282
283            // If the peer is still ready to accept new requests, we try to send a followup
284            // request immediately.
285            if peer.state.on_request_finished() && !is_error && !is_likely_bad_response {
286                return self.followup_request(peer_id)
287            }
288        }
289
290        // if the response was an `Err` worth reporting the peer for then we return a `BadResponse`
291        // outcome
292        maybe_reputation_change
293            .map(|reputation_change| BlockResponseOutcome::BadResponse(peer_id, reputation_change))
294    }
295
296    /// Called on a `GetBlockBodies` response from a peer
297    pub(crate) fn on_block_bodies_response(
298        &mut self,
299        peer_id: PeerId,
300        res: RequestResult<Vec<N::BlockBody>>,
301    ) -> Option<BlockResponseOutcome> {
302        let is_likely_bad_response = res.as_ref().map_or(true, |bodies| bodies.is_empty());
303
304        if let Some(resp) = self.inflight_bodies_requests.remove(&peer_id) {
305            let _ = resp.response.send(res.map(|b| (peer_id, b).into()));
306        }
307        if let Some(peer) = self.peers.get_mut(&peer_id) {
308            // update the peer's response state
309            peer.last_response_likely_bad = is_likely_bad_response;
310
311            if peer.state.on_request_finished() && !is_likely_bad_response {
312                return self.followup_request(peer_id)
313            }
314        }
315        None
316    }
317
318    /// Returns a new [`FetchClient`] that can send requests to this type.
319    pub(crate) fn client(&self) -> FetchClient<N> {
320        FetchClient {
321            request_tx: self.download_requests_tx.clone(),
322            peers_handle: self.peers_handle.clone(),
323            num_active_peers: Arc::clone(&self.num_active_peers),
324        }
325    }
326}
327
328/// The outcome of [`StateFetcher::poll_action`]
329enum PollAction {
330    Ready(FetchAction),
331    NoRequests,
332    NoPeersAvailable,
333}
334
335/// Represents a connected peer
336#[derive(Debug)]
337struct Peer {
338    /// The state this peer currently resides in.
339    state: PeerState,
340    /// Best known hash that the peer has
341    best_hash: B256,
342    /// Tracks the best number of the peer.
343    best_number: u64,
344    /// Tracks the current timeout value we use for the peer.
345    timeout: Arc<AtomicU64>,
346    /// Tracks whether the peer has recently responded with a likely bad response.
347    ///
348    /// This is used to de-rank the peer if there are other peers available.
349    /// This exists because empty responses may not be penalized (e.g. when blocks near the tip are
350    /// downloaded), but we still want to avoid requesting from the same peer again if it has the
351    /// lowest timeout.
352    last_response_likely_bad: bool,
353    /// Tracks the range info for the peer.
354    #[allow(dead_code)]
355    range_info: Option<BlockRangeInfo>,
356}
357
358impl Peer {
359    fn timeout(&self) -> u64 {
360        self.timeout.load(Ordering::Relaxed)
361    }
362}
363
364/// Tracks the state of an individual peer
365#[derive(Debug)]
366enum PeerState {
367    /// Peer is currently not handling requests and is available.
368    Idle,
369    /// Peer is handling a `GetBlockHeaders` request.
370    GetBlockHeaders,
371    /// Peer is handling a `GetBlockBodies` request.
372    GetBlockBodies,
373    /// Peer session is about to close
374    Closing,
375}
376
377// === impl PeerState ===
378
379impl PeerState {
380    /// Returns true if the peer is currently idle.
381    const fn is_idle(&self) -> bool {
382        matches!(self, Self::Idle)
383    }
384
385    /// Resets the state on a received response.
386    ///
387    /// If the state was already marked as `Closing` do nothing.
388    ///
389    /// Returns `true` if the peer is ready for another request.
390    const fn on_request_finished(&mut self) -> bool {
391        if !matches!(self, Self::Closing) {
392            *self = Self::Idle;
393            return true
394        }
395        false
396    }
397}
398
399/// A request that waits for a response from the network, so it can send it back through the
400/// response channel.
401#[derive(Debug)]
402struct Request<Req, Resp> {
403    /// The issued request object
404    // TODO: this can be attached to the response in error case
405    request: Req,
406    response: oneshot::Sender<Resp>,
407}
408
409/// Requests that can be sent to the Syncer from a [`FetchClient`]
410#[derive(Debug)]
411pub(crate) enum DownloadRequest<N: NetworkPrimitives> {
412    /// Download the requested headers and send response through channel
413    GetBlockHeaders {
414        request: HeadersRequest,
415        response: oneshot::Sender<PeerRequestResult<Vec<N::BlockHeader>>>,
416        priority: Priority,
417    },
418    /// Download the requested headers and send response through channel
419    GetBlockBodies {
420        request: Vec<B256>,
421        response: oneshot::Sender<PeerRequestResult<Vec<N::BlockBody>>>,
422        priority: Priority,
423        #[allow(dead_code)]
424        range_hint: Option<RangeInclusive<u64>>,
425    },
426}
427
428// === impl DownloadRequest ===
429
430impl<N: NetworkPrimitives> DownloadRequest<N> {
431    /// Returns the corresponding state for a peer that handles the request.
432    const fn peer_state(&self) -> PeerState {
433        match self {
434            Self::GetBlockHeaders { .. } => PeerState::GetBlockHeaders,
435            Self::GetBlockBodies { .. } => PeerState::GetBlockBodies,
436        }
437    }
438
439    /// Returns the requested priority of this request
440    const fn get_priority(&self) -> &Priority {
441        match self {
442            Self::GetBlockHeaders { priority, .. } | Self::GetBlockBodies { priority, .. } => {
443                priority
444            }
445        }
446    }
447
448    /// Returns `true` if this request is normal priority.
449    const fn is_normal_priority(&self) -> bool {
450        self.get_priority().is_normal()
451    }
452}
453
454/// An action the syncer can emit.
455pub(crate) enum FetchAction {
456    /// Dispatch an eth request to the given peer.
457    BlockRequest {
458        /// The targeted recipient for the request
459        peer_id: PeerId,
460        /// The request to send
461        request: BlockRequest,
462    },
463}
464
465/// Outcome of a processed response.
466///
467/// Returned after processing a response.
468#[derive(Debug, PartialEq, Eq)]
469pub(crate) enum BlockResponseOutcome {
470    /// Continue with another request to the peer.
471    Request(PeerId, BlockRequest),
472    /// How to handle a bad response and the reputation change to apply, if any.
473    BadResponse(PeerId, ReputationChangeKind),
474}
475
476#[cfg(test)]
477mod tests {
478    use super::*;
479    use crate::{peers::PeersManager, PeersConfig};
480    use alloy_consensus::Header;
481    use alloy_primitives::B512;
482    use std::future::poll_fn;
483
484    #[tokio::test(flavor = "multi_thread")]
485    async fn test_poll_fetcher() {
486        let manager = PeersManager::new(PeersConfig::default());
487        let mut fetcher =
488            StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
489
490        poll_fn(move |cx| {
491            assert!(fetcher.poll(cx).is_pending());
492            let (tx, _rx) = oneshot::channel();
493            fetcher.queued_requests.push_back(DownloadRequest::GetBlockBodies {
494                request: vec![],
495                response: tx,
496                priority: Priority::default(),
497                range_hint: None,
498            });
499            assert!(fetcher.poll(cx).is_pending());
500
501            Poll::Ready(())
502        })
503        .await;
504    }
505
506    #[tokio::test]
507    async fn test_peer_rotation() {
508        let manager = PeersManager::new(PeersConfig::default());
509        let mut fetcher =
510            StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
511        // Add a few random peers
512        let peer1 = B512::random();
513        let peer2 = B512::random();
514        fetcher.new_active_peer(peer1, B256::random(), 1, Arc::new(AtomicU64::new(1)), None);
515        fetcher.new_active_peer(peer2, B256::random(), 2, Arc::new(AtomicU64::new(1)), None);
516
517        let first_peer = fetcher.next_best_peer().unwrap();
518        assert!(first_peer == peer1 || first_peer == peer2);
519        // Pending disconnect for first_peer
520        fetcher.on_pending_disconnect(&first_peer);
521        // first_peer now isn't idle, so we should get other peer
522        let second_peer = fetcher.next_best_peer().unwrap();
523        assert!(first_peer == peer1 || first_peer == peer2);
524        assert_ne!(first_peer, second_peer);
525        // without idle peers, returns None
526        fetcher.on_pending_disconnect(&second_peer);
527        assert_eq!(fetcher.next_best_peer(), None);
528    }
529
530    #[tokio::test]
531    async fn test_peer_prioritization() {
532        let manager = PeersManager::new(PeersConfig::default());
533        let mut fetcher =
534            StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
535        // Add a few random peers
536        let peer1 = B512::random();
537        let peer2 = B512::random();
538        let peer3 = B512::random();
539
540        let peer2_timeout = Arc::new(AtomicU64::new(300));
541
542        fetcher.new_active_peer(peer1, B256::random(), 1, Arc::new(AtomicU64::new(30)), None);
543        fetcher.new_active_peer(peer2, B256::random(), 2, Arc::clone(&peer2_timeout), None);
544        fetcher.new_active_peer(peer3, B256::random(), 3, Arc::new(AtomicU64::new(50)), None);
545
546        // Must always get peer1 (lowest timeout)
547        assert_eq!(fetcher.next_best_peer(), Some(peer1));
548        assert_eq!(fetcher.next_best_peer(), Some(peer1));
549        // peer2's timeout changes below peer1's
550        peer2_timeout.store(10, Ordering::Relaxed);
551        // Then we get peer 2 always (now lowest)
552        assert_eq!(fetcher.next_best_peer(), Some(peer2));
553        assert_eq!(fetcher.next_best_peer(), Some(peer2));
554    }
555
556    #[tokio::test]
557    async fn test_on_block_headers_response() {
558        let manager = PeersManager::new(PeersConfig::default());
559        let mut fetcher =
560            StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
561        let peer_id = B512::random();
562
563        assert_eq!(fetcher.on_block_headers_response(peer_id, Ok(vec![Header::default()])), None);
564
565        assert_eq!(
566            fetcher.on_block_headers_response(peer_id, Err(RequestError::Timeout)),
567            Some(BlockResponseOutcome::BadResponse(peer_id, ReputationChangeKind::Timeout))
568        );
569        assert_eq!(
570            fetcher.on_block_headers_response(peer_id, Err(RequestError::BadResponse)),
571            None
572        );
573        assert_eq!(
574            fetcher.on_block_headers_response(peer_id, Err(RequestError::ChannelClosed)),
575            None
576        );
577        assert_eq!(
578            fetcher.on_block_headers_response(peer_id, Err(RequestError::ConnectionDropped)),
579            None
580        );
581        assert_eq!(
582            fetcher.on_block_headers_response(peer_id, Err(RequestError::UnsupportedCapability)),
583            None
584        );
585    }
586
587    #[tokio::test]
588    async fn test_header_response_outcome() {
589        let manager = PeersManager::new(PeersConfig::default());
590        let mut fetcher =
591            StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
592        let peer_id = B512::random();
593
594        let request_pair = || {
595            let (tx, _rx) = oneshot::channel();
596            let req = Request {
597                request: HeadersRequest {
598                    start: 0u64.into(),
599                    limit: 1,
600                    direction: Default::default(),
601                },
602                response: tx,
603            };
604            let header = Header { number: 0, ..Default::default() };
605            (req, header)
606        };
607
608        fetcher.new_active_peer(
609            peer_id,
610            Default::default(),
611            Default::default(),
612            Default::default(),
613            None,
614        );
615
616        let (req, header) = request_pair();
617        fetcher.inflight_headers_requests.insert(peer_id, req);
618
619        let outcome = fetcher.on_block_headers_response(peer_id, Ok(vec![header]));
620        assert!(outcome.is_none());
621        assert!(fetcher.peers[&peer_id].state.is_idle());
622
623        let outcome =
624            fetcher.on_block_headers_response(peer_id, Err(RequestError::Timeout)).unwrap();
625
626        assert!(EthResponseValidator::reputation_change_err(&Err::<Vec<Header>, _>(
627            RequestError::Timeout
628        ))
629        .is_some());
630
631        match outcome {
632            BlockResponseOutcome::BadResponse(peer, _) => {
633                assert_eq!(peer, peer_id)
634            }
635            BlockResponseOutcome::Request(_, _) => {
636                unreachable!()
637            }
638        };
639
640        assert!(fetcher.peers[&peer_id].state.is_idle());
641    }
642}