1use crate::{
4 budget::DEFAULT_BUDGET_TRY_DRAIN_DOWNLOADERS, metered_poll_nested_stream_with_budget,
5 metrics::EthRequestHandlerMetrics,
6};
7use alloy_consensus::{BlockHeader, ReceiptWithBloom};
8use alloy_eips::BlockHashOrNumber;
9use alloy_rlp::Encodable;
10use futures::StreamExt;
11use reth_eth_wire::{
12 BlockBodies, BlockHeaders, EthNetworkPrimitives, GetBlockBodies, GetBlockHeaders, GetNodeData,
13 GetReceipts, GetReceipts70, HeadersDirection, NetworkPrimitives, NodeData, Receipts,
14 Receipts69, Receipts70,
15};
16use reth_network_api::test_utils::PeersHandle;
17use reth_network_p2p::error::RequestResult;
18use reth_network_peers::PeerId;
19use reth_primitives_traits::Block;
20use reth_storage_api::{BlockReader, HeaderProvider};
21use std::{
22 future::Future,
23 pin::Pin,
24 task::{Context, Poll},
25 time::Duration,
26};
27use tokio::sync::{mpsc::Receiver, oneshot};
28use tokio_stream::wrappers::ReceiverStream;
29
30pub const MAX_RECEIPTS_SERVE: usize = 1024;
36
37pub const MAX_HEADERS_SERVE: usize = 1024;
41
42pub const MAX_BODIES_SERVE: usize = 1024;
47
48pub const SOFT_RESPONSE_LIMIT: usize = 2 * 1024 * 1024;
50
51#[derive(Debug)]
55#[must_use = "Manager does nothing unless polled."]
56pub struct EthRequestHandler<C, N: NetworkPrimitives = EthNetworkPrimitives> {
57 client: C,
59 #[expect(dead_code)]
62 peers: PeersHandle,
63 incoming_requests: ReceiverStream<IncomingEthRequest<N>>,
65 metrics: EthRequestHandlerMetrics,
67}
68
69impl<C, N: NetworkPrimitives> EthRequestHandler<C, N> {
71 pub fn new(client: C, peers: PeersHandle, incoming: Receiver<IncomingEthRequest<N>>) -> Self {
73 Self {
74 client,
75 peers,
76 incoming_requests: ReceiverStream::new(incoming),
77 metrics: Default::default(),
78 }
79 }
80}
81
82impl<C, N> EthRequestHandler<C, N>
83where
84 N: NetworkPrimitives,
85 C: BlockReader,
86{
87 fn get_headers_response(&self, request: GetBlockHeaders) -> Vec<C::Header> {
89 let GetBlockHeaders { start_block, limit, skip, direction } = request;
90
91 let mut headers = Vec::new();
92
93 let mut block: BlockHashOrNumber = match start_block {
94 BlockHashOrNumber::Hash(start) => start.into(),
95 BlockHashOrNumber::Number(num) => {
96 let Some(hash) = self.client.block_hash(num).unwrap_or_default() else {
97 return headers
98 };
99 hash.into()
100 }
101 };
102
103 let skip = skip as u64;
104 let mut total_bytes = 0;
105
106 for _ in 0..limit {
107 if let Some(header) = self.client.header_by_hash_or_number(block).unwrap_or_default() {
108 let number = header.number();
109 let parent_hash = header.parent_hash();
110
111 total_bytes += header.length();
112 headers.push(header);
113
114 if headers.len() >= MAX_HEADERS_SERVE || total_bytes > SOFT_RESPONSE_LIMIT {
115 break
116 }
117
118 match direction {
119 HeadersDirection::Rising => {
120 if let Some(next) = number.checked_add(1).and_then(|n| n.checked_add(skip))
121 {
122 block = next.into()
123 } else {
124 break
125 }
126 }
127 HeadersDirection::Falling => {
128 if skip > 0 {
129 if let Some(next) =
132 number.checked_sub(1).and_then(|num| num.checked_sub(skip))
133 {
134 block = next.into()
135 } else {
136 break
137 }
138 } else {
139 block = parent_hash.into()
140 }
141 }
142 }
143 } else {
144 break
145 }
146 }
147
148 headers
149 }
150
151 fn on_headers_request(
152 &self,
153 _peer_id: PeerId,
154 request: GetBlockHeaders,
155 response: oneshot::Sender<RequestResult<BlockHeaders<C::Header>>>,
156 ) {
157 self.metrics.eth_headers_requests_received_total.increment(1);
158 let headers = self.get_headers_response(request);
159 let _ = response.send(Ok(BlockHeaders(headers)));
160 }
161
162 fn on_bodies_request(
163 &self,
164 _peer_id: PeerId,
165 request: GetBlockBodies,
166 response: oneshot::Sender<RequestResult<BlockBodies<<C::Block as Block>::Body>>>,
167 ) {
168 self.metrics.eth_bodies_requests_received_total.increment(1);
169 let mut bodies = Vec::new();
170
171 let mut total_bytes = 0;
172
173 for hash in request.0 {
174 if let Some(block) = self.client.block_by_hash(hash).unwrap_or_default() {
175 let body = block.into_body();
176 total_bytes += body.length();
177 bodies.push(body);
178
179 if bodies.len() >= MAX_BODIES_SERVE || total_bytes > SOFT_RESPONSE_LIMIT {
180 break
181 }
182 } else {
183 break
184 }
185 }
186
187 let _ = response.send(Ok(BlockBodies(bodies)));
188 }
189
190 fn on_receipts_request(
191 &self,
192 _peer_id: PeerId,
193 request: GetReceipts,
194 response: oneshot::Sender<RequestResult<Receipts<C::Receipt>>>,
195 ) {
196 self.metrics.eth_receipts_requests_received_total.increment(1);
197
198 let receipts = self.get_receipts_response(request, |receipts_by_block| {
199 receipts_by_block.into_iter().map(ReceiptWithBloom::from).collect::<Vec<_>>()
200 });
201
202 let _ = response.send(Ok(Receipts(receipts)));
203 }
204
205 fn on_receipts69_request(
206 &self,
207 _peer_id: PeerId,
208 request: GetReceipts,
209 response: oneshot::Sender<RequestResult<Receipts69<C::Receipt>>>,
210 ) {
211 self.metrics.eth_receipts_requests_received_total.increment(1);
212
213 let receipts = self.get_receipts_response(request, |receipts_by_block| {
214 receipts_by_block
216 });
217
218 let _ = response.send(Ok(Receipts69(receipts)));
219 }
220
221 fn on_receipts70_request(
225 &self,
226 _peer_id: PeerId,
227 request: GetReceipts70,
228 response: oneshot::Sender<RequestResult<Receipts70<C::Receipt>>>,
229 ) {
230 self.metrics.eth_receipts_requests_received_total.increment(1);
231
232 let GetReceipts70 { first_block_receipt_index, block_hashes } = request;
233
234 let mut receipts = Vec::new();
235 let mut total_bytes = 0usize;
236 let mut last_block_incomplete = false;
237
238 for (idx, hash) in block_hashes.into_iter().enumerate() {
239 if idx >= MAX_RECEIPTS_SERVE {
240 break
241 }
242
243 let Some(mut block_receipts) =
244 self.client.receipts_by_block(BlockHashOrNumber::Hash(hash)).unwrap_or_default()
245 else {
246 break
247 };
248
249 if idx == 0 && first_block_receipt_index > 0 {
250 let skip = first_block_receipt_index as usize;
251 if skip >= block_receipts.len() {
252 block_receipts.clear();
253 } else {
254 block_receipts.drain(0..skip);
255 }
256 }
257
258 let block_size = block_receipts.length();
259
260 if total_bytes + block_size <= SOFT_RESPONSE_LIMIT {
261 total_bytes += block_size;
262 receipts.push(block_receipts);
263 continue;
264 }
265
266 let mut partial_block = Vec::new();
267 for receipt in block_receipts {
268 let receipt_size = receipt.length();
269 if total_bytes + receipt_size > SOFT_RESPONSE_LIMIT {
270 break;
271 }
272 total_bytes += receipt_size;
273 partial_block.push(receipt);
274 }
275
276 receipts.push(partial_block);
277 last_block_incomplete = true;
278 break;
279 }
280
281 let _ = response.send(Ok(Receipts70 { last_block_incomplete, receipts }));
282 }
283
284 #[inline]
285 fn get_receipts_response<T, F>(&self, request: GetReceipts, transform_fn: F) -> Vec<Vec<T>>
286 where
287 F: Fn(Vec<C::Receipt>) -> Vec<T>,
288 T: Encodable,
289 {
290 let mut receipts = Vec::new();
291 let mut total_bytes = 0;
292
293 for hash in request.0 {
294 if let Some(receipts_by_block) =
295 self.client.receipts_by_block(BlockHashOrNumber::Hash(hash)).unwrap_or_default()
296 {
297 let transformed_receipts = transform_fn(receipts_by_block);
298 total_bytes += transformed_receipts.length();
299 receipts.push(transformed_receipts);
300
301 if receipts.len() >= MAX_RECEIPTS_SERVE || total_bytes > SOFT_RESPONSE_LIMIT {
302 break
303 }
304 } else {
305 break
306 }
307 }
308
309 receipts
310 }
311}
312
313impl<C, N> Future for EthRequestHandler<C, N>
317where
318 N: NetworkPrimitives,
319 C: BlockReader<Block = N::Block, Receipt = N::Receipt>
320 + HeaderProvider<Header = N::BlockHeader>
321 + Unpin,
322{
323 type Output = ();
324
325 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
326 let this = self.get_mut();
327
328 let mut acc = Duration::ZERO;
329 let maybe_more_incoming_requests = metered_poll_nested_stream_with_budget!(
330 acc,
331 "net::eth",
332 "Incoming eth requests stream",
333 DEFAULT_BUDGET_TRY_DRAIN_DOWNLOADERS,
334 this.incoming_requests.poll_next_unpin(cx),
335 |incoming| {
336 match incoming {
337 IncomingEthRequest::GetBlockHeaders { peer_id, request, response } => {
338 this.on_headers_request(peer_id, request, response)
339 }
340 IncomingEthRequest::GetBlockBodies { peer_id, request, response } => {
341 this.on_bodies_request(peer_id, request, response)
342 }
343 IncomingEthRequest::GetNodeData { .. } => {
344 this.metrics.eth_node_data_requests_received_total.increment(1);
345 }
346 IncomingEthRequest::GetReceipts { peer_id, request, response } => {
347 this.on_receipts_request(peer_id, request, response)
348 }
349 IncomingEthRequest::GetReceipts69 { peer_id, request, response } => {
350 this.on_receipts69_request(peer_id, request, response)
351 }
352 IncomingEthRequest::GetReceipts70 { peer_id, request, response } => {
353 this.on_receipts70_request(peer_id, request, response)
354 }
355 }
356 },
357 );
358
359 this.metrics.acc_duration_poll_eth_req_handler.set(acc.as_secs_f64());
360
361 if maybe_more_incoming_requests {
363 cx.waker().wake_by_ref();
365 }
366
367 Poll::Pending
368 }
369}
370
371#[derive(Debug)]
373pub enum IncomingEthRequest<N: NetworkPrimitives = EthNetworkPrimitives> {
374 GetBlockHeaders {
378 peer_id: PeerId,
380 request: GetBlockHeaders,
382 response: oneshot::Sender<RequestResult<BlockHeaders<N::BlockHeader>>>,
384 },
385 GetBlockBodies {
389 peer_id: PeerId,
391 request: GetBlockBodies,
393 response: oneshot::Sender<RequestResult<BlockBodies<N::BlockBody>>>,
395 },
396 GetNodeData {
400 peer_id: PeerId,
402 request: GetNodeData,
404 response: oneshot::Sender<RequestResult<NodeData>>,
406 },
407 GetReceipts {
411 peer_id: PeerId,
413 request: GetReceipts,
415 response: oneshot::Sender<RequestResult<Receipts<N::Receipt>>>,
417 },
418 GetReceipts69 {
422 peer_id: PeerId,
424 request: GetReceipts,
426 response: oneshot::Sender<RequestResult<Receipts69<N::Receipt>>>,
428 },
429 GetReceipts70 {
433 peer_id: PeerId,
435 request: GetReceipts70,
437 response: oneshot::Sender<RequestResult<Receipts70<N::Receipt>>>,
439 },
440}