reth_network/fetch/
client.rs

1//! A client implementation that can interact with the network and download data.
2
3use crate::{fetch::DownloadRequest, flattened_response::FlattenedResponse};
4use alloy_primitives::B256;
5use futures::{future, future::Either};
6use reth_eth_wire::{EthNetworkPrimitives, NetworkPrimitives};
7use reth_network_api::test_utils::PeersHandle;
8use reth_network_p2p::{
9    bodies::client::{BodiesClient, BodiesFut},
10    download::DownloadClient,
11    error::{PeerRequestResult, RequestError},
12    headers::client::{HeadersClient, HeadersRequest},
13    priority::Priority,
14    BlockClient,
15};
16use reth_network_peers::PeerId;
17use reth_network_types::ReputationChangeKind;
18use std::{
19    ops::RangeInclusive,
20    sync::{
21        atomic::{AtomicUsize, Ordering},
22        Arc,
23    },
24};
25use tokio::sync::{mpsc::UnboundedSender, oneshot};
26
27#[cfg_attr(doc, aquamarine::aquamarine)]
28/// Front-end API for fetching data from the network.
29///
30/// Following diagram illustrates how a request, See [`HeadersClient::get_headers`] and
31/// [`BodiesClient::get_block_bodies`] is handled internally.
32///
33/// include_mmd!("docs/mermaid/fetch-client.mmd")
34#[derive(Debug, Clone)]
35pub struct FetchClient<N: NetworkPrimitives = EthNetworkPrimitives> {
36    /// Sender half of the request channel.
37    pub(crate) request_tx: UnboundedSender<DownloadRequest<N>>,
38    /// The handle to the peers
39    pub(crate) peers_handle: PeersHandle,
40    /// Number of active peer sessions the node's currently handling.
41    pub(crate) num_active_peers: Arc<AtomicUsize>,
42}
43
44impl<N: NetworkPrimitives> DownloadClient for FetchClient<N> {
45    fn report_bad_message(&self, peer_id: PeerId) {
46        self.peers_handle.reputation_change(peer_id, ReputationChangeKind::BadMessage);
47    }
48
49    fn num_connected_peers(&self) -> usize {
50        self.num_active_peers.load(Ordering::Relaxed)
51    }
52}
53
54// The `Output` future of the [HeadersClient] impl of [FetchClient] that either returns a response
55// or an error.
56type HeadersClientFuture<T> = Either<FlattenedResponse<T>, future::Ready<T>>;
57
58impl<N: NetworkPrimitives> HeadersClient for FetchClient<N> {
59    type Header = N::BlockHeader;
60    type Output = HeadersClientFuture<PeerRequestResult<Vec<N::BlockHeader>>>;
61
62    /// Sends a `GetBlockHeaders` request to an available peer.
63    fn get_headers_with_priority(
64        &self,
65        request: HeadersRequest,
66        priority: Priority,
67    ) -> Self::Output {
68        let (response, rx) = oneshot::channel();
69        if self
70            .request_tx
71            .send(DownloadRequest::GetBlockHeaders { request, response, priority })
72            .is_ok()
73        {
74            Either::Left(FlattenedResponse::from(rx))
75        } else {
76            Either::Right(future::err(RequestError::ChannelClosed))
77        }
78    }
79}
80
81impl<N: NetworkPrimitives> BodiesClient for FetchClient<N> {
82    type Body = N::BlockBody;
83    type Output = BodiesFut<N::BlockBody>;
84
85    /// Sends a `GetBlockBodies` request to an available peer.
86    fn get_block_bodies_with_priority_and_range_hint(
87        &self,
88        request: Vec<B256>,
89        priority: Priority,
90        range_hint: Option<RangeInclusive<u64>>,
91    ) -> Self::Output {
92        let (response, rx) = oneshot::channel();
93        if self
94            .request_tx
95            .send(DownloadRequest::GetBlockBodies { request, response, priority, range_hint })
96            .is_ok()
97        {
98            Box::pin(FlattenedResponse::from(rx))
99        } else {
100            Box::pin(future::err(RequestError::ChannelClosed))
101        }
102    }
103}
104
105impl<N: NetworkPrimitives> BlockClient for FetchClient<N> {
106    type Block = N::Block;
107}