reth_network/fetch/
client.rs1use crate::{fetch::DownloadRequest, flattened_response::FlattenedResponse};
4use alloy_primitives::B256;
5use futures::{future, future::Either};
6use reth_eth_wire::{EthNetworkPrimitives, NetworkPrimitives};
7use reth_network_api::test_utils::PeersHandle;
8use reth_network_p2p::{
9 bodies::client::{BodiesClient, BodiesFut},
10 download::DownloadClient,
11 error::{PeerRequestResult, RequestError},
12 headers::client::{HeadersClient, HeadersRequest},
13 priority::Priority,
14 receipts::client::{ReceiptsClient, ReceiptsFut},
15 BlockClient,
16};
17use reth_network_peers::PeerId;
18use reth_network_types::ReputationChangeKind;
19use std::{
20 ops::RangeInclusive,
21 sync::{
22 atomic::{AtomicUsize, Ordering},
23 Arc,
24 },
25};
26use tokio::sync::{mpsc::UnboundedSender, oneshot};
27
28#[cfg_attr(doc, aquamarine::aquamarine)]
29#[derive(Debug, Clone)]
36pub struct FetchClient<N: NetworkPrimitives = EthNetworkPrimitives> {
37 pub(crate) request_tx: UnboundedSender<DownloadRequest<N>>,
39 pub(crate) peers_handle: PeersHandle,
41 pub(crate) num_active_peers: Arc<AtomicUsize>,
43}
44
45impl<N: NetworkPrimitives> DownloadClient for FetchClient<N> {
46 fn report_bad_message(&self, peer_id: PeerId) {
47 self.peers_handle.reputation_change(peer_id, ReputationChangeKind::BadMessage);
48 }
49
50 fn num_connected_peers(&self) -> usize {
51 self.num_active_peers.load(Ordering::Relaxed)
52 }
53}
54
55type HeadersClientFuture<T> = Either<FlattenedResponse<T>, future::Ready<T>>;
58
59impl<N: NetworkPrimitives> HeadersClient for FetchClient<N> {
60 type Header = N::BlockHeader;
61 type Output = HeadersClientFuture<PeerRequestResult<Vec<N::BlockHeader>>>;
62
63 fn get_headers_with_priority(
65 &self,
66 request: HeadersRequest,
67 priority: Priority,
68 ) -> Self::Output {
69 let (response, rx) = oneshot::channel();
70 if self
71 .request_tx
72 .send(DownloadRequest::GetBlockHeaders { request, response, priority })
73 .is_ok()
74 {
75 Either::Left(FlattenedResponse::from(rx))
76 } else {
77 Either::Right(future::err(RequestError::ChannelClosed))
78 }
79 }
80}
81
82impl<N: NetworkPrimitives> BodiesClient for FetchClient<N> {
83 type Body = N::BlockBody;
84 type Output = BodiesFut<N::BlockBody>;
85
86 fn get_block_bodies_with_priority_and_range_hint(
88 &self,
89 request: Vec<B256>,
90 priority: Priority,
91 range_hint: Option<RangeInclusive<u64>>,
92 ) -> Self::Output {
93 let (response, rx) = oneshot::channel();
94 if self
95 .request_tx
96 .send(DownloadRequest::GetBlockBodies { request, response, priority, range_hint })
97 .is_ok()
98 {
99 Box::pin(FlattenedResponse::from(rx))
100 } else {
101 Box::pin(future::err(RequestError::ChannelClosed))
102 }
103 }
104}
105
106impl<N: NetworkPrimitives> ReceiptsClient for FetchClient<N> {
107 type Receipt = N::Receipt;
108 type Output = ReceiptsFut<N::Receipt>;
109
110 fn get_receipts_with_priority(&self, request: Vec<B256>, priority: Priority) -> Self::Output {
111 let (response, rx) = oneshot::channel();
112 if self
113 .request_tx
114 .send(DownloadRequest::GetReceipts { request, response, priority })
115 .is_ok()
116 {
117 Box::pin(FlattenedResponse::from(rx))
118 } else {
119 Box::pin(future::err(RequestError::ChannelClosed))
120 }
121 }
122}
123
124impl<N: NetworkPrimitives> BlockClient for FetchClient<N> {
125 type Block = N::Block;
126}