reth_network/fetch/
client.rs1use crate::{fetch::DownloadRequest, flattened_response::FlattenedResponse};
4use alloy_primitives::B256;
5use futures::{future, future::Either};
6use reth_eth_wire::{BlockAccessLists, EthNetworkPrimitives, NetworkPrimitives};
7use reth_network_api::test_utils::PeersHandle;
8use reth_network_p2p::{
9 block_access_lists::client::BlockAccessListsClient,
10 bodies::client::{BodiesClient, BodiesFut},
11 download::DownloadClient,
12 error::{PeerRequestResult, RequestError},
13 headers::client::{HeadersClient, HeadersRequest},
14 priority::Priority,
15 receipts::client::{ReceiptsClient, ReceiptsFut},
16 BlockClient,
17};
18use reth_network_peers::PeerId;
19use reth_network_types::ReputationChangeKind;
20use std::{
21 ops::RangeInclusive,
22 sync::{
23 atomic::{AtomicUsize, Ordering},
24 Arc,
25 },
26};
27use tokio::sync::{mpsc::UnboundedSender, oneshot};
28
29#[cfg_attr(doc, aquamarine::aquamarine)]
30#[derive(Debug, Clone)]
37pub struct FetchClient<N: NetworkPrimitives = EthNetworkPrimitives> {
38 pub(crate) request_tx: UnboundedSender<DownloadRequest<N>>,
40 pub(crate) peers_handle: PeersHandle,
42 pub(crate) num_active_peers: Arc<AtomicUsize>,
44}
45
46impl<N: NetworkPrimitives> DownloadClient for FetchClient<N> {
47 fn report_bad_message(&self, peer_id: PeerId) {
48 self.peers_handle.reputation_change(peer_id, ReputationChangeKind::BadMessage);
49 }
50
51 fn num_connected_peers(&self) -> usize {
52 self.num_active_peers.load(Ordering::Relaxed)
53 }
54}
55
56type HeadersClientFuture<T> = Either<FlattenedResponse<T>, future::Ready<T>>;
59
60impl<N: NetworkPrimitives> HeadersClient for FetchClient<N> {
61 type Header = N::BlockHeader;
62 type Output = HeadersClientFuture<PeerRequestResult<Vec<N::BlockHeader>>>;
63
64 fn get_headers_with_priority(
66 &self,
67 request: HeadersRequest,
68 priority: Priority,
69 ) -> Self::Output {
70 let (response, rx) = oneshot::channel();
71 if self
72 .request_tx
73 .send(DownloadRequest::GetBlockHeaders { request, response, priority })
74 .is_ok()
75 {
76 Either::Left(FlattenedResponse::from(rx))
77 } else {
78 Either::Right(future::err(RequestError::ChannelClosed))
79 }
80 }
81}
82
83impl<N: NetworkPrimitives> BodiesClient for FetchClient<N> {
84 type Body = N::BlockBody;
85 type Output = BodiesFut<N::BlockBody>;
86
87 fn get_block_bodies_with_priority_and_range_hint(
89 &self,
90 request: Vec<B256>,
91 priority: Priority,
92 range_hint: Option<RangeInclusive<u64>>,
93 ) -> Self::Output {
94 let (response, rx) = oneshot::channel();
95 if self
96 .request_tx
97 .send(DownloadRequest::GetBlockBodies { request, response, priority, range_hint })
98 .is_ok()
99 {
100 Box::pin(FlattenedResponse::from(rx))
101 } else {
102 Box::pin(future::err(RequestError::ChannelClosed))
103 }
104 }
105}
106
107impl<N: NetworkPrimitives> ReceiptsClient for FetchClient<N> {
108 type Receipt = N::Receipt;
109 type Output = ReceiptsFut<N::Receipt>;
110
111 fn get_receipts_with_priority(&self, request: Vec<B256>, priority: Priority) -> Self::Output {
112 let (response, rx) = oneshot::channel();
113 if self
114 .request_tx
115 .send(DownloadRequest::GetReceipts { request, response, priority })
116 .is_ok()
117 {
118 Box::pin(FlattenedResponse::from(rx))
119 } else {
120 Box::pin(future::err(RequestError::ChannelClosed))
121 }
122 }
123}
124
125impl<N: NetworkPrimitives> BlockClient for FetchClient<N> {
126 type Block = N::Block;
127}
128
129impl<N: NetworkPrimitives> BlockAccessListsClient for FetchClient<N> {
130 type Output =
131 std::pin::Pin<Box<dyn Future<Output = PeerRequestResult<BlockAccessLists>> + Send + Sync>>;
132
133 fn get_block_access_lists_with_priority(
135 &self,
136 hashes: Vec<B256>,
137 priority: Priority,
138 ) -> Self::Output {
139 let (response, rx) = oneshot::channel();
140 if self
141 .request_tx
142 .send(DownloadRequest::GetBlockAccessLists { request: hashes, response, priority })
143 .is_ok()
144 {
145 Box::pin(FlattenedResponse::from(rx))
146 } else {
147 Box::pin(future::err(RequestError::ChannelClosed))
148 }
149 }
150}