use crate::{
budget::DEFAULT_BUDGET_TRY_DRAIN_DOWNLOADERS, metered_poll_nested_stream_with_budget,
metrics::EthRequestHandlerMetrics,
};
use alloy_consensus::{BlockHeader, ReceiptWithBloom};
use alloy_eips::BlockHashOrNumber;
use alloy_rlp::Encodable;
use futures::StreamExt;
use reth_eth_wire::{
BlockBodies, BlockHeaders, EthNetworkPrimitives, GetBlockBodies, GetBlockHeaders, GetNodeData,
GetReceipts, HeadersDirection, NetworkPrimitives, NodeData, Receipts,
};
use reth_network_api::test_utils::PeersHandle;
use reth_network_p2p::error::RequestResult;
use reth_network_peers::PeerId;
use reth_primitives_traits::Block;
use reth_storage_api::{BlockReader, HeaderProvider};
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
time::Duration,
};
use tokio::sync::{mpsc::Receiver, oneshot};
use tokio_stream::wrappers::ReceiverStream;
const MAX_RECEIPTS_SERVE: usize = 1024;
const MAX_HEADERS_SERVE: usize = 1024;
const MAX_BODIES_SERVE: usize = 1024;
const SOFT_RESPONSE_LIMIT: usize = 2 * 1024 * 1024;
#[derive(Debug)]
#[must_use = "Manager does nothing unless polled."]
pub struct EthRequestHandler<C, N: NetworkPrimitives = EthNetworkPrimitives> {
client: C,
#[allow(dead_code)]
peers: PeersHandle,
incoming_requests: ReceiverStream<IncomingEthRequest<N>>,
metrics: EthRequestHandlerMetrics,
}
impl<C, N: NetworkPrimitives> EthRequestHandler<C, N> {
pub fn new(client: C, peers: PeersHandle, incoming: Receiver<IncomingEthRequest<N>>) -> Self {
Self {
client,
peers,
incoming_requests: ReceiverStream::new(incoming),
metrics: Default::default(),
}
}
}
impl<C, N> EthRequestHandler<C, N>
where
N: NetworkPrimitives,
C: BlockReader,
{
fn get_headers_response(&self, request: GetBlockHeaders) -> Vec<C::Header> {
let GetBlockHeaders { start_block, limit, skip, direction } = request;
let mut headers = Vec::new();
let mut block: BlockHashOrNumber = match start_block {
BlockHashOrNumber::Hash(start) => start.into(),
BlockHashOrNumber::Number(num) => {
let Some(hash) = self.client.block_hash(num).unwrap_or_default() else {
return headers
};
hash.into()
}
};
let skip = skip as u64;
let mut total_bytes = 0;
for _ in 0..limit {
if let Some(header) = self.client.header_by_hash_or_number(block).unwrap_or_default() {
match direction {
HeadersDirection::Rising => {
if let Some(next) = (header.number() + 1).checked_add(skip) {
block = next.into()
} else {
break
}
}
HeadersDirection::Falling => {
if skip > 0 {
if let Some(next) =
header.number().checked_sub(1).and_then(|num| num.checked_sub(skip))
{
block = next.into()
} else {
break
}
} else {
block = header.parent_hash().into()
}
}
}
total_bytes += header.length();
headers.push(header);
if headers.len() >= MAX_HEADERS_SERVE || total_bytes > SOFT_RESPONSE_LIMIT {
break
}
} else {
break
}
}
headers
}
fn on_headers_request(
&self,
_peer_id: PeerId,
request: GetBlockHeaders,
response: oneshot::Sender<RequestResult<BlockHeaders<C::Header>>>,
) {
self.metrics.eth_headers_requests_received_total.increment(1);
let headers = self.get_headers_response(request);
let _ = response.send(Ok(BlockHeaders(headers)));
}
fn on_bodies_request(
&self,
_peer_id: PeerId,
request: GetBlockBodies,
response: oneshot::Sender<
RequestResult<BlockBodies<<C::Block as reth_primitives_traits::Block>::Body>>,
>,
) {
self.metrics.eth_bodies_requests_received_total.increment(1);
let mut bodies = Vec::new();
let mut total_bytes = 0;
for hash in request.0 {
if let Some(block) = self.client.block_by_hash(hash).unwrap_or_default() {
let (_, body) = block.split();
total_bytes += body.length();
bodies.push(body);
if bodies.len() >= MAX_BODIES_SERVE || total_bytes > SOFT_RESPONSE_LIMIT {
break
}
} else {
break
}
}
let _ = response.send(Ok(BlockBodies(bodies)));
}
fn on_receipts_request(
&self,
_peer_id: PeerId,
request: GetReceipts,
response: oneshot::Sender<RequestResult<Receipts<C::Receipt>>>,
) {
self.metrics.eth_receipts_requests_received_total.increment(1);
let mut receipts = Vec::new();
let mut total_bytes = 0;
for hash in request.0 {
if let Some(receipts_by_block) =
self.client.receipts_by_block(BlockHashOrNumber::Hash(hash)).unwrap_or_default()
{
let receipt =
receipts_by_block.into_iter().map(ReceiptWithBloom::from).collect::<Vec<_>>();
total_bytes += receipt.length();
receipts.push(receipt);
if receipts.len() >= MAX_RECEIPTS_SERVE || total_bytes > SOFT_RESPONSE_LIMIT {
break
}
} else {
break
}
}
let _ = response.send(Ok(Receipts(receipts)));
}
}
impl<C, N> Future for EthRequestHandler<C, N>
where
N: NetworkPrimitives,
C: BlockReader<Block = N::Block, Receipt = N::Receipt>
+ HeaderProvider<Header = N::BlockHeader>
+ Unpin,
{
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
let mut acc = Duration::ZERO;
let maybe_more_incoming_requests = metered_poll_nested_stream_with_budget!(
acc,
"net::eth",
"Incoming eth requests stream",
DEFAULT_BUDGET_TRY_DRAIN_DOWNLOADERS,
this.incoming_requests.poll_next_unpin(cx),
|incoming| {
match incoming {
IncomingEthRequest::GetBlockHeaders { peer_id, request, response } => {
this.on_headers_request(peer_id, request, response)
}
IncomingEthRequest::GetBlockBodies { peer_id, request, response } => {
this.on_bodies_request(peer_id, request, response)
}
IncomingEthRequest::GetNodeData { .. } => {
this.metrics.eth_node_data_requests_received_total.increment(1);
}
IncomingEthRequest::GetReceipts { peer_id, request, response } => {
this.on_receipts_request(peer_id, request, response)
}
}
},
);
this.metrics.acc_duration_poll_eth_req_handler.set(acc.as_secs_f64());
if maybe_more_incoming_requests {
cx.waker().wake_by_ref();
}
Poll::Pending
}
}
#[derive(Debug)]
pub enum IncomingEthRequest<N: NetworkPrimitives = EthNetworkPrimitives> {
GetBlockHeaders {
peer_id: PeerId,
request: GetBlockHeaders,
response: oneshot::Sender<RequestResult<BlockHeaders<N::BlockHeader>>>,
},
GetBlockBodies {
peer_id: PeerId,
request: GetBlockBodies,
response: oneshot::Sender<RequestResult<BlockBodies<N::BlockBody>>>,
},
GetNodeData {
peer_id: PeerId,
request: GetNodeData,
response: oneshot::Sender<RequestResult<NodeData>>,
},
GetReceipts {
peer_id: PeerId,
request: GetReceipts,
response: oneshot::Sender<RequestResult<Receipts<N::Receipt>>>,
},
}