1use crate::{
4 budget::DEFAULT_BUDGET_TRY_DRAIN_DOWNLOADERS, metered_poll_nested_stream_with_budget,
5 metrics::EthRequestHandlerMetrics,
6};
7use alloy_consensus::{BlockHeader, ReceiptWithBloom};
8use alloy_eips::BlockHashOrNumber;
9use alloy_rlp::Encodable;
10use futures::StreamExt;
11use reth_eth_wire::{
12 BlockBodies, BlockHeaders, EthNetworkPrimitives, GetBlockBodies, GetBlockHeaders, GetNodeData,
13 GetReceipts, HeadersDirection, NetworkPrimitives, NodeData, Receipts,
14};
15use reth_network_api::test_utils::PeersHandle;
16use reth_network_p2p::error::RequestResult;
17use reth_network_peers::PeerId;
18use reth_primitives_traits::Block;
19use reth_storage_api::{BlockReader, HeaderProvider};
20use std::{
21 future::Future,
22 pin::Pin,
23 task::{Context, Poll},
24 time::Duration,
25};
26use tokio::sync::{mpsc::Receiver, oneshot};
27use tokio_stream::wrappers::ReceiverStream;
28
/// Upper bound on the number of per-block receipt lists returned in a single `Receipts`
/// response.
pub const MAX_RECEIPTS_SERVE: usize = 1024;

/// Upper bound on the number of headers returned in a single `BlockHeaders` response.
pub const MAX_HEADERS_SERVE: usize = 1024;

/// Upper bound on the number of block bodies returned in a single `BlockBodies` response.
pub const MAX_BODIES_SERVE: usize = 1024;

