// reth_network/fetch/client.rs
use crate::{fetch::DownloadRequest, flattened_response::FlattenedResponse};
use alloy_primitives::B256;
use futures::{future, future::Either};
use reth_eth_wire::{EthNetworkPrimitives, NetworkPrimitives};
use reth_network_api::test_utils::PeersHandle;
use reth_network_p2p::{
bodies::client::{BodiesClient, BodiesFut},
download::DownloadClient,
error::{PeerRequestResult, RequestError},
headers::client::{HeadersClient, HeadersRequest},
priority::Priority,
};
use reth_network_peers::PeerId;
use reth_network_types::ReputationChangeKind;
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc,
};
use tokio::sync::{mpsc::UnboundedSender, oneshot};
#[cfg_attr(doc, aquamarine::aquamarine)]
/// Cloneable front-end for submitting download requests over a channel.
///
/// Requests are wrapped in a [`DownloadRequest`] and pushed onto `request_tx`; the
/// component that receives and serves them lives outside this type.
#[derive(Debug, Clone)]
pub struct FetchClient<N: NetworkPrimitives = EthNetworkPrimitives> {
/// Sending half of the unbounded channel that carries [`DownloadRequest`]s.
pub(crate) request_tx: UnboundedSender<DownloadRequest<N>>,
/// Handle through which reputation changes for peers are reported.
pub(crate) peers_handle: PeersHandle,
/// Shared counter read to report the number of connected peers.
// NOTE(review): the writer of this counter is not visible here — presumably the
// component that tracks peer sessions; confirm.
pub(crate) num_active_peers: Arc<AtomicUsize>,
}
/// [`DownloadClient`] implementation backed by the peers handle and the shared peer counter.
impl<N: NetworkPrimitives> DownloadClient for FetchClient<N> {
    /// Applies a [`ReputationChangeKind::BadMessage`] reputation change to `peer_id`.
    fn report_bad_message(&self, peer_id: PeerId) {
        let penalty = ReputationChangeKind::BadMessage;
        self.peers_handle.reputation_change(peer_id, penalty);
    }

    /// Returns the current value of the shared active-peer counter.
    ///
    /// The counter is read with relaxed ordering, so the value is a snapshot only.
    fn num_connected_peers(&self) -> usize {
        let active_peers = &self.num_active_peers;
        active_peers.load(Ordering::Relaxed)
    }
}
/// Future type returned for header requests.
///
/// It is either a [`FlattenedResponse`] (left) or an already-resolved
/// [`future::Ready`] value (right), both yielding `T`.
type HeadersClientFuture<T> = Either<FlattenedResponse<T>, future::Ready<T>>;
/// [`HeadersClient`] implementation that forwards header requests over the request channel.
impl<N: NetworkPrimitives> HeadersClient for FetchClient<N> {
    type Header = N::BlockHeader;
    type Output = HeadersClientFuture<PeerRequestResult<Vec<N::BlockHeader>>>;

    /// Submits a `GetBlockHeaders` download request with the given `priority`.
    ///
    /// A fresh oneshot channel is created for the reply and its sending half travels
    /// with the request. If the request is accepted by the channel, the returned future
    /// wraps the receiving half; if the send fails, the returned future is already
    /// resolved to [`RequestError::ChannelClosed`].
    fn get_headers_with_priority(
        &self,
        request: HeadersRequest,
        priority: Priority,
    ) -> Self::Output {
        let (reply_tx, reply_rx) = oneshot::channel();
        let download = DownloadRequest::GetBlockHeaders { request, response: reply_tx, priority };

        match self.request_tx.send(download) {
            Ok(()) => Either::Left(FlattenedResponse::from(reply_rx)),
            // The request could not be queued, so no reply will ever arrive.
            Err(_) => Either::Right(future::err(RequestError::ChannelClosed)),
        }
    }
}
/// [`BodiesClient`] implementation that forwards body requests over the request channel.
impl<N: NetworkPrimitives> BodiesClient for FetchClient<N> {
    type Body = N::BlockBody;
    type Output = BodiesFut<N::BlockBody>;

    /// Submits a `GetBlockBodies` download request for the given block hashes with the
    /// given `priority`.
    ///
    /// A fresh oneshot channel is created for the reply and its sending half travels
    /// with the request. If the request is accepted by the channel, the returned boxed
    /// future wraps the receiving half; if the send fails, the returned boxed future is
    /// already resolved to [`RequestError::ChannelClosed`].
    fn get_block_bodies_with_priority(
        &self,
        request: Vec<B256>,
        priority: Priority,
    ) -> Self::Output {
        let (reply_tx, reply_rx) = oneshot::channel();
        let download = DownloadRequest::GetBlockBodies { request, response: reply_tx, priority };

        match self.request_tx.send(download) {
            Ok(()) => Box::pin(FlattenedResponse::from(reply_rx)),
            // The request could not be queued, so no reply will ever arrive.
            Err(_) => Box::pin(future::err(RequestError::ChannelClosed)),
        }
    }
}