1use crate::{
4 budget::DEFAULT_BUDGET_TRY_DRAIN_DOWNLOADERS, metered_poll_nested_stream_with_budget,
5 metrics::EthRequestHandlerMetrics,
6};
7use alloy_consensus::{BlockHeader, ReceiptWithBloom};
8use alloy_eips::BlockHashOrNumber;
9use alloy_primitives::Bytes;
10use alloy_rlp::Encodable;
11use futures::StreamExt;
12use reth_eth_wire::{
13 BlockAccessLists, BlockBodies, BlockHeaders, EthNetworkPrimitives, GetBlockAccessLists,
14 GetBlockBodies, GetBlockHeaders, GetNodeData, GetReceipts, GetReceipts70, HeadersDirection,
15 NetworkPrimitives, NodeData, Receipts, Receipts69, Receipts70,
16};
17use reth_network_api::test_utils::PeersHandle;
18use reth_network_p2p::error::RequestResult;
19use reth_network_peers::PeerId;
20use reth_primitives_traits::Block;
21use reth_storage_api::{BlockReader, HeaderProvider};
22use std::{
23 future::Future,
24 pin::Pin,
25 task::{Context, Poll},
26 time::Duration,
27};
28use tokio::sync::{mpsc::Receiver, oneshot};
29use tokio_stream::wrappers::ReceiverStream;
30
31pub const MAX_RECEIPTS_SERVE: usize = 1024;
37
38pub const MAX_HEADERS_SERVE: usize = 1024;
42
43pub const MAX_BODIES_SERVE: usize = 1024;
48
49pub const SOFT_RESPONSE_LIMIT: usize = 2 * 1024 * 1024;
51
52#[derive(Debug)]
56#[must_use = "Manager does nothing unless polled."]
57pub struct EthRequestHandler<C, N: NetworkPrimitives = EthNetworkPrimitives> {
58 client: C,
60 #[expect(dead_code)]
63 peers: PeersHandle,
64 incoming_requests: ReceiverStream<IncomingEthRequest<N>>,
66 metrics: EthRequestHandlerMetrics,
68}
69
70impl<C, N: NetworkPrimitives> EthRequestHandler<C, N> {
72 pub fn new(client: C, peers: PeersHandle, incoming: Receiver<IncomingEthRequest<N>>) -> Self {
74 Self {
75 client,
76 peers,
77 incoming_requests: ReceiverStream::new(incoming),
78 metrics: Default::default(),
79 }
80 }
81}
82
83impl<C, N> EthRequestHandler<C, N>
84where
85 N: NetworkPrimitives,
86 C: BlockReader,
87{
88 fn get_headers_response(&self, request: GetBlockHeaders) -> Vec<C::Header> {
90 let GetBlockHeaders { start_block, limit, skip, direction } = request;
91
92 let mut headers = Vec::new();
93
94 let mut block: BlockHashOrNumber = match start_block {
95 BlockHashOrNumber::Hash(start) => start.into(),
96 BlockHashOrNumber::Number(num) => {
97 let Some(hash) = self.client.block_hash(num).unwrap_or_default() else {
98 return headers
99 };
100 hash.into()
101 }
102 };
103
104 let skip = skip as u64;
105 let mut total_bytes = 0;
106
107 for _ in 0..limit {
108 if let Some(header) = self.client.header_by_hash_or_number(block).unwrap_or_default() {
109 let number = header.number();
110 let parent_hash = header.parent_hash();
111
112 total_bytes += header.length();
113 headers.push(header);
114
115 if headers.len() >= MAX_HEADERS_SERVE || total_bytes > SOFT_RESPONSE_LIMIT {
116 break
117 }
118
119 match direction {
120 HeadersDirection::Rising => {
121 if let Some(next) = number.checked_add(1).and_then(|n| n.checked_add(skip))
122 {
123 block = next.into()
124 } else {
125 break
126 }
127 }
128 HeadersDirection::Falling => {
129 if skip > 0 {
130 if let Some(next) =
133 number.checked_sub(1).and_then(|num| num.checked_sub(skip))
134 {
135 block = next.into()
136 } else {
137 break
138 }
139 } else {
140 block = parent_hash.into()
141 }
142 }
143 }
144 } else {
145 break
146 }
147 }
148
149 headers
150 }
151
152 fn on_headers_request(
153 &self,
154 _peer_id: PeerId,
155 request: GetBlockHeaders,
156 response: oneshot::Sender<RequestResult<BlockHeaders<C::Header>>>,
157 ) {
158 self.metrics.eth_headers_requests_received_total.increment(1);
159 let headers = self.get_headers_response(request);
160 let _ = response.send(Ok(BlockHeaders(headers)));
161 }
162
163 fn on_bodies_request(
164 &self,
165 _peer_id: PeerId,
166 request: GetBlockBodies,
167 response: oneshot::Sender<RequestResult<BlockBodies<<C::Block as Block>::Body>>>,
168 ) {
169 self.metrics.eth_bodies_requests_received_total.increment(1);
170 let mut bodies = Vec::new();
171
172 let mut total_bytes = 0;
173
174 for hash in request {
175 if let Some(block) = self.client.block_by_hash(hash).unwrap_or_default() {
176 let body = block.into_body();
177 total_bytes += body.length();
178 bodies.push(body);
179
180 if bodies.len() >= MAX_BODIES_SERVE || total_bytes > SOFT_RESPONSE_LIMIT {
181 break
182 }
183 } else {
184 break
185 }
186 }
187
188 let _ = response.send(Ok(BlockBodies(bodies)));
189 }
190
191 fn on_receipts_request(
192 &self,
193 _peer_id: PeerId,
194 request: GetReceipts,
195 response: oneshot::Sender<RequestResult<Receipts<C::Receipt>>>,
196 ) {
197 self.metrics.eth_receipts_requests_received_total.increment(1);
198
199 let receipts = self.get_receipts_response(request, |receipts_by_block| {
200 receipts_by_block.into_iter().map(ReceiptWithBloom::from).collect::<Vec<_>>()
201 });
202
203 let _ = response.send(Ok(Receipts(receipts)));
204 }
205
206 fn on_receipts69_request(
207 &self,
208 _peer_id: PeerId,
209 request: GetReceipts,
210 response: oneshot::Sender<RequestResult<Receipts69<C::Receipt>>>,
211 ) {
212 self.metrics.eth_receipts_requests_received_total.increment(1);
213
214 let receipts = self.get_receipts_response(request, |receipts_by_block| {
215 receipts_by_block
217 });
218
219 let _ = response.send(Ok(Receipts69(receipts)));
220 }
221
222 fn on_receipts70_request(
226 &self,
227 _peer_id: PeerId,
228 request: GetReceipts70,
229 response: oneshot::Sender<RequestResult<Receipts70<C::Receipt>>>,
230 ) {
231 self.metrics.eth_receipts_requests_received_total.increment(1);
232
233 let GetReceipts70 { first_block_receipt_index, block_hashes } = request;
234
235 let mut receipts = Vec::new();
236 let mut total_bytes = 0usize;
237 let mut last_block_incomplete = false;
238
239 for (idx, hash) in block_hashes.into_iter().enumerate() {
240 if idx >= MAX_RECEIPTS_SERVE {
241 break
242 }
243
244 let Some(mut block_receipts) =
245 self.client.receipts_by_block(BlockHashOrNumber::Hash(hash)).unwrap_or_default()
246 else {
247 break
248 };
249
250 if idx == 0 && first_block_receipt_index > 0 {
251 let skip = first_block_receipt_index as usize;
252 if skip >= block_receipts.len() {
253 block_receipts.clear();
254 } else {
255 block_receipts.drain(0..skip);
256 }
257 }
258
259 let block_size = block_receipts.length();
260
261 if total_bytes + block_size <= SOFT_RESPONSE_LIMIT {
262 total_bytes += block_size;
263 receipts.push(block_receipts);
264 continue;
265 }
266
267 let mut partial_block = Vec::new();
268 for receipt in block_receipts {
269 let receipt_size = receipt.length();
270 if total_bytes + receipt_size > SOFT_RESPONSE_LIMIT {
271 break;
272 }
273 total_bytes += receipt_size;
274 partial_block.push(receipt);
275 }
276
277 receipts.push(partial_block);
278 last_block_incomplete = true;
279 break;
280 }
281
282 let _ = response.send(Ok(Receipts70 { last_block_incomplete, receipts }));
283 }
284
285 fn on_block_access_lists_request(
289 &self,
290 _peer_id: PeerId,
291 request: GetBlockAccessLists,
292 response: oneshot::Sender<RequestResult<BlockAccessLists>>,
293 ) {
294 let access_lists = request.0.into_iter().map(|_| Bytes::new()).collect();
295 let _ = response.send(Ok(BlockAccessLists(access_lists)));
296 }
297
298 #[inline]
299 fn get_receipts_response<T, F>(&self, request: GetReceipts, transform_fn: F) -> Vec<Vec<T>>
300 where
301 F: Fn(Vec<C::Receipt>) -> Vec<T>,
302 T: Encodable,
303 {
304 let mut receipts = Vec::new();
305 let mut total_bytes = 0;
306
307 for hash in request {
308 if let Some(receipts_by_block) =
309 self.client.receipts_by_block(BlockHashOrNumber::Hash(hash)).unwrap_or_default()
310 {
311 let transformed_receipts = transform_fn(receipts_by_block);
312 total_bytes += transformed_receipts.length();
313 receipts.push(transformed_receipts);
314
315 if receipts.len() >= MAX_RECEIPTS_SERVE || total_bytes > SOFT_RESPONSE_LIMIT {
316 break
317 }
318 } else {
319 break
320 }
321 }
322
323 receipts
324 }
325}
326
327impl<C, N> Future for EthRequestHandler<C, N>
331where
332 N: NetworkPrimitives,
333 C: BlockReader<Block = N::Block, Receipt = N::Receipt>
334 + HeaderProvider<Header = N::BlockHeader>
335 + Unpin,
336{
337 type Output = ();
338
339 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
340 let this = self.get_mut();
341
342 let mut acc = Duration::ZERO;
343 let maybe_more_incoming_requests = metered_poll_nested_stream_with_budget!(
344 acc,
345 "net::eth",
346 "Incoming eth requests stream",
347 DEFAULT_BUDGET_TRY_DRAIN_DOWNLOADERS,
348 this.incoming_requests.poll_next_unpin(cx),
349 |incoming| {
350 match incoming {
351 IncomingEthRequest::GetBlockHeaders { peer_id, request, response } => {
352 this.on_headers_request(peer_id, request, response)
353 }
354 IncomingEthRequest::GetBlockBodies { peer_id, request, response } => {
355 this.on_bodies_request(peer_id, request, response)
356 }
357 IncomingEthRequest::GetNodeData { .. } => {
358 this.metrics.eth_node_data_requests_received_total.increment(1);
359 }
360 IncomingEthRequest::GetReceipts { peer_id, request, response } => {
361 this.on_receipts_request(peer_id, request, response)
362 }
363 IncomingEthRequest::GetReceipts69 { peer_id, request, response } => {
364 this.on_receipts69_request(peer_id, request, response)
365 }
366 IncomingEthRequest::GetReceipts70 { peer_id, request, response } => {
367 this.on_receipts70_request(peer_id, request, response)
368 }
369 IncomingEthRequest::GetBlockAccessLists { peer_id, request, response } => {
370 this.on_block_access_lists_request(peer_id, request, response)
371 }
372 }
373 },
374 );
375
376 this.metrics.acc_duration_poll_eth_req_handler.set(acc.as_secs_f64());
377
378 if maybe_more_incoming_requests {
380 cx.waker().wake_by_ref();
382 }
383
384 Poll::Pending
385 }
386}
387
388#[derive(Debug)]
390pub enum IncomingEthRequest<N: NetworkPrimitives = EthNetworkPrimitives> {
391 GetBlockHeaders {
395 peer_id: PeerId,
397 request: GetBlockHeaders,
399 response: oneshot::Sender<RequestResult<BlockHeaders<N::BlockHeader>>>,
401 },
402 GetBlockBodies {
406 peer_id: PeerId,
408 request: GetBlockBodies,
410 response: oneshot::Sender<RequestResult<BlockBodies<N::BlockBody>>>,
412 },
413 GetNodeData {
417 peer_id: PeerId,
419 request: GetNodeData,
421 response: oneshot::Sender<RequestResult<NodeData>>,
423 },
424 GetReceipts {
428 peer_id: PeerId,
430 request: GetReceipts,
432 response: oneshot::Sender<RequestResult<Receipts<N::Receipt>>>,
434 },
435 GetReceipts69 {
439 peer_id: PeerId,
441 request: GetReceipts,
443 response: oneshot::Sender<RequestResult<Receipts69<N::Receipt>>>,
445 },
446 GetReceipts70 {
450 peer_id: PeerId,
452 request: GetReceipts70,
454 response: oneshot::Sender<RequestResult<Receipts70<N::Receipt>>>,
456 },
457 GetBlockAccessLists {
461 peer_id: PeerId,
463 request: GetBlockAccessLists,
465 response: oneshot::Sender<RequestResult<BlockAccessLists>>,
467 },
468}