/// Soft cap, in encoded bytes, on the payload of a single response (2 MiB).
///
/// The handlers compare the running total against this value only *after* an item has been
/// appended, so a response may exceed it by at most one item.
pub const SOFT_RESPONSE_LIMIT: usize = 2 * 1024 * 1024;
49
50#[derive(Debug)]
54#[must_use = "Manager does nothing unless polled."]
55pub struct EthRequestHandler<C, N: NetworkPrimitives = EthNetworkPrimitives> {
56 client: C,
58 #[allow(dead_code)]
61 peers: PeersHandle,
62 incoming_requests: ReceiverStream<IncomingEthRequest<N>>,
64 metrics: EthRequestHandlerMetrics,
66}
67
68impl<C, N: NetworkPrimitives> EthRequestHandler<C, N> {
70 pub fn new(client: C, peers: PeersHandle, incoming: Receiver<IncomingEthRequest<N>>) -> Self {
72 Self {
73 client,
74 peers,
75 incoming_requests: ReceiverStream::new(incoming),
76 metrics: Default::default(),
77 }
78 }
79}
80
81impl<C, N> EthRequestHandler<C, N>
82where
83 N: NetworkPrimitives,
84 C: BlockReader,
85{
86 fn get_headers_response(&self, request: GetBlockHeaders) -> Vec<C::Header> {
88 let GetBlockHeaders { start_block, limit, skip, direction } = request;
89
90 let mut headers = Vec::new();
91
92 let mut block: BlockHashOrNumber = match start_block {
93 BlockHashOrNumber::Hash(start) => start.into(),
94 BlockHashOrNumber::Number(num) => {
95 let Some(hash) = self.client.block_hash(num).unwrap_or_default() else {
96 return headers
97 };
98 hash.into()
99 }
100 };
101
102 let skip = skip as u64;
103 let mut total_bytes = 0;
104
105 for _ in 0..limit {
106 if let Some(header) = self.client.header_by_hash_or_number(block).unwrap_or_default() {
107 match direction {
108 HeadersDirection::Rising => {
109 if let Some(next) = (header.number() + 1).checked_add(skip) {
110 block = next.into()
111 } else {
112 break
113 }
114 }
115 HeadersDirection::Falling => {
116 if skip > 0 {
117 if let Some(next) =
120 header.number().checked_sub(1).and_then(|num| num.checked_sub(skip))
121 {
122 block = next.into()
123 } else {
124 break
125 }
126 } else {
127 block = header.parent_hash().into()
128 }
129 }
130 }
131
132 total_bytes += header.length();
133 headers.push(header);
134
135 if headers.len() >= MAX_HEADERS_SERVE || total_bytes > SOFT_RESPONSE_LIMIT {
136 break
137 }
138 } else {
139 break
140 }
141 }
142
143 headers
144 }
145
146 fn on_headers_request(
147 &self,
148 _peer_id: PeerId,
149 request: GetBlockHeaders,
150 response: oneshot::Sender<RequestResult<BlockHeaders<C::Header>>>,
151 ) {
152 self.metrics.eth_headers_requests_received_total.increment(1);
153 let headers = self.get_headers_response(request);
154 let _ = response.send(Ok(BlockHeaders(headers)));
155 }
156
157 fn on_bodies_request(
158 &self,
159 _peer_id: PeerId,
160 request: GetBlockBodies,
161 response: oneshot::Sender<RequestResult<BlockBodies<<C::Block as Block>::Body>>>,
162 ) {
163 self.metrics.eth_bodies_requests_received_total.increment(1);
164 let mut bodies = Vec::new();
165
166 let mut total_bytes = 0;
167
168 for hash in request.0 {
169 if let Some(block) = self.client.block_by_hash(hash).unwrap_or_default() {
170 let body = block.into_body();
171 total_bytes += body.length();
172 bodies.push(body);
173
174 if bodies.len() >= MAX_BODIES_SERVE || total_bytes > SOFT_RESPONSE_LIMIT {
175 break
176 }
177 } else {
178 break
179 }
180 }
181
182 let _ = response.send(Ok(BlockBodies(bodies)));
183 }
184
185 fn on_receipts_request(
186 &self,
187 _peer_id: PeerId,
188 request: GetReceipts,
189 response: oneshot::Sender<RequestResult<Receipts<C::Receipt>>>,
190 ) {
191 self.metrics.eth_receipts_requests_received_total.increment(1);
192
193 let mut receipts = Vec::new();
194
195 let mut total_bytes = 0;
196
197 for hash in request.0 {
198 if let Some(receipts_by_block) =
199 self.client.receipts_by_block(BlockHashOrNumber::Hash(hash)).unwrap_or_default()
200 {
201 let receipt =
202 receipts_by_block.into_iter().map(ReceiptWithBloom::from).collect::<Vec<_>>();
203
204 total_bytes += receipt.length();
205 receipts.push(receipt);
206
207 if receipts.len() >= MAX_RECEIPTS_SERVE || total_bytes > SOFT_RESPONSE_LIMIT {
208 break
209 }
210 } else {
211 break
212 }
213 }
214
215 let _ = response.send(Ok(Receipts(receipts)));
216 }
217}
218
219impl<C, N> Future for EthRequestHandler<C, N>
223where
224 N: NetworkPrimitives,
225 C: BlockReader<Block = N::Block, Receipt = N::Receipt>
226 + HeaderProvider<Header = N::BlockHeader>
227 + Unpin,
228{
229 type Output = ();
230
231 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
232 let this = self.get_mut();
233
234 let mut acc = Duration::ZERO;
235 let maybe_more_incoming_requests = metered_poll_nested_stream_with_budget!(
236 acc,
237 "net::eth",
238 "Incoming eth requests stream",
239 DEFAULT_BUDGET_TRY_DRAIN_DOWNLOADERS,
240 this.incoming_requests.poll_next_unpin(cx),
241 |incoming| {
242 match incoming {
243 IncomingEthRequest::GetBlockHeaders { peer_id, request, response } => {
244 this.on_headers_request(peer_id, request, response)
245 }
246 IncomingEthRequest::GetBlockBodies { peer_id, request, response } => {
247 this.on_bodies_request(peer_id, request, response)
248 }
249 IncomingEthRequest::GetNodeData { .. } => {
250 this.metrics.eth_node_data_requests_received_total.increment(1);
251 }
252 IncomingEthRequest::GetReceipts { peer_id, request, response } => {
253 this.on_receipts_request(peer_id, request, response)
254 }
255 }
256 },
257 );
258
259 this.metrics.acc_duration_poll_eth_req_handler.set(acc.as_secs_f64());
260
261 if maybe_more_incoming_requests {
263 cx.waker().wake_by_ref();
265 }
266
267 Poll::Pending
268 }
269}
270
271#[derive(Debug)]
273pub enum IncomingEthRequest<N: NetworkPrimitives = EthNetworkPrimitives> {
274 GetBlockHeaders {
278 peer_id: PeerId,
280 request: GetBlockHeaders,
282 response: oneshot::Sender<RequestResult<BlockHeaders<N::BlockHeader>>>,
284 },
285 GetBlockBodies {
289 peer_id: PeerId,
291 request: GetBlockBodies,
293 response: oneshot::Sender<RequestResult<BlockBodies<N::BlockBody>>>,
295 },
296 GetNodeData {
300 peer_id: PeerId,
302 request: GetNodeData,
304 response: oneshot::Sender<RequestResult<NodeData>>,
306 },
307 GetReceipts {
311 peer_id: PeerId,
313 request: GetReceipts,
315 response: oneshot::Sender<RequestResult<Receipts<N::Receipt>>>,
317 },
318}