1mod client;
4
5pub use client::FetchClient;
6
7use crate::{message::BlockRequest, session::BlockRangeInfo};
8use alloy_primitives::B256;
9use futures::StreamExt;
10use reth_eth_wire::{EthNetworkPrimitives, GetBlockBodies, GetBlockHeaders, NetworkPrimitives};
11use reth_network_api::test_utils::PeersHandle;
12use reth_network_p2p::{
13 error::{EthResponseValidator, PeerRequestResult, RequestError, RequestResult},
14 headers::client::HeadersRequest,
15 priority::Priority,
16};
17use reth_network_peers::PeerId;
18use reth_network_types::ReputationChangeKind;
19use std::{
20 collections::{HashMap, VecDeque},
21 ops::RangeInclusive,
22 sync::{
23 atomic::{AtomicU64, AtomicUsize, Ordering},
24 Arc,
25 },
26 task::{Context, Poll},
27};
28use tokio::sync::{mpsc, mpsc::UnboundedSender, oneshot};
29use tokio_stream::wrappers::UnboundedReceiverStream;
30
31type InflightHeadersRequest<H> = Request<HeadersRequest, PeerRequestResult<Vec<H>>>;
32type InflightBodiesRequest<B> = Request<Vec<B256>, PeerRequestResult<Vec<B>>>;
33
34#[derive(Debug)]
41pub struct StateFetcher<N: NetworkPrimitives = EthNetworkPrimitives> {
42 inflight_headers_requests: HashMap<PeerId, InflightHeadersRequest<N::BlockHeader>>,
44 inflight_bodies_requests: HashMap<PeerId, InflightBodiesRequest<N::BlockBody>>,
46 peers: HashMap<PeerId, Peer>,
48 peers_handle: PeersHandle,
50 num_active_peers: Arc<AtomicUsize>,
52 queued_requests: VecDeque<DownloadRequest<N>>,
54 download_requests_rx: UnboundedReceiverStream<DownloadRequest<N>>,
56 download_requests_tx: UnboundedSender<DownloadRequest<N>>,
58}
59
60impl<N: NetworkPrimitives> StateFetcher<N> {
63 pub(crate) fn new(peers_handle: PeersHandle, num_active_peers: Arc<AtomicUsize>) -> Self {
64 let (download_requests_tx, download_requests_rx) = mpsc::unbounded_channel();
65 Self {
66 inflight_headers_requests: Default::default(),
67 inflight_bodies_requests: Default::default(),
68 peers: Default::default(),
69 peers_handle,
70 num_active_peers,
71 queued_requests: Default::default(),
72 download_requests_rx: UnboundedReceiverStream::new(download_requests_rx),
73 download_requests_tx,
74 }
75 }
76
77 pub(crate) fn new_active_peer(
79 &mut self,
80 peer_id: PeerId,
81 best_hash: B256,
82 best_number: u64,
83 timeout: Arc<AtomicU64>,
84 range_info: Option<BlockRangeInfo>,
85 ) {
86 self.peers.insert(
87 peer_id,
88 Peer {
89 state: PeerState::Idle,
90 best_hash,
91 best_number,
92 timeout,
93 last_response_likely_bad: false,
94 range_info,
95 },
96 );
97 }
98
99 pub(crate) fn on_session_closed(&mut self, peer: &PeerId) {
106 self.peers.remove(peer);
107 if let Some(req) = self.inflight_headers_requests.remove(peer) {
108 let _ = req.response.send(Err(RequestError::ConnectionDropped));
109 }
110 if let Some(req) = self.inflight_bodies_requests.remove(peer) {
111 let _ = req.response.send(Err(RequestError::ConnectionDropped));
112 }
113 }
114
115 pub(crate) fn update_peer_block(&mut self, peer_id: &PeerId, hash: B256, number: u64) -> bool {
119 if let Some(peer) = self.peers.get_mut(peer_id) {
120 if number > peer.best_number {
121 peer.best_hash = hash;
122 peer.best_number = number;
123 return true
124 }
125 }
126 false
127 }
128
129 pub(crate) fn on_pending_disconnect(&mut self, peer_id: &PeerId) {
131 if let Some(peer) = self.peers.get_mut(peer_id) {
132 peer.state = PeerState::Closing;
133 }
134 }
135
136 fn next_best_peer(&self) -> Option<PeerId> {
140 let mut idle = self.peers.iter().filter(|(_, peer)| peer.state.is_idle());
141
142 let mut best_peer = idle.next()?;
143
144 for maybe_better in idle {
145 if best_peer.1.last_response_likely_bad && !maybe_better.1.last_response_likely_bad {
147 best_peer = maybe_better;
148 continue
149 }
150
151 if maybe_better.1.timeout() < best_peer.1.timeout() &&
153 !maybe_better.1.last_response_likely_bad
154 {
155 best_peer = maybe_better;
156 }
157 }
158
159 Some(*best_peer.0)
160 }
161
162 fn poll_action(&mut self) -> PollAction {
164 if self.queued_requests.is_empty() {
166 return PollAction::NoRequests
167 }
168
169 let Some(peer_id) = self.next_best_peer() else { return PollAction::NoPeersAvailable };
170
171 let request = self.queued_requests.pop_front().expect("not empty");
172 let request = self.prepare_block_request(peer_id, request);
173
174 PollAction::Ready(FetchAction::BlockRequest { peer_id, request })
175 }
176
177 pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<FetchAction> {
179 loop {
181 let no_peers_available = match self.poll_action() {
182 PollAction::Ready(action) => return Poll::Ready(action),
183 PollAction::NoRequests => false,
184 PollAction::NoPeersAvailable => true,
185 };
186
187 loop {
188 match self.download_requests_rx.poll_next_unpin(cx) {
190 Poll::Ready(Some(request)) => match request.get_priority() {
191 Priority::High => {
192 let pos = self
195 .queued_requests
196 .iter()
197 .position(|req| req.is_normal_priority())
198 .unwrap_or(0);
199 self.queued_requests.insert(pos, request);
200 }
201 Priority::Normal => {
202 self.queued_requests.push_back(request);
203 }
204 },
205 Poll::Ready(None) => {
206 unreachable!("channel can't close")
207 }
208 Poll::Pending => break,
209 }
210 }
211
212 if self.queued_requests.is_empty() || no_peers_available {
213 return Poll::Pending
214 }
215 }
216 }
217
218 fn prepare_block_request(&mut self, peer_id: PeerId, req: DownloadRequest<N>) -> BlockRequest {
222 if let Some(peer) = self.peers.get_mut(&peer_id) {
224 peer.state = req.peer_state();
225 }
226
227 match req {
228 DownloadRequest::GetBlockHeaders { request, response, .. } => {
229 let inflight = Request { request: request.clone(), response };
230 self.inflight_headers_requests.insert(peer_id, inflight);
231 let HeadersRequest { start, limit, direction } = request;
232 BlockRequest::GetBlockHeaders(GetBlockHeaders {
233 start_block: start,
234 limit,
235 skip: 0,
236 direction,
237 })
238 }
239 DownloadRequest::GetBlockBodies { request, response, .. } => {
240 let inflight = Request { request: request.clone(), response };
241 self.inflight_bodies_requests.insert(peer_id, inflight);
242 BlockRequest::GetBlockBodies(GetBlockBodies(request))
243 }
244 }
245 }
246
247 fn followup_request(&mut self, peer_id: PeerId) -> Option<BlockResponseOutcome> {
251 let req = self.queued_requests.pop_front()?;
252 let req = self.prepare_block_request(peer_id, req);
253 Some(BlockResponseOutcome::Request(peer_id, req))
254 }
255
256 pub(crate) fn on_block_headers_response(
262 &mut self,
263 peer_id: PeerId,
264 res: RequestResult<Vec<N::BlockHeader>>,
265 ) -> Option<BlockResponseOutcome> {
266 let is_error = res.is_err();
267 let maybe_reputation_change = res.reputation_change_err();
268
269 let resp = self.inflight_headers_requests.remove(&peer_id);
270
271 let is_likely_bad_response =
272 resp.as_ref().is_some_and(|r| res.is_likely_bad_headers_response(&r.request));
273
274 if let Some(resp) = resp {
275 let _ = resp.response.send(res.map(|h| (peer_id, h).into()));
277 }
278
279 if let Some(peer) = self.peers.get_mut(&peer_id) {
280 peer.last_response_likely_bad = is_likely_bad_response;
282
283 if peer.state.on_request_finished() && !is_error && !is_likely_bad_response {
286 return self.followup_request(peer_id)
287 }
288 }
289
290 maybe_reputation_change
293 .map(|reputation_change| BlockResponseOutcome::BadResponse(peer_id, reputation_change))
294 }
295
296 pub(crate) fn on_block_bodies_response(
298 &mut self,
299 peer_id: PeerId,
300 res: RequestResult<Vec<N::BlockBody>>,
301 ) -> Option<BlockResponseOutcome> {
302 let is_likely_bad_response = res.as_ref().map_or(true, |bodies| bodies.is_empty());
303
304 if let Some(resp) = self.inflight_bodies_requests.remove(&peer_id) {
305 let _ = resp.response.send(res.map(|b| (peer_id, b).into()));
306 }
307 if let Some(peer) = self.peers.get_mut(&peer_id) {
308 peer.last_response_likely_bad = is_likely_bad_response;
310
311 if peer.state.on_request_finished() && !is_likely_bad_response {
312 return self.followup_request(peer_id)
313 }
314 }
315 None
316 }
317
318 pub(crate) fn client(&self) -> FetchClient<N> {
320 FetchClient {
321 request_tx: self.download_requests_tx.clone(),
322 peers_handle: self.peers_handle.clone(),
323 num_active_peers: Arc::clone(&self.num_active_peers),
324 }
325 }
326}
327
328enum PollAction {
330 Ready(FetchAction),
331 NoRequests,
332 NoPeersAvailable,
333}
334
335#[derive(Debug)]
337struct Peer {
338 state: PeerState,
340 best_hash: B256,
342 best_number: u64,
344 timeout: Arc<AtomicU64>,
346 last_response_likely_bad: bool,
353 #[allow(dead_code)]
355 range_info: Option<BlockRangeInfo>,
356}
357
358impl Peer {
359 fn timeout(&self) -> u64 {
360 self.timeout.load(Ordering::Relaxed)
361 }
362}
363
364#[derive(Debug)]
366enum PeerState {
367 Idle,
369 GetBlockHeaders,
371 GetBlockBodies,
373 Closing,
375}
376
377impl PeerState {
380 const fn is_idle(&self) -> bool {
382 matches!(self, Self::Idle)
383 }
384
385 const fn on_request_finished(&mut self) -> bool {
391 if !matches!(self, Self::Closing) {
392 *self = Self::Idle;
393 return true
394 }
395 false
396 }
397}
398
399#[derive(Debug)]
402struct Request<Req, Resp> {
403 request: Req,
406 response: oneshot::Sender<Resp>,
407}
408
409#[derive(Debug)]
411pub(crate) enum DownloadRequest<N: NetworkPrimitives> {
412 GetBlockHeaders {
414 request: HeadersRequest,
415 response: oneshot::Sender<PeerRequestResult<Vec<N::BlockHeader>>>,
416 priority: Priority,
417 },
418 GetBlockBodies {
420 request: Vec<B256>,
421 response: oneshot::Sender<PeerRequestResult<Vec<N::BlockBody>>>,
422 priority: Priority,
423 #[allow(dead_code)]
424 range_hint: Option<RangeInclusive<u64>>,
425 },
426}
427
428impl<N: NetworkPrimitives> DownloadRequest<N> {
431 const fn peer_state(&self) -> PeerState {
433 match self {
434 Self::GetBlockHeaders { .. } => PeerState::GetBlockHeaders,
435 Self::GetBlockBodies { .. } => PeerState::GetBlockBodies,
436 }
437 }
438
439 const fn get_priority(&self) -> &Priority {
441 match self {
442 Self::GetBlockHeaders { priority, .. } | Self::GetBlockBodies { priority, .. } => {
443 priority
444 }
445 }
446 }
447
448 const fn is_normal_priority(&self) -> bool {
450 self.get_priority().is_normal()
451 }
452}
453
454pub(crate) enum FetchAction {
456 BlockRequest {
458 peer_id: PeerId,
460 request: BlockRequest,
462 },
463}
464
465#[derive(Debug, PartialEq, Eq)]
469pub(crate) enum BlockResponseOutcome {
470 Request(PeerId, BlockRequest),
472 BadResponse(PeerId, ReputationChangeKind),
474}
475
476#[cfg(test)]
477mod tests {
478 use super::*;
479 use crate::{peers::PeersManager, PeersConfig};
480 use alloy_consensus::Header;
481 use alloy_primitives::B512;
482 use std::future::poll_fn;
483
484 #[tokio::test(flavor = "multi_thread")]
485 async fn test_poll_fetcher() {
486 let manager = PeersManager::new(PeersConfig::default());
487 let mut fetcher =
488 StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
489
490 poll_fn(move |cx| {
491 assert!(fetcher.poll(cx).is_pending());
492 let (tx, _rx) = oneshot::channel();
493 fetcher.queued_requests.push_back(DownloadRequest::GetBlockBodies {
494 request: vec![],
495 response: tx,
496 priority: Priority::default(),
497 range_hint: None,
498 });
499 assert!(fetcher.poll(cx).is_pending());
500
501 Poll::Ready(())
502 })
503 .await;
504 }
505
506 #[tokio::test]
507 async fn test_peer_rotation() {
508 let manager = PeersManager::new(PeersConfig::default());
509 let mut fetcher =
510 StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
511 let peer1 = B512::random();
513 let peer2 = B512::random();
514 fetcher.new_active_peer(peer1, B256::random(), 1, Arc::new(AtomicU64::new(1)), None);
515 fetcher.new_active_peer(peer2, B256::random(), 2, Arc::new(AtomicU64::new(1)), None);
516
517 let first_peer = fetcher.next_best_peer().unwrap();
518 assert!(first_peer == peer1 || first_peer == peer2);
519 fetcher.on_pending_disconnect(&first_peer);
521 let second_peer = fetcher.next_best_peer().unwrap();
523 assert!(first_peer == peer1 || first_peer == peer2);
524 assert_ne!(first_peer, second_peer);
525 fetcher.on_pending_disconnect(&second_peer);
527 assert_eq!(fetcher.next_best_peer(), None);
528 }
529
530 #[tokio::test]
531 async fn test_peer_prioritization() {
532 let manager = PeersManager::new(PeersConfig::default());
533 let mut fetcher =
534 StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
535 let peer1 = B512::random();
537 let peer2 = B512::random();
538 let peer3 = B512::random();
539
540 let peer2_timeout = Arc::new(AtomicU64::new(300));
541
542 fetcher.new_active_peer(peer1, B256::random(), 1, Arc::new(AtomicU64::new(30)), None);
543 fetcher.new_active_peer(peer2, B256::random(), 2, Arc::clone(&peer2_timeout), None);
544 fetcher.new_active_peer(peer3, B256::random(), 3, Arc::new(AtomicU64::new(50)), None);
545
546 assert_eq!(fetcher.next_best_peer(), Some(peer1));
548 assert_eq!(fetcher.next_best_peer(), Some(peer1));
549 peer2_timeout.store(10, Ordering::Relaxed);
551 assert_eq!(fetcher.next_best_peer(), Some(peer2));
553 assert_eq!(fetcher.next_best_peer(), Some(peer2));
554 }
555
556 #[tokio::test]
557 async fn test_on_block_headers_response() {
558 let manager = PeersManager::new(PeersConfig::default());
559 let mut fetcher =
560 StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
561 let peer_id = B512::random();
562
563 assert_eq!(fetcher.on_block_headers_response(peer_id, Ok(vec![Header::default()])), None);
564
565 assert_eq!(
566 fetcher.on_block_headers_response(peer_id, Err(RequestError::Timeout)),
567 Some(BlockResponseOutcome::BadResponse(peer_id, ReputationChangeKind::Timeout))
568 );
569 assert_eq!(
570 fetcher.on_block_headers_response(peer_id, Err(RequestError::BadResponse)),
571 None
572 );
573 assert_eq!(
574 fetcher.on_block_headers_response(peer_id, Err(RequestError::ChannelClosed)),
575 None
576 );
577 assert_eq!(
578 fetcher.on_block_headers_response(peer_id, Err(RequestError::ConnectionDropped)),
579 None
580 );
581 assert_eq!(
582 fetcher.on_block_headers_response(peer_id, Err(RequestError::UnsupportedCapability)),
583 None
584 );
585 }
586
587 #[tokio::test]
588 async fn test_header_response_outcome() {
589 let manager = PeersManager::new(PeersConfig::default());
590 let mut fetcher =
591 StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
592 let peer_id = B512::random();
593
594 let request_pair = || {
595 let (tx, _rx) = oneshot::channel();
596 let req = Request {
597 request: HeadersRequest {
598 start: 0u64.into(),
599 limit: 1,
600 direction: Default::default(),
601 },
602 response: tx,
603 };
604 let header = Header { number: 0, ..Default::default() };
605 (req, header)
606 };
607
608 fetcher.new_active_peer(
609 peer_id,
610 Default::default(),
611 Default::default(),
612 Default::default(),
613 None,
614 );
615
616 let (req, header) = request_pair();
617 fetcher.inflight_headers_requests.insert(peer_id, req);
618
619 let outcome = fetcher.on_block_headers_response(peer_id, Ok(vec![header]));
620 assert!(outcome.is_none());
621 assert!(fetcher.peers[&peer_id].state.is_idle());
622
623 let outcome =
624 fetcher.on_block_headers_response(peer_id, Err(RequestError::Timeout)).unwrap();
625
626 assert!(EthResponseValidator::reputation_change_err(&Err::<Vec<Header>, _>(
627 RequestError::Timeout
628 ))
629 .is_some());
630
631 match outcome {
632 BlockResponseOutcome::BadResponse(peer, _) => {
633 assert_eq!(peer, peer_id)
634 }
635 BlockResponseOutcome::Request(_, _) => {
636 unreachable!()
637 }
638 };
639
640 assert!(fetcher.peers[&peer_id].state.is_idle());
641 }
642}