Skip to main content

reth_network/fetch/
client.rs

1//! A client implementation that can interact with the network and download data.
2
3use crate::{fetch::DownloadRequest, flattened_response::FlattenedResponse};
4use alloy_primitives::B256;
5use futures::{future, future::Either};
6use reth_eth_wire::{EthNetworkPrimitives, NetworkPrimitives};
7use reth_network_api::test_utils::PeersHandle;
8use reth_network_p2p::{
9    bodies::client::{BodiesClient, BodiesFut},
10    download::DownloadClient,
11    error::{PeerRequestResult, RequestError},
12    headers::client::{HeadersClient, HeadersRequest},
13    priority::Priority,
14    receipts::client::{ReceiptsClient, ReceiptsFut},
15    BlockClient,
16};
17use reth_network_peers::PeerId;
18use reth_network_types::ReputationChangeKind;
19use std::{
20    ops::RangeInclusive,
21    sync::{
22        atomic::{AtomicUsize, Ordering},
23        Arc,
24    },
25};
26use tokio::sync::{mpsc::UnboundedSender, oneshot};
27
28#[cfg_attr(doc, aquamarine::aquamarine)]
29/// Front-end API for fetching data from the network.
30///
31/// Following diagram illustrates how a request, See [`HeadersClient::get_headers`] and
32/// [`BodiesClient::get_block_bodies`] is handled internally.
33///
34/// include_mmd!("docs/mermaid/fetch-client.mmd")
35#[derive(Debug, Clone)]
36pub struct FetchClient<N: NetworkPrimitives = EthNetworkPrimitives> {
37    /// Sender half of the request channel.
38    pub(crate) request_tx: UnboundedSender<DownloadRequest<N>>,
39    /// The handle to the peers
40    pub(crate) peers_handle: PeersHandle,
41    /// Number of active peer sessions the node's currently handling.
42    pub(crate) num_active_peers: Arc<AtomicUsize>,
43}
44
45impl<N: NetworkPrimitives> DownloadClient for FetchClient<N> {
46    fn report_bad_message(&self, peer_id: PeerId) {
47        self.peers_handle.reputation_change(peer_id, ReputationChangeKind::BadMessage);
48    }
49
50    fn num_connected_peers(&self) -> usize {
51        self.num_active_peers.load(Ordering::Relaxed)
52    }
53}
54
55// The `Output` future of the [HeadersClient] impl of [FetchClient] that either returns a response
56// or an error.
57type HeadersClientFuture<T> = Either<FlattenedResponse<T>, future::Ready<T>>;
58
59impl<N: NetworkPrimitives> HeadersClient for FetchClient<N> {
60    type Header = N::BlockHeader;
61    type Output = HeadersClientFuture<PeerRequestResult<Vec<N::BlockHeader>>>;
62
63    /// Sends a `GetBlockHeaders` request to an available peer.
64    fn get_headers_with_priority(
65        &self,
66        request: HeadersRequest,
67        priority: Priority,
68    ) -> Self::Output {
69        let (response, rx) = oneshot::channel();
70        if self
71            .request_tx
72            .send(DownloadRequest::GetBlockHeaders { request, response, priority })
73            .is_ok()
74        {
75            Either::Left(FlattenedResponse::from(rx))
76        } else {
77            Either::Right(future::err(RequestError::ChannelClosed))
78        }
79    }
80}
81
82impl<N: NetworkPrimitives> BodiesClient for FetchClient<N> {
83    type Body = N::BlockBody;
84    type Output = BodiesFut<N::BlockBody>;
85
86    /// Sends a `GetBlockBodies` request to an available peer.
87    fn get_block_bodies_with_priority_and_range_hint(
88        &self,
89        request: Vec<B256>,
90        priority: Priority,
91        range_hint: Option<RangeInclusive<u64>>,
92    ) -> Self::Output {
93        let (response, rx) = oneshot::channel();
94        if self
95            .request_tx
96            .send(DownloadRequest::GetBlockBodies { request, response, priority, range_hint })
97            .is_ok()
98        {
99            Box::pin(FlattenedResponse::from(rx))
100        } else {
101            Box::pin(future::err(RequestError::ChannelClosed))
102        }
103    }
104}
105
106impl<N: NetworkPrimitives> ReceiptsClient for FetchClient<N> {
107    type Receipt = N::Receipt;
108    type Output = ReceiptsFut<N::Receipt>;
109
110    fn get_receipts_with_priority(&self, request: Vec<B256>, priority: Priority) -> Self::Output {
111        let (response, rx) = oneshot::channel();
112        if self
113            .request_tx
114            .send(DownloadRequest::GetReceipts { request, response, priority })
115            .is_ok()
116        {
117            Box::pin(FlattenedResponse::from(rx))
118        } else {
119            Box::pin(future::err(RequestError::ChannelClosed))
120        }
121    }
122}
123
124impl<N: NetworkPrimitives> BlockClient for FetchClient<N> {
125    type Block = N::Block;
126}