reth_network/fetch/
client.rs
1use crate::{fetch::DownloadRequest, flattened_response::FlattenedResponse};
4use alloy_primitives::B256;
5use futures::{future, future::Either};
6use reth_eth_wire::{EthNetworkPrimitives, NetworkPrimitives};
7use reth_network_api::test_utils::PeersHandle;
8use reth_network_p2p::{
9 bodies::client::{BodiesClient, BodiesFut},
10 download::DownloadClient,
11 error::{PeerRequestResult, RequestError},
12 headers::client::{HeadersClient, HeadersRequest},
13 priority::Priority,
14 BlockClient,
15};
16use reth_network_peers::PeerId;
17use reth_network_types::ReputationChangeKind;
18use std::sync::{
19 atomic::{AtomicUsize, Ordering},
20 Arc,
21};
22use tokio::sync::{mpsc::UnboundedSender, oneshot};
23
24#[cfg_attr(doc, aquamarine::aquamarine)]
25#[derive(Debug, Clone)]
32pub struct FetchClient<N: NetworkPrimitives = EthNetworkPrimitives> {
33 pub(crate) request_tx: UnboundedSender<DownloadRequest<N>>,
35 pub(crate) peers_handle: PeersHandle,
37 pub(crate) num_active_peers: Arc<AtomicUsize>,
39}
40
41impl<N: NetworkPrimitives> DownloadClient for FetchClient<N> {
42 fn report_bad_message(&self, peer_id: PeerId) {
43 self.peers_handle.reputation_change(peer_id, ReputationChangeKind::BadMessage);
44 }
45
46 fn num_connected_peers(&self) -> usize {
47 self.num_active_peers.load(Ordering::Relaxed)
48 }
49}
50
51type HeadersClientFuture<T> = Either<FlattenedResponse<T>, future::Ready<T>>;
54
55impl<N: NetworkPrimitives> HeadersClient for FetchClient<N> {
56 type Header = N::BlockHeader;
57 type Output = HeadersClientFuture<PeerRequestResult<Vec<N::BlockHeader>>>;
58
59 fn get_headers_with_priority(
61 &self,
62 request: HeadersRequest,
63 priority: Priority,
64 ) -> Self::Output {
65 let (response, rx) = oneshot::channel();
66 if self
67 .request_tx
68 .send(DownloadRequest::GetBlockHeaders { request, response, priority })
69 .is_ok()
70 {
71 Either::Left(FlattenedResponse::from(rx))
72 } else {
73 Either::Right(future::err(RequestError::ChannelClosed))
74 }
75 }
76}
77
78impl<N: NetworkPrimitives> BodiesClient for FetchClient<N> {
79 type Body = N::BlockBody;
80 type Output = BodiesFut<N::BlockBody>;
81
82 fn get_block_bodies_with_priority(
84 &self,
85 request: Vec<B256>,
86 priority: Priority,
87 ) -> Self::Output {
88 let (response, rx) = oneshot::channel();
89 if self
90 .request_tx
91 .send(DownloadRequest::GetBlockBodies { request, response, priority })
92 .is_ok()
93 {
94 Box::pin(FlattenedResponse::from(rx))
95 } else {
96 Box::pin(future::err(RequestError::ChannelClosed))
97 }
98 }
99}
100
101impl<N: NetworkPrimitives> BlockClient for FetchClient<N> {
102 type Block = N::Block;
103}