1mod client;
4
5pub use client::FetchClient;
6
7use crate::message::BlockRequest;
8use alloy_primitives::B256;
9use futures::StreamExt;
10use reth_eth_wire::{EthNetworkPrimitives, GetBlockBodies, GetBlockHeaders, NetworkPrimitives};
11use reth_network_api::test_utils::PeersHandle;
12use reth_network_p2p::{
13 error::{EthResponseValidator, PeerRequestResult, RequestError, RequestResult},
14 headers::client::HeadersRequest,
15 priority::Priority,
16};
17use reth_network_peers::PeerId;
18use reth_network_types::ReputationChangeKind;
19use std::{
20 collections::{HashMap, VecDeque},
21 sync::{
22 atomic::{AtomicU64, AtomicUsize, Ordering},
23 Arc,
24 },
25 task::{Context, Poll},
26};
27use tokio::sync::{mpsc, mpsc::UnboundedSender, oneshot};
28use tokio_stream::wrappers::UnboundedReceiverStream;
29
30type InflightHeadersRequest<H> = Request<HeadersRequest, PeerRequestResult<Vec<H>>>;
31type InflightBodiesRequest<B> = Request<Vec<B256>, PeerRequestResult<Vec<B>>>;
32
33#[derive(Debug)]
40pub struct StateFetcher<N: NetworkPrimitives = EthNetworkPrimitives> {
41 inflight_headers_requests: HashMap<PeerId, InflightHeadersRequest<N::BlockHeader>>,
43 inflight_bodies_requests: HashMap<PeerId, InflightBodiesRequest<N::BlockBody>>,
45 peers: HashMap<PeerId, Peer>,
47 peers_handle: PeersHandle,
49 num_active_peers: Arc<AtomicUsize>,
51 queued_requests: VecDeque<DownloadRequest<N>>,
53 download_requests_rx: UnboundedReceiverStream<DownloadRequest<N>>,
55 download_requests_tx: UnboundedSender<DownloadRequest<N>>,
57}
58
59impl<N: NetworkPrimitives> StateFetcher<N> {
62 pub(crate) fn new(peers_handle: PeersHandle, num_active_peers: Arc<AtomicUsize>) -> Self {
63 let (download_requests_tx, download_requests_rx) = mpsc::unbounded_channel();
64 Self {
65 inflight_headers_requests: Default::default(),
66 inflight_bodies_requests: Default::default(),
67 peers: Default::default(),
68 peers_handle,
69 num_active_peers,
70 queued_requests: Default::default(),
71 download_requests_rx: UnboundedReceiverStream::new(download_requests_rx),
72 download_requests_tx,
73 }
74 }
75
76 pub(crate) fn new_active_peer(
78 &mut self,
79 peer_id: PeerId,
80 best_hash: B256,
81 best_number: u64,
82 timeout: Arc<AtomicU64>,
83 ) {
84 self.peers.insert(
85 peer_id,
86 Peer {
87 state: PeerState::Idle,
88 best_hash,
89 best_number,
90 timeout,
91 last_response_likely_bad: false,
92 },
93 );
94 }
95
96 pub(crate) fn on_session_closed(&mut self, peer: &PeerId) {
103 self.peers.remove(peer);
104 if let Some(req) = self.inflight_headers_requests.remove(peer) {
105 let _ = req.response.send(Err(RequestError::ConnectionDropped));
106 }
107 if let Some(req) = self.inflight_bodies_requests.remove(peer) {
108 let _ = req.response.send(Err(RequestError::ConnectionDropped));
109 }
110 }
111
112 pub(crate) fn update_peer_block(&mut self, peer_id: &PeerId, hash: B256, number: u64) -> bool {
116 if let Some(peer) = self.peers.get_mut(peer_id) {
117 if number > peer.best_number {
118 peer.best_hash = hash;
119 peer.best_number = number;
120 return true
121 }
122 }
123 false
124 }
125
126 pub(crate) fn on_pending_disconnect(&mut self, peer_id: &PeerId) {
128 if let Some(peer) = self.peers.get_mut(peer_id) {
129 peer.state = PeerState::Closing;
130 }
131 }
132
133 fn next_best_peer(&self) -> Option<PeerId> {
137 let mut idle = self.peers.iter().filter(|(_, peer)| peer.state.is_idle());
138
139 let mut best_peer = idle.next()?;
140
141 for maybe_better in idle {
142 if best_peer.1.last_response_likely_bad && !maybe_better.1.last_response_likely_bad {
144 best_peer = maybe_better;
145 continue
146 }
147
148 if maybe_better.1.timeout() < best_peer.1.timeout() &&
150 !maybe_better.1.last_response_likely_bad
151 {
152 best_peer = maybe_better;
153 }
154 }
155
156 Some(*best_peer.0)
157 }
158
159 fn poll_action(&mut self) -> PollAction {
161 if self.queued_requests.is_empty() {
163 return PollAction::NoRequests
164 }
165
166 let Some(peer_id) = self.next_best_peer() else { return PollAction::NoPeersAvailable };
167
168 let request = self.queued_requests.pop_front().expect("not empty");
169 let request = self.prepare_block_request(peer_id, request);
170
171 PollAction::Ready(FetchAction::BlockRequest { peer_id, request })
172 }
173
174 pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<FetchAction> {
176 loop {
178 let no_peers_available = match self.poll_action() {
179 PollAction::Ready(action) => return Poll::Ready(action),
180 PollAction::NoRequests => false,
181 PollAction::NoPeersAvailable => true,
182 };
183
184 loop {
185 match self.download_requests_rx.poll_next_unpin(cx) {
187 Poll::Ready(Some(request)) => match request.get_priority() {
188 Priority::High => {
189 let pos = self
192 .queued_requests
193 .iter()
194 .position(|req| req.is_normal_priority())
195 .unwrap_or(0);
196 self.queued_requests.insert(pos, request);
197 }
198 Priority::Normal => {
199 self.queued_requests.push_back(request);
200 }
201 },
202 Poll::Ready(None) => {
203 unreachable!("channel can't close")
204 }
205 Poll::Pending => break,
206 }
207 }
208
209 if self.queued_requests.is_empty() || no_peers_available {
210 return Poll::Pending
211 }
212 }
213 }
214
215 fn prepare_block_request(&mut self, peer_id: PeerId, req: DownloadRequest<N>) -> BlockRequest {
219 if let Some(peer) = self.peers.get_mut(&peer_id) {
221 peer.state = req.peer_state();
222 }
223
224 match req {
225 DownloadRequest::GetBlockHeaders { request, response, .. } => {
226 let inflight = Request { request: request.clone(), response };
227 self.inflight_headers_requests.insert(peer_id, inflight);
228 let HeadersRequest { start, limit, direction } = request;
229 BlockRequest::GetBlockHeaders(GetBlockHeaders {
230 start_block: start,
231 limit,
232 skip: 0,
233 direction,
234 })
235 }
236 DownloadRequest::GetBlockBodies { request, response, .. } => {
237 let inflight = Request { request: request.clone(), response };
238 self.inflight_bodies_requests.insert(peer_id, inflight);
239 BlockRequest::GetBlockBodies(GetBlockBodies(request))
240 }
241 }
242 }
243
244 fn followup_request(&mut self, peer_id: PeerId) -> Option<BlockResponseOutcome> {
248 let req = self.queued_requests.pop_front()?;
249 let req = self.prepare_block_request(peer_id, req);
250 Some(BlockResponseOutcome::Request(peer_id, req))
251 }
252
253 pub(crate) fn on_block_headers_response(
259 &mut self,
260 peer_id: PeerId,
261 res: RequestResult<Vec<N::BlockHeader>>,
262 ) -> Option<BlockResponseOutcome> {
263 let is_error = res.is_err();
264 let maybe_reputation_change = res.reputation_change_err();
265
266 let resp = self.inflight_headers_requests.remove(&peer_id);
267
268 let is_likely_bad_response =
269 resp.as_ref().is_some_and(|r| res.is_likely_bad_headers_response(&r.request));
270
271 if let Some(resp) = resp {
272 let _ = resp.response.send(res.map(|h| (peer_id, h).into()));
274 }
275
276 if let Some(peer) = self.peers.get_mut(&peer_id) {
277 peer.last_response_likely_bad = is_likely_bad_response;
279
280 if peer.state.on_request_finished() && !is_error && !is_likely_bad_response {
283 return self.followup_request(peer_id)
284 }
285 }
286
287 maybe_reputation_change
290 .map(|reputation_change| BlockResponseOutcome::BadResponse(peer_id, reputation_change))
291 }
292
293 pub(crate) fn on_block_bodies_response(
295 &mut self,
296 peer_id: PeerId,
297 res: RequestResult<Vec<N::BlockBody>>,
298 ) -> Option<BlockResponseOutcome> {
299 let is_likely_bad_response = res.as_ref().map_or(true, |bodies| bodies.is_empty());
300
301 if let Some(resp) = self.inflight_bodies_requests.remove(&peer_id) {
302 let _ = resp.response.send(res.map(|b| (peer_id, b).into()));
303 }
304 if let Some(peer) = self.peers.get_mut(&peer_id) {
305 peer.last_response_likely_bad = is_likely_bad_response;
307
308 if peer.state.on_request_finished() && !is_likely_bad_response {
309 return self.followup_request(peer_id)
310 }
311 }
312 None
313 }
314
315 pub(crate) fn client(&self) -> FetchClient<N> {
317 FetchClient {
318 request_tx: self.download_requests_tx.clone(),
319 peers_handle: self.peers_handle.clone(),
320 num_active_peers: Arc::clone(&self.num_active_peers),
321 }
322 }
323}
324
325enum PollAction {
327 Ready(FetchAction),
328 NoRequests,
329 NoPeersAvailable,
330}
331
332#[derive(Debug)]
334struct Peer {
335 state: PeerState,
337 best_hash: B256,
339 best_number: u64,
341 timeout: Arc<AtomicU64>,
343 last_response_likely_bad: bool,
350}
351
352impl Peer {
353 fn timeout(&self) -> u64 {
354 self.timeout.load(Ordering::Relaxed)
355 }
356}
357
/// What a tracked peer is currently doing from the fetcher's point of view.
#[derive(Debug)]
enum PeerState {
    /// The peer has no request assigned and can take one.
    Idle,
    /// The peer is serving a headers request.
    GetBlockHeaders,
    /// The peer is serving a bodies request.
    GetBlockBodies,
    /// The peer is about to disconnect and must not receive further requests.
    Closing,
}

