reth_network/fetch/
client.rs1use crate::{fetch::DownloadRequest, flattened_response::FlattenedResponse};
4use alloy_primitives::B256;
5use futures::{future, future::Either};
6use reth_eth_wire::{EthNetworkPrimitives, NetworkPrimitives};
7use reth_network_api::test_utils::PeersHandle;
8use reth_network_p2p::{
9 bodies::client::{BodiesClient, BodiesFut},
10 download::DownloadClient,
11 error::{PeerRequestResult, RequestError},
12 headers::client::{HeadersClient, HeadersRequest},
13 priority::Priority,
14 BlockClient,
15};
16use reth_network_peers::PeerId;
17use reth_network_types::ReputationChangeKind;
18use std::{
19 ops::RangeInclusive,
20 sync::{
21 atomic::{AtomicUsize, Ordering},
22 Arc,
23 },
24};
25use tokio::sync::{mpsc::UnboundedSender, oneshot};
26
27#[cfg_attr(doc, aquamarine::aquamarine)]
28#[derive(Debug, Clone)]
35pub struct FetchClient<N: NetworkPrimitives = EthNetworkPrimitives> {
36 pub(crate) request_tx: UnboundedSender<DownloadRequest<N>>,
38 pub(crate) peers_handle: PeersHandle,
40 pub(crate) num_active_peers: Arc<AtomicUsize>,
42}
43
44impl<N: NetworkPrimitives> DownloadClient for FetchClient<N> {
45 fn report_bad_message(&self, peer_id: PeerId) {
46 self.peers_handle.reputation_change(peer_id, ReputationChangeKind::BadMessage);
47 }
48
49 fn num_connected_peers(&self) -> usize {
50 self.num_active_peers.load(Ordering::Relaxed)
51 }
52}
53
54type HeadersClientFuture<T> = Either<FlattenedResponse<T>, future::Ready<T>>;
57
58impl<N: NetworkPrimitives> HeadersClient for FetchClient<N> {
59 type Header = N::BlockHeader;
60 type Output = HeadersClientFuture<PeerRequestResult<Vec<N::BlockHeader>>>;
61
62 fn get_headers_with_priority(
64 &self,
65 request: HeadersRequest,
66 priority: Priority,
67 ) -> Self::Output {
68 let (response, rx) = oneshot::channel();
69 if self
70 .request_tx
71 .send(DownloadRequest::GetBlockHeaders { request, response, priority })
72 .is_ok()
73 {
74 Either::Left(FlattenedResponse::from(rx))
75 } else {
76 Either::Right(future::err(RequestError::ChannelClosed))
77 }
78 }
79}
80
81impl<N: NetworkPrimitives> BodiesClient for FetchClient<N> {
82 type Body = N::BlockBody;
83 type Output = BodiesFut<N::BlockBody>;
84
85 fn get_block_bodies_with_priority_and_range_hint(
87 &self,
88 request: Vec<B256>,
89 priority: Priority,
90 range_hint: Option<RangeInclusive<u64>>,
91 ) -> Self::Output {
92 let (response, rx) = oneshot::channel();
93 if self
94 .request_tx
95 .send(DownloadRequest::GetBlockBodies { request, response, priority, range_hint })
96 .is_ok()
97 {
98 Box::pin(FlattenedResponse::from(rx))
99 } else {
100 Box::pin(future::err(RequestError::ChannelClosed))
101 }
102 }
103}
104
105impl<N: NetworkPrimitives> BlockClient for FetchClient<N> {
106 type Block = N::Block;
107}