Skip to main content

reth_network/fetch/
client.rs

1//! A client implementation that can interact with the network and download data.
2
3use crate::{fetch::DownloadRequest, flattened_response::FlattenedResponse};
4use alloy_primitives::B256;
5use futures::{future, future::Either};
6use reth_eth_wire::{BlockAccessLists, EthNetworkPrimitives, NetworkPrimitives};
7use reth_network_api::test_utils::PeersHandle;
8use reth_network_p2p::{
9    block_access_lists::client::BlockAccessListsClient,
10    bodies::client::{BodiesClient, BodiesFut},
11    download::DownloadClient,
12    error::{PeerRequestResult, RequestError},
13    headers::client::{HeadersClient, HeadersRequest},
14    priority::Priority,
15    receipts::client::{ReceiptsClient, ReceiptsFut},
16    BlockClient,
17};
18use reth_network_peers::PeerId;
19use reth_network_types::ReputationChangeKind;
20use std::{
21    ops::RangeInclusive,
22    sync::{
23        atomic::{AtomicUsize, Ordering},
24        Arc,
25    },
26};
27use tokio::sync::{mpsc::UnboundedSender, oneshot};
28
29#[cfg_attr(doc, aquamarine::aquamarine)]
30/// Front-end API for fetching data from the network.
31///
32/// Following diagram illustrates how a request, See [`HeadersClient::get_headers`] and
33/// [`BodiesClient::get_block_bodies`] is handled internally.
34///
35/// include_mmd!("docs/mermaid/fetch-client.mmd")
36#[derive(Debug, Clone)]
37pub struct FetchClient<N: NetworkPrimitives = EthNetworkPrimitives> {
38    /// Sender half of the request channel.
39    pub(crate) request_tx: UnboundedSender<DownloadRequest<N>>,
40    /// The handle to the peers
41    pub(crate) peers_handle: PeersHandle,
42    /// Number of active peer sessions the node's currently handling.
43    pub(crate) num_active_peers: Arc<AtomicUsize>,
44}
45
46impl<N: NetworkPrimitives> DownloadClient for FetchClient<N> {
47    fn report_bad_message(&self, peer_id: PeerId) {
48        self.peers_handle.reputation_change(peer_id, ReputationChangeKind::BadMessage);
49    }
50
51    fn num_connected_peers(&self) -> usize {
52        self.num_active_peers.load(Ordering::Relaxed)
53    }
54}
55
56// The `Output` future of the [HeadersClient] impl of [FetchClient] that either returns a response
57// or an error.
58type HeadersClientFuture<T> = Either<FlattenedResponse<T>, future::Ready<T>>;
59
60impl<N: NetworkPrimitives> HeadersClient for FetchClient<N> {
61    type Header = N::BlockHeader;
62    type Output = HeadersClientFuture<PeerRequestResult<Vec<N::BlockHeader>>>;
63
64    /// Sends a `GetBlockHeaders` request to an available peer.
65    fn get_headers_with_priority(
66        &self,
67        request: HeadersRequest,
68        priority: Priority,
69    ) -> Self::Output {
70        let (response, rx) = oneshot::channel();
71        if self
72            .request_tx
73            .send(DownloadRequest::GetBlockHeaders { request, response, priority })
74            .is_ok()
75        {
76            Either::Left(FlattenedResponse::from(rx))
77        } else {
78            Either::Right(future::err(RequestError::ChannelClosed))
79        }
80    }
81}
82
83impl<N: NetworkPrimitives> BodiesClient for FetchClient<N> {
84    type Body = N::BlockBody;
85    type Output = BodiesFut<N::BlockBody>;
86
87    /// Sends a `GetBlockBodies` request to an available peer.
88    fn get_block_bodies_with_priority_and_range_hint(
89        &self,
90        request: Vec<B256>,
91        priority: Priority,
92        range_hint: Option<RangeInclusive<u64>>,
93    ) -> Self::Output {
94        let (response, rx) = oneshot::channel();
95        if self
96            .request_tx
97            .send(DownloadRequest::GetBlockBodies { request, response, priority, range_hint })
98            .is_ok()
99        {
100            Box::pin(FlattenedResponse::from(rx))
101        } else {
102            Box::pin(future::err(RequestError::ChannelClosed))
103        }
104    }
105}
106
107impl<N: NetworkPrimitives> ReceiptsClient for FetchClient<N> {
108    type Receipt = N::Receipt;
109    type Output = ReceiptsFut<N::Receipt>;
110
111    fn get_receipts_with_priority(&self, request: Vec<B256>, priority: Priority) -> Self::Output {
112        let (response, rx) = oneshot::channel();
113        if self
114            .request_tx
115            .send(DownloadRequest::GetReceipts { request, response, priority })
116            .is_ok()
117        {
118            Box::pin(FlattenedResponse::from(rx))
119        } else {
120            Box::pin(future::err(RequestError::ChannelClosed))
121        }
122    }
123}
124
125impl<N: NetworkPrimitives> BlockClient for FetchClient<N> {
126    type Block = N::Block;
127}
128
129impl<N: NetworkPrimitives> BlockAccessListsClient for FetchClient<N> {
130    type Output =
131        std::pin::Pin<Box<dyn Future<Output = PeerRequestResult<BlockAccessLists>> + Send + Sync>>;
132
133    /// Sends a `GetBlockAccessLists` request to an available peer.
134    fn get_block_access_lists_with_priority(
135        &self,
136        hashes: Vec<B256>,
137        priority: Priority,
138    ) -> Self::Output {
139        let (response, rx) = oneshot::channel();
140        if self
141            .request_tx
142            .send(DownloadRequest::GetBlockAccessLists { request: hashes, response, priority })
143            .is_ok()
144        {
145            Box::pin(FlattenedResponse::from(rx))
146        } else {
147            Box::pin(future::err(RequestError::ChannelClosed))
148        }
149    }
150}