impl PeerState {
    /// Returns `true` only for [`PeerState::Idle`].
    const fn is_idle(&self) -> bool {
        matches!(self, Self::Idle)
    }

    /// Resets the state to [`PeerState::Idle`] after a request completed.
    ///
    /// A closing peer is left untouched. Returns `true` if the peer is idle afterwards and
    /// `false` if it is closing.
    const fn on_request_finished(&mut self) -> bool {
        if matches!(self, Self::Closing) {
            return false
        }
        *self = Self::Idle;
        true
    }
}
392
393#[derive(Debug)]
396struct Request<Req, Resp> {
397 request: Req,
400 response: oneshot::Sender<Resp>,
401}
402
403#[derive(Debug)]
405pub(crate) enum DownloadRequest<N: NetworkPrimitives> {
406 GetBlockHeaders {
408 request: HeadersRequest,
409 response: oneshot::Sender<PeerRequestResult<Vec<N::BlockHeader>>>,
410 priority: Priority,
411 },
412 GetBlockBodies {
414 request: Vec<B256>,
415 response: oneshot::Sender<PeerRequestResult<Vec<N::BlockBody>>>,
416 priority: Priority,
417 },
418}
419
420impl<N: NetworkPrimitives> DownloadRequest<N> {
423 const fn peer_state(&self) -> PeerState {
425 match self {
426 Self::GetBlockHeaders { .. } => PeerState::GetBlockHeaders,
427 Self::GetBlockBodies { .. } => PeerState::GetBlockBodies,
428 }
429 }
430
431 const fn get_priority(&self) -> &Priority {
433 match self {
434 Self::GetBlockHeaders { priority, .. } | Self::GetBlockBodies { priority, .. } => {
435 priority
436 }
437 }
438 }
439
440 const fn is_normal_priority(&self) -> bool {
442 self.get_priority().is_normal()
443 }
444}
445
446pub(crate) enum FetchAction {
448 BlockRequest {
450 peer_id: PeerId,
452 request: BlockRequest,
454 },
455}
456
457#[derive(Debug, PartialEq, Eq)]
461pub(crate) enum BlockResponseOutcome {
462 Request(PeerId, BlockRequest),
464 BadResponse(PeerId, ReputationChangeKind),
466}
467
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{peers::PeersManager, PeersConfig};
    use alloy_consensus::Header;
    use alloy_primitives::B512;
    use std::future::poll_fn;

    /// With no peers registered, polling stays pending whether or not a request is queued.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_poll_fetcher() {
        let manager = PeersManager::new(PeersConfig::default());
        let mut fetcher =
            StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());

        poll_fn(move |cx| {
            assert!(fetcher.poll(cx).is_pending());
            let (tx, _rx) = oneshot::channel();
            fetcher.queued_requests.push_back(DownloadRequest::GetBlockBodies {
                request: vec![],
                response: tx,
                priority: Priority::default(),
            });
            assert!(fetcher.poll(cx).is_pending());

            Poll::Ready(())
        })
        .await;
    }

    /// Peers that are pending disconnect are skipped by peer selection.
    #[tokio::test]
    async fn test_peer_rotation() {
        let manager = PeersManager::new(PeersConfig::default());
        let mut fetcher =
            StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
        // Register two random peers with identical timeouts.
        let peer1 = B512::random();
        let peer2 = B512::random();
        fetcher.new_active_peer(peer1, B256::random(), 1, Arc::new(AtomicU64::new(1)));
        fetcher.new_active_peer(peer2, B256::random(), 2, Arc::new(AtomicU64::new(1)));

        let first_peer = fetcher.next_best_peer().unwrap();
        assert!(first_peer == peer1 || first_peer == peer2);
        // Once the first peer is closing it is no longer idle, so the other one is selected.
        fetcher.on_pending_disconnect(&first_peer);
        let second_peer = fetcher.next_best_peer().unwrap();
        assert!(second_peer == peer1 || second_peer == peer2);
        assert_ne!(first_peer, second_peer);
        // With no idle peer left, nothing is selected.
        fetcher.on_pending_disconnect(&second_peer);
        assert_eq!(fetcher.next_best_peer(), None);
    }

    /// The idle peer with the lowest timeout is selected.
    #[tokio::test]
    async fn test_peer_prioritization() {
        let manager = PeersManager::new(PeersConfig::default());
        let mut fetcher =
            StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
        // Register three random peers with different timeouts.
        let peer1 = B512::random();
        let peer2 = B512::random();
        let peer3 = B512::random();

        let peer2_timeout = Arc::new(AtomicU64::new(300));

        fetcher.new_active_peer(peer1, B256::random(), 1, Arc::new(AtomicU64::new(30)));
        fetcher.new_active_peer(peer2, B256::random(), 2, Arc::clone(&peer2_timeout));
        fetcher.new_active_peer(peer3, B256::random(), 3, Arc::new(AtomicU64::new(50)));

        // Peer 1 has the lowest timeout; selecting does not change any state.
        assert_eq!(fetcher.next_best_peer(), Some(peer1));
        assert_eq!(fetcher.next_best_peer(), Some(peer1));
        // Lowering peer 2's shared timeout makes it the preferred peer.
        peer2_timeout.store(10, Ordering::Relaxed);
        assert_eq!(fetcher.next_best_peer(), Some(peer2));
        assert_eq!(fetcher.next_best_peer(), Some(peer2));
    }

    /// For an untracked peer, only results that map to a reputation change yield an outcome.
    #[tokio::test]
    async fn test_on_block_headers_response() {
        let manager = PeersManager::new(PeersConfig::default());
        let mut fetcher =
            StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
        let peer_id = B512::random();

        assert_eq!(fetcher.on_block_headers_response(peer_id, Ok(vec![Header::default()])), None);

        assert_eq!(
            fetcher.on_block_headers_response(peer_id, Err(RequestError::Timeout)),
            Some(BlockResponseOutcome::BadResponse(peer_id, ReputationChangeKind::Timeout))
        );
        assert_eq!(
            fetcher.on_block_headers_response(peer_id, Err(RequestError::BadResponse)),
            None
        );
        assert_eq!(
            fetcher.on_block_headers_response(peer_id, Err(RequestError::ChannelClosed)),
            None
        );
        assert_eq!(
            fetcher.on_block_headers_response(peer_id, Err(RequestError::ConnectionDropped)),
            None
        );
        assert_eq!(
            fetcher.on_block_headers_response(peer_id, Err(RequestError::UnsupportedCapability)),
            None
        );
    }

    /// A tracked peer returns to idle after both a successful and a failed headers response.
    #[tokio::test]
    async fn test_header_response_outcome() {
        let manager = PeersManager::new(PeersConfig::default());
        let mut fetcher =
            StateFetcher::<EthNetworkPrimitives>::new(manager.handle(), Default::default());
        let peer_id = B512::random();

        // Builds an in-flight request for block 0 together with a header that answers it.
        let request_pair = || {
            let (tx, _rx) = oneshot::channel();
            let req = Request {
                request: HeadersRequest {
                    start: 0u64.into(),
                    limit: 1,
                    direction: Default::default(),
                },
                response: tx,
            };
            let header = Header { number: 0, ..Default::default() };
            (req, header)
        };

        fetcher.new_active_peer(
            peer_id,
            Default::default(),
            Default::default(),
            Default::default(),
        );

        let (req, header) = request_pair();
        fetcher.inflight_headers_requests.insert(peer_id, req);

        // A good response with an empty queue produces no follow-up and leaves the peer idle.
        let outcome = fetcher.on_block_headers_response(peer_id, Ok(vec![header]));
        assert!(outcome.is_none());
        assert!(fetcher.peers[&peer_id].state.is_idle());

        let outcome =
            fetcher.on_block_headers_response(peer_id, Err(RequestError::Timeout)).unwrap();

        assert!(EthResponseValidator::reputation_change_err(&Err::<Vec<Header>, _>(
            RequestError::Timeout
        ))
        .is_some());

        match outcome {
            BlockResponseOutcome::BadResponse(peer, _) => {
                assert_eq!(peer, peer_id)
            }
            BlockResponseOutcome::Request(_, _) => {
                unreachable!()
            }
        };

        assert!(fetcher.peers[&peer_id].state.is_idle());
    }
}