1use crate::{
4 budget::DEFAULT_BUDGET_TRY_DRAIN_DOWNLOADERS, metered_poll_nested_stream_with_budget,
5 metrics::EthRequestHandlerMetrics,
6};
7use alloy_consensus::{BlockHeader, ReceiptWithBloom};
8use alloy_eips::BlockHashOrNumber;
9use alloy_rlp::Encodable;
10use futures::StreamExt;
11use reth_eth_wire::{
12 BlockAccessLists, BlockBodies, BlockHeaders, Cells, EthNetworkPrimitives, GetBlockAccessLists,
13 GetBlockBodies, GetBlockHeaders, GetCells, GetNodeData, GetReceipts, GetReceipts70,
14 HeadersDirection, NetworkPrimitives, NodeData, Receipts, Receipts69, Receipts70,
15};
16use reth_network_api::test_utils::PeersHandle;
17use reth_network_p2p::error::RequestResult;
18use reth_network_peers::PeerId;
19use reth_primitives_traits::Block;
20use reth_storage_api::{BalProvider, BlockReader, GetBlockAccessListLimit, HeaderProvider};
21use std::{
22 future::Future,
23 pin::Pin,
24 task::{Context, Poll},
25 time::Duration,
26};
27use tokio::sync::{mpsc::Receiver, oneshot};
28use tokio_stream::wrappers::ReceiverStream;
29
/// Maximum number of per-block receipt lists returned in a single receipts response.
///
/// Bounds how many requested block hashes are looked up.
pub const MAX_RECEIPTS_SERVE: usize = 1024;

/// Maximum number of headers returned in a single headers response.
pub const MAX_HEADERS_SERVE: usize = 1024;

/// Maximum number of block bodies returned in a single bodies response.
pub const MAX_BODIES_SERVE: usize = 1024;

/// Maximum number of block hashes honoured in a single block-access-list request.
///
/// Longer requests are truncated to this many hashes before the lookup.
pub const MAX_BLOCK_ACCESS_LISTS_SERVE: usize = 1024;

/// Soft cap (2 MiB) on the accumulated encoded length of the items in a response.
///
/// Most handlers check the cap only after adding an item, so a response may exceed it by the
/// size of the last item added.
pub const SOFT_RESPONSE_LIMIT: usize = 2 * 1024 * 1024;
55
/// Answers incoming `eth` requests (headers, bodies, receipts, block access lists, cells) using
/// data obtained from the client `C`.
///
/// The handler is a [`Future`]: it only makes progress while it is being polled.
#[derive(Debug)]
#[must_use = "Manager does nothing unless polled."]
pub struct EthRequestHandler<C, N: NetworkPrimitives = EthNetworkPrimitives> {
    /// Data source queried for headers, blocks, receipts and block access lists.
    client: C,
    /// Handle to the peer set. Stored but not read by the code in this type, hence the
    /// `dead_code` expectation.
    #[expect(dead_code)]
    peers: PeersHandle,
    /// Stream of requests waiting to be served.
    incoming_requests: ReceiverStream<IncomingEthRequest<N>>,
    /// Counters and gauges recorded while serving requests.
    metrics: EthRequestHandlerMetrics,
}
73
74impl<C, N: NetworkPrimitives> EthRequestHandler<C, N> {
76 pub fn new(client: C, peers: PeersHandle, incoming: Receiver<IncomingEthRequest<N>>) -> Self {
78 Self {
79 client,
80 peers,
81 incoming_requests: ReceiverStream::new(incoming),
82 metrics: Default::default(),
83 }
84 }
85}
86
87impl<C, N> EthRequestHandler<C, N>
88where
89 N: NetworkPrimitives,
90 C: BlockReader,
91{
92 fn get_headers_response(&self, request: GetBlockHeaders) -> Vec<C::Header> {
94 let GetBlockHeaders { start_block, limit, skip, direction } = request;
95
96 let mut headers = Vec::new();
97
98 let mut block: BlockHashOrNumber = match start_block {
99 BlockHashOrNumber::Hash(start) => start.into(),
100 BlockHashOrNumber::Number(num) => {
101 let Some(hash) = self.client.block_hash(num).unwrap_or_default() else {
102 return headers
103 };
104 hash.into()
105 }
106 };
107
108 let skip = skip as u64;
109 let mut total_bytes = 0;
110
111 for _ in 0..limit {
112 if let Some(header) = self.client.header_by_hash_or_number(block).unwrap_or_default() {
113 let number = header.number();
114 let parent_hash = header.parent_hash();
115
116 total_bytes += header.length();
117 headers.push(header);
118
119 if headers.len() >= MAX_HEADERS_SERVE || total_bytes > SOFT_RESPONSE_LIMIT {
120 break
121 }
122
123 match direction {
124 HeadersDirection::Rising => {
125 if let Some(next) = number.checked_add(1).and_then(|n| n.checked_add(skip))
126 {
127 block = next.into()
128 } else {
129 break
130 }
131 }
132 HeadersDirection::Falling => {
133 if skip > 0 {
134 if let Some(next) =
137 number.checked_sub(1).and_then(|num| num.checked_sub(skip))
138 {
139 block = next.into()
140 } else {
141 break
142 }
143 } else {
144 block = parent_hash.into()
145 }
146 }
147 }
148 } else {
149 break
150 }
151 }
152
153 headers
154 }
155
156 fn on_headers_request(
157 &self,
158 _peer_id: PeerId,
159 request: GetBlockHeaders,
160 response: oneshot::Sender<RequestResult<BlockHeaders<C::Header>>>,
161 ) {
162 self.metrics.eth_headers_requests_received_total.increment(1);
163 let headers = self.get_headers_response(request);
164 let _ = response.send(Ok(BlockHeaders(headers)));
165 }
166
167 fn on_bodies_request(
168 &self,
169 _peer_id: PeerId,
170 request: GetBlockBodies,
171 response: oneshot::Sender<RequestResult<BlockBodies<<C::Block as Block>::Body>>>,
172 ) {
173 self.metrics.eth_bodies_requests_received_total.increment(1);
174 let mut bodies = Vec::new();
175
176 let mut total_bytes = 0;
177
178 for hash in request {
179 if let Some(block) = self.client.block_by_hash(hash).unwrap_or_default() {
180 let body = block.into_body();
181 total_bytes += body.length();
182 bodies.push(body);
183
184 if bodies.len() >= MAX_BODIES_SERVE || total_bytes > SOFT_RESPONSE_LIMIT {
185 break
186 }
187 } else {
188 break
189 }
190 }
191
192 let _ = response.send(Ok(BlockBodies(bodies)));
193 }
194
195 fn on_receipts_request(
196 &self,
197 _peer_id: PeerId,
198 request: GetReceipts,
199 response: oneshot::Sender<RequestResult<Receipts<C::Receipt>>>,
200 ) {
201 self.metrics.eth_receipts_requests_received_total.increment(1);
202
203 let receipts = self.get_receipts_response(request, |receipts_by_block| {
204 receipts_by_block.into_iter().map(ReceiptWithBloom::from).collect::<Vec<_>>()
205 });
206
207 let _ = response.send(Ok(Receipts(receipts)));
208 }
209
210 fn on_receipts69_request(
211 &self,
212 _peer_id: PeerId,
213 request: GetReceipts,
214 response: oneshot::Sender<RequestResult<Receipts69<C::Receipt>>>,
215 ) {
216 self.metrics.eth_receipts_requests_received_total.increment(1);
217
218 let receipts = self.get_receipts_response(request, |receipts_by_block| {
219 receipts_by_block
221 });
222
223 let _ = response.send(Ok(Receipts69(receipts)));
224 }
225
226 fn on_receipts70_request(
230 &self,
231 _peer_id: PeerId,
232 request: GetReceipts70,
233 response: oneshot::Sender<RequestResult<Receipts70<C::Receipt>>>,
234 ) {
235 self.metrics.eth_receipts_requests_received_total.increment(1);
236
237 let GetReceipts70 { first_block_receipt_index, block_hashes } = request;
238
239 let mut receipts = Vec::new();
240 let mut total_bytes = 0usize;
241 let mut last_block_incomplete = false;
242
243 for (idx, hash) in block_hashes.into_iter().enumerate() {
244 if idx >= MAX_RECEIPTS_SERVE {
245 break
246 }
247
248 let Some(mut block_receipts) =
249 self.client.receipts_by_block(BlockHashOrNumber::Hash(hash)).unwrap_or_default()
250 else {
251 break
252 };
253
254 if idx == 0 && first_block_receipt_index > 0 {
255 let skip = first_block_receipt_index as usize;
256 if skip >= block_receipts.len() {
257 block_receipts.clear();
258 } else {
259 block_receipts.drain(0..skip);
260 }
261 }
262
263 let block_size = block_receipts.length();
264
265 if total_bytes + block_size <= SOFT_RESPONSE_LIMIT {
266 total_bytes += block_size;
267 receipts.push(block_receipts);
268 continue;
269 }
270
271 let mut partial_block = Vec::new();
272 for receipt in block_receipts {
273 let receipt_size = receipt.length();
274 if total_bytes + receipt_size > SOFT_RESPONSE_LIMIT {
275 break;
276 }
277 total_bytes += receipt_size;
278 partial_block.push(receipt);
279 }
280
281 receipts.push(partial_block);
282 last_block_incomplete = true;
283 break;
284 }
285
286 let _ = response.send(Ok(Receipts70 { last_block_incomplete, receipts }));
287 }
288
289 #[inline]
290 fn get_receipts_response<T, F>(&self, request: GetReceipts, transform_fn: F) -> Vec<Vec<T>>
291 where
292 F: Fn(Vec<C::Receipt>) -> Vec<T>,
293 T: Encodable,
294 {
295 let mut receipts = Vec::new();
296 let mut total_bytes = 0;
297
298 for hash in request {
299 if let Some(receipts_by_block) =
300 self.client.receipts_by_block(BlockHashOrNumber::Hash(hash)).unwrap_or_default()
301 {
302 let transformed_receipts = transform_fn(receipts_by_block);
303 total_bytes += transformed_receipts.length();
304 receipts.push(transformed_receipts);
305
306 if receipts.len() >= MAX_RECEIPTS_SERVE || total_bytes > SOFT_RESPONSE_LIMIT {
307 break
308 }
309 } else {
310 break
311 }
312 }
313
314 receipts
315 }
316
317 fn on_cells_request(
318 &self,
319 _peer_id: PeerId,
320 _request: GetCells,
321 response: oneshot::Sender<RequestResult<Cells>>,
322 ) {
323 let _ = response.send(Ok(Cells::default()));
324 }
325}
326
327impl<C, N> EthRequestHandler<C, N>
328where
329 N: NetworkPrimitives,
330 C: BalProvider,
331{
332 fn on_block_access_lists_request(
337 &self,
338 _peer_id: PeerId,
339 mut request: GetBlockAccessLists,
340 response: oneshot::Sender<RequestResult<BlockAccessLists>>,
341 ) {
342 request.0.truncate(MAX_BLOCK_ACCESS_LISTS_SERVE);
343
344 let limit = GetBlockAccessListLimit::ResponseSizeSoftLimit(SOFT_RESPONSE_LIMIT);
345 let access_lists =
346 self.client.bal_store().get_by_hashes_with_limit(&request.0, limit).unwrap_or_default();
347 let _ = response.send(Ok(BlockAccessLists(access_lists)));
348 }
349}
350
351impl<C, N> Future for EthRequestHandler<C, N>
355where
356 N: NetworkPrimitives,
357 C: BalProvider
358 + BlockReader<Block = N::Block, Receipt = N::Receipt>
359 + HeaderProvider<Header = N::BlockHeader>
360 + Unpin,
361{
362 type Output = ();
363
364 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
365 let this = self.get_mut();
366
367 let mut acc = Duration::ZERO;
368 let maybe_more_incoming_requests = metered_poll_nested_stream_with_budget!(
369 acc,
370 "net::eth",
371 "Incoming eth requests stream",
372 DEFAULT_BUDGET_TRY_DRAIN_DOWNLOADERS,
373 this.incoming_requests.poll_next_unpin(cx),
374 |incoming| {
375 match incoming {
376 IncomingEthRequest::GetBlockHeaders { peer_id, request, response } => {
377 this.on_headers_request(peer_id, request, response)
378 }
379 IncomingEthRequest::GetBlockBodies { peer_id, request, response } => {
380 this.on_bodies_request(peer_id, request, response)
381 }
382 IncomingEthRequest::GetNodeData { .. } => {
383 this.metrics.eth_node_data_requests_received_total.increment(1);
384 }
385 IncomingEthRequest::GetReceipts { peer_id, request, response } => {
386 this.on_receipts_request(peer_id, request, response)
387 }
388 IncomingEthRequest::GetReceipts69 { peer_id, request, response } => {
389 this.on_receipts69_request(peer_id, request, response)
390 }
391 IncomingEthRequest::GetReceipts70 { peer_id, request, response } => {
392 this.on_receipts70_request(peer_id, request, response)
393 }
394 IncomingEthRequest::GetBlockAccessLists { peer_id, request, response } => {
395 this.on_block_access_lists_request(peer_id, request, response)
396 }
397 IncomingEthRequest::GetCells { peer_id, request, response } => {
398 this.on_cells_request(peer_id, request, response)
399 }
400 }
401 },
402 );
403
404 this.metrics.acc_duration_poll_eth_req_handler.set(acc.as_secs_f64());
405
406 if maybe_more_incoming_requests {
408 cx.waker().wake_by_ref();
410 }
411
412 Poll::Pending
413 }
414}
415
/// A request delivered to the [`EthRequestHandler`].
///
/// Every variant carries the `peer_id` the request is associated with (presumably the
/// requesting peer; confirm against the producer of these messages), the decoded `request`,
/// and the `response` channel on which the handler sends its answer.
#[derive(Debug)]
pub enum IncomingEthRequest<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// Request for block headers.
    GetBlockHeaders {
        /// Peer associated with the request.
        peer_id: PeerId,
        /// The headers being requested.
        request: GetBlockHeaders,
        /// Channel for the headers response.
        response: oneshot::Sender<RequestResult<BlockHeaders<N::BlockHeader>>>,
    },
    /// Request for block bodies.
    GetBlockBodies {
        /// Peer associated with the request.
        peer_id: PeerId,
        /// The bodies being requested.
        request: GetBlockBodies,
        /// Channel for the bodies response.
        response: oneshot::Sender<RequestResult<BlockBodies<N::BlockBody>>>,
    },
    /// Request for node data.
    ///
    /// The handler in this file only counts these requests and drops `response` unanswered.
    GetNodeData {
        /// Peer associated with the request.
        peer_id: PeerId,
        /// The node data being requested.
        request: GetNodeData,
        /// Channel for the node data response.
        response: oneshot::Sender<RequestResult<NodeData>>,
    },
    /// Request for receipts, answered with [`Receipts`].
    GetReceipts {
        /// Peer associated with the request.
        peer_id: PeerId,
        /// The receipts being requested.
        request: GetReceipts,
        /// Channel for the receipts response.
        response: oneshot::Sender<RequestResult<Receipts<N::Receipt>>>,
    },
    /// Request for receipts, answered with [`Receipts69`].
    GetReceipts69 {
        /// Peer associated with the request.
        peer_id: PeerId,
        /// The receipts being requested.
        request: GetReceipts,
        /// Channel for the receipts response.
        response: oneshot::Sender<RequestResult<Receipts69<N::Receipt>>>,
    },
    /// Request for receipts, answered with [`Receipts70`], whose last block may be incomplete.
    GetReceipts70 {
        /// Peer associated with the request.
        peer_id: PeerId,
        /// The receipts being requested, including the receipt index to start from in the
        /// first block.
        request: GetReceipts70,
        /// Channel for the receipts response.
        response: oneshot::Sender<RequestResult<Receipts70<N::Receipt>>>,
    },
    /// Request for block access lists.
    GetBlockAccessLists {
        /// Peer associated with the request.
        peer_id: PeerId,
        /// The block access lists being requested.
        request: GetBlockAccessLists,
        /// Channel for the block access lists response.
        response: oneshot::Sender<RequestResult<BlockAccessLists>>,
    },
    /// Request for cells.
    GetCells {
        /// Peer associated with the request.
        peer_id: PeerId,
        /// The cells being requested.
        request: GetCells,
        /// Channel for the cells response.
        response: oneshot::Sender<RequestResult<Cells>>,
    },
}