1use crate::{
4 budget::DEFAULT_BUDGET_TRY_DRAIN_DOWNLOADERS, metered_poll_nested_stream_with_budget,
5 metrics::EthRequestHandlerMetrics,
6};
7use alloy_consensus::{BlockHeader, ReceiptWithBloom};
8use alloy_eips::BlockHashOrNumber;
9use alloy_rlp::Encodable;
10use futures::StreamExt;
11use reth_eth_wire::{
12 BlockAccessLists, BlockBodies, BlockHeaders, Cells, EthNetworkPrimitives, GetBlockAccessLists,
13 GetBlockBodies, GetBlockHeaders, GetCells, GetNodeData, GetReceipts, GetReceipts70,
14 HeadersDirection, NetworkPrimitives, NodeData, Receipts, Receipts69, Receipts70,
15};
16use reth_network_api::test_utils::PeersHandle;
17use reth_network_p2p::error::RequestResult;
18use reth_network_peers::PeerId;
19use reth_primitives_traits::Block;
20use reth_storage_api::{BalProvider, BlockReader, GetBlockAccessListLimit, HeaderProvider};
21use reth_transaction_pool::{blobstore::NoopBlobStore, BlobStore};
22use std::{
23 future::Future,
24 pin::Pin,
25 task::{Context, Poll},
26 time::Duration,
27};
28use tokio::sync::{mpsc::Receiver, oneshot};
29use tokio_stream::wrappers::ReceiverStream;
30
/// Maximum number of per-block receipt lists returned in a single receipts response.
pub const MAX_RECEIPTS_SERVE: usize = 1024;

/// Maximum number of block headers returned in a single headers response.
pub const MAX_HEADERS_SERVE: usize = 1024;

/// Maximum number of block bodies returned in a single bodies response.
pub const MAX_BODIES_SERVE: usize = 1024;

/// Maximum number of requested hashes considered for a single block access lists request;
/// any hashes beyond this count are dropped before the lookup.
pub const MAX_BLOCK_ACCESS_LISTS_SERVE: usize = 1024;

/// Maximum number of requested hashes looked up in the blob store for a single cells request.
pub const MAX_CELLS_SERVE: usize = 1024;

/// Soft limit, in bytes (2 MiB), on the encoded size of a response.
///
/// Most handlers check this limit only after an item has been added, so a response may exceed
/// it by the size of the last item.
pub const SOFT_RESPONSE_LIMIT: usize = 2 * 1024 * 1024;
61
/// Answers incoming `eth` data requests (headers, bodies, receipts, block access lists, cells).
///
/// The handler is driven as a [`Future`]: it does nothing unless polled.
#[derive(Debug)]
#[must_use = "Manager does nothing unless polled."]
pub struct EthRequestHandler<C, N: NetworkPrimitives = EthNetworkPrimitives> {
    /// Data source queried for headers, blocks, receipts and block access lists.
    client: C,
    /// Blob store queried when answering cells requests.
    blob_store: Box<dyn BlobStore>,
    /// Handle to the peer set; stored but not read by any code in this type (hence the
    /// `dead_code` expectation).
    #[expect(dead_code)]
    peers: PeersHandle,
    /// Stream of requests that this handler answers.
    incoming_requests: ReceiverStream<IncomingEthRequest<N>>,
    /// Metrics updated while handling requests.
    metrics: EthRequestHandlerMetrics,
}
81
82impl<C, N: NetworkPrimitives> EthRequestHandler<C, N> {
84 pub fn new(client: C, peers: PeersHandle, incoming: Receiver<IncomingEthRequest<N>>) -> Self {
86 Self {
87 client,
88 blob_store: Box::<NoopBlobStore>::default(),
89 peers,
90 incoming_requests: ReceiverStream::new(incoming),
91 metrics: Default::default(),
92 }
93 }
94
95 pub fn with_blob_store(mut self, blob_store: Box<dyn BlobStore>) -> Self {
97 self.blob_store = blob_store;
98 self
99 }
100}
101
102impl<C, N> EthRequestHandler<C, N>
103where
104 N: NetworkPrimitives,
105 C: BlockReader,
106{
107 fn get_headers_response(&self, request: GetBlockHeaders) -> Vec<C::Header> {
109 let GetBlockHeaders { start_block, limit, skip, direction } = request;
110
111 let mut headers = Vec::new();
112
113 let mut block: BlockHashOrNumber = match start_block {
114 BlockHashOrNumber::Hash(start) => start.into(),
115 BlockHashOrNumber::Number(num) => {
116 let Some(hash) = self.client.block_hash(num).unwrap_or_default() else {
117 return headers
118 };
119 hash.into()
120 }
121 };
122
123 let skip = skip as u64;
124 let mut total_bytes = 0;
125
126 for _ in 0..limit {
127 if let Some(header) = self.client.header_by_hash_or_number(block).unwrap_or_default() {
128 let number = header.number();
129 let parent_hash = header.parent_hash();
130
131 total_bytes += header.length();
132 headers.push(header);
133
134 if headers.len() >= MAX_HEADERS_SERVE || total_bytes > SOFT_RESPONSE_LIMIT {
135 break
136 }
137
138 match direction {
139 HeadersDirection::Rising => {
140 if let Some(next) = number.checked_add(1).and_then(|n| n.checked_add(skip))
141 {
142 block = next.into()
143 } else {
144 break
145 }
146 }
147 HeadersDirection::Falling => {
148 if skip > 0 {
149 if let Some(next) =
152 number.checked_sub(1).and_then(|num| num.checked_sub(skip))
153 {
154 block = next.into()
155 } else {
156 break
157 }
158 } else {
159 block = parent_hash.into()
160 }
161 }
162 }
163 } else {
164 break
165 }
166 }
167
168 headers
169 }
170
171 fn on_headers_request(
172 &self,
173 _peer_id: PeerId,
174 request: GetBlockHeaders,
175 response: oneshot::Sender<RequestResult<BlockHeaders<C::Header>>>,
176 ) {
177 self.metrics.eth_headers_requests_received_total.increment(1);
178 let headers = self.get_headers_response(request);
179 let _ = response.send(Ok(BlockHeaders(headers)));
180 }
181
182 fn on_bodies_request(
183 &self,
184 _peer_id: PeerId,
185 request: GetBlockBodies,
186 response: oneshot::Sender<RequestResult<BlockBodies<<C::Block as Block>::Body>>>,
187 ) {
188 self.metrics.eth_bodies_requests_received_total.increment(1);
189 let mut bodies = Vec::new();
190
191 let mut total_bytes = 0;
192
193 for hash in request {
194 if let Some(block) = self.client.block_by_hash(hash).unwrap_or_default() {
195 let body = block.into_body();
196 total_bytes += body.length();
197 bodies.push(body);
198
199 if bodies.len() >= MAX_BODIES_SERVE || total_bytes > SOFT_RESPONSE_LIMIT {
200 break
201 }
202 } else {
203 break
204 }
205 }
206
207 let _ = response.send(Ok(BlockBodies(bodies)));
208 }
209
210 fn on_receipts_request(
211 &self,
212 _peer_id: PeerId,
213 request: GetReceipts,
214 response: oneshot::Sender<RequestResult<Receipts<C::Receipt>>>,
215 ) {
216 self.metrics.eth_receipts_requests_received_total.increment(1);
217
218 let receipts = self.get_receipts_response(request, |receipts_by_block| {
219 receipts_by_block.into_iter().map(ReceiptWithBloom::from).collect::<Vec<_>>()
220 });
221
222 let _ = response.send(Ok(Receipts(receipts)));
223 }
224
225 fn on_receipts69_request(
226 &self,
227 _peer_id: PeerId,
228 request: GetReceipts,
229 response: oneshot::Sender<RequestResult<Receipts69<C::Receipt>>>,
230 ) {
231 self.metrics.eth_receipts_requests_received_total.increment(1);
232
233 let receipts = self.get_receipts_response(request, |receipts_by_block| {
234 receipts_by_block
236 });
237
238 let _ = response.send(Ok(Receipts69(receipts)));
239 }
240
241 fn on_receipts70_request(
245 &self,
246 _peer_id: PeerId,
247 request: GetReceipts70,
248 response: oneshot::Sender<RequestResult<Receipts70<C::Receipt>>>,
249 ) {
250 self.metrics.eth_receipts_requests_received_total.increment(1);
251
252 let GetReceipts70 { first_block_receipt_index, block_hashes } = request;
253
254 let mut receipts = Vec::new();
255 let mut total_bytes = 0usize;
256 let mut last_block_incomplete = false;
257
258 for (idx, hash) in block_hashes.into_iter().enumerate() {
259 if idx >= MAX_RECEIPTS_SERVE {
260 break
261 }
262
263 let Some(mut block_receipts) =
264 self.client.receipts_by_block(BlockHashOrNumber::Hash(hash)).unwrap_or_default()
265 else {
266 break
267 };
268
269 if idx == 0 && first_block_receipt_index > 0 {
270 let skip = first_block_receipt_index as usize;
271 if skip >= block_receipts.len() {
272 block_receipts.clear();
273 } else {
274 block_receipts.drain(0..skip);
275 }
276 }
277
278 let block_size = block_receipts.length();
279
280 if total_bytes + block_size <= SOFT_RESPONSE_LIMIT {
281 total_bytes += block_size;
282 receipts.push(block_receipts);
283 continue;
284 }
285
286 let mut partial_block = Vec::new();
287 for receipt in block_receipts {
288 let receipt_size = receipt.length();
289 if total_bytes + receipt_size > SOFT_RESPONSE_LIMIT {
290 break;
291 }
292 total_bytes += receipt_size;
293 partial_block.push(receipt);
294 }
295
296 receipts.push(partial_block);
297 last_block_incomplete = true;
298 break;
299 }
300
301 let _ = response.send(Ok(Receipts70 { last_block_incomplete, receipts }));
302 }
303
304 #[inline]
305 fn get_receipts_response<T, F>(&self, request: GetReceipts, transform_fn: F) -> Vec<Vec<T>>
306 where
307 F: Fn(Vec<C::Receipt>) -> Vec<T>,
308 T: Encodable,
309 {
310 let mut receipts = Vec::new();
311 let mut total_bytes = 0;
312
313 for hash in request {
314 if let Some(receipts_by_block) =
315 self.client.receipts_by_block(BlockHashOrNumber::Hash(hash)).unwrap_or_default()
316 {
317 let transformed_receipts = transform_fn(receipts_by_block);
318 total_bytes += transformed_receipts.length();
319 receipts.push(transformed_receipts);
320
321 if receipts.len() >= MAX_RECEIPTS_SERVE || total_bytes > SOFT_RESPONSE_LIMIT {
322 break
323 }
324 } else {
325 break
326 }
327 }
328
329 receipts
330 }
331
332 fn on_cells_request(
333 &self,
334 _peer_id: PeerId,
335 request: GetCells,
336 response: oneshot::Sender<RequestResult<Cells>>,
337 ) {
338 let mut cells_response = Cells { cell_mask: request.cell_mask, ..Default::default() };
339
340 for hash in request.hashes.into_iter().take(MAX_CELLS_SERVE) {
341 let Some(cells) =
342 self.blob_store.get_cells(hash, request.cell_mask).unwrap_or_default()
343 else {
344 continue;
345 };
346
347 cells_response.hashes.push(hash);
348 cells_response.cells.push(cells);
349
350 if cells_response.length() > SOFT_RESPONSE_LIMIT {
351 break
352 }
353 }
354
355 let _ = response.send(Ok(cells_response));
356 }
357}
358
359impl<C, N> EthRequestHandler<C, N>
360where
361 N: NetworkPrimitives,
362 C: BalProvider,
363{
364 fn on_block_access_lists_request(
369 &self,
370 _peer_id: PeerId,
371 mut request: GetBlockAccessLists,
372 response: oneshot::Sender<RequestResult<BlockAccessLists>>,
373 ) {
374 self.metrics.eth_block_access_lists_requests_received_total.increment(1);
375 request.0.truncate(MAX_BLOCK_ACCESS_LISTS_SERVE);
376
377 let limit = GetBlockAccessListLimit::ResponseSizeSoftLimit(SOFT_RESPONSE_LIMIT);
378 let access_lists =
379 self.client.bal_store().get_by_hashes_with_limit(&request.0, limit).unwrap_or_default();
380 let _ = response.send(Ok(BlockAccessLists(access_lists)));
381 }
382}
383
/// Drives the handler: each poll processes incoming requests and dispatches every request to
/// its handler method.
///
/// This future never completes; `poll` always returns [`Poll::Pending`].
impl<C, N> Future for EthRequestHandler<C, N>
where
    N: NetworkPrimitives,
    C: BalProvider
        + BlockReader<Block = N::Block, Receipt = N::Receipt>
        + HeaderProvider<Header = N::BlockHeader>
        + Unpin,
{
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();

        // Duration accumulator handed to the macro below and exported as a metric afterwards.
        let mut acc = Duration::ZERO;
        // NOTE(review): the macro is defined elsewhere; presumably it polls the stream until
        // the budget is used up, calls the closure for each item, and evaluates to whether
        // more items may be ready — confirm against the macro definition.
        let maybe_more_incoming_requests = metered_poll_nested_stream_with_budget!(
            acc,
            "net::eth",
            "Incoming eth requests stream",
            DEFAULT_BUDGET_TRY_DRAIN_DOWNLOADERS,
            this.incoming_requests.poll_next_unpin(cx),
            |incoming| {
                match incoming {
                    IncomingEthRequest::GetBlockHeaders { peer_id, request, response } => {
                        this.on_headers_request(peer_id, request, response)
                    }
                    IncomingEthRequest::GetBlockBodies { peer_id, request, response } => {
                        this.on_bodies_request(peer_id, request, response)
                    }
                    // Node data requests are only counted; no response is sent.
                    IncomingEthRequest::GetNodeData { .. } => {
                        this.metrics.eth_node_data_requests_received_total.increment(1);
                    }
                    IncomingEthRequest::GetReceipts { peer_id, request, response } => {
                        this.on_receipts_request(peer_id, request, response)
                    }
                    IncomingEthRequest::GetReceipts69 { peer_id, request, response } => {
                        this.on_receipts69_request(peer_id, request, response)
                    }
                    IncomingEthRequest::GetReceipts70 { peer_id, request, response } => {
                        this.on_receipts70_request(peer_id, request, response)
                    }
                    IncomingEthRequest::GetBlockAccessLists { peer_id, request, response } => {
                        this.on_block_access_lists_request(peer_id, request, response)
                    }
                    IncomingEthRequest::GetCells { peer_id, request, response } => {
                        this.on_cells_request(peer_id, request, response)
                    }
                }
            },
        );

        this.metrics.acc_duration_poll_eth_req_handler.set(acc.as_secs_f64());

        if maybe_more_incoming_requests {
            // Request another poll right away instead of waiting for an external wake-up.
            cx.waker().wake_by_ref();
        }

        Poll::Pending
    }
}
448
/// A request delivered to the [`EthRequestHandler`].
///
/// Every variant carries a peer id, the request payload, and the sending half of a oneshot
/// channel through which the result is returned.
#[derive(Debug)]
pub enum IncomingEthRequest<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// Request for block headers.
    GetBlockHeaders {
        /// Peer associated with the request (presumably the requester — confirm with caller).
        peer_id: PeerId,
        /// The requested headers.
        request: GetBlockHeaders,
        /// Channel for sending the response.
        response: oneshot::Sender<RequestResult<BlockHeaders<N::BlockHeader>>>,
    },
    /// Request for block bodies.
    GetBlockBodies {
        /// Peer associated with the request (presumably the requester — confirm with caller).
        peer_id: PeerId,
        /// The requested bodies.
        request: GetBlockBodies,
        /// Channel for sending the response.
        response: oneshot::Sender<RequestResult<BlockBodies<N::BlockBody>>>,
    },
    /// Request for node data.
    ///
    /// The handler only counts these requests and never sends a response.
    GetNodeData {
        /// Peer associated with the request (presumably the requester — confirm with caller).
        peer_id: PeerId,
        /// The requested node data.
        request: GetNodeData,
        /// Channel for sending the response.
        response: oneshot::Sender<RequestResult<NodeData>>,
    },
    /// Request for receipts, answered with [`Receipts`].
    GetReceipts {
        /// Peer associated with the request (presumably the requester — confirm with caller).
        peer_id: PeerId,
        /// The requested receipts.
        request: GetReceipts,
        /// Channel for sending the response.
        response: oneshot::Sender<RequestResult<Receipts<N::Receipt>>>,
    },
    /// Request for receipts, answered with [`Receipts69`].
    GetReceipts69 {
        /// Peer associated with the request (presumably the requester — confirm with caller).
        peer_id: PeerId,
        /// The requested receipts.
        request: GetReceipts,
        /// Channel for sending the response.
        response: oneshot::Sender<RequestResult<Receipts69<N::Receipt>>>,
    },
    /// Request for receipts, answered with [`Receipts70`], which may end with an incomplete
    /// block.
    GetReceipts70 {
        /// Peer associated with the request (presumably the requester — confirm with caller).
        peer_id: PeerId,
        /// The requested receipts, including the receipt index to start from in the first
        /// block.
        request: GetReceipts70,
        /// Channel for sending the response.
        response: oneshot::Sender<RequestResult<Receipts70<N::Receipt>>>,
    },
    /// Request for block access lists.
    GetBlockAccessLists {
        /// Peer associated with the request (presumably the requester — confirm with caller).
        peer_id: PeerId,
        /// The requested block access lists.
        request: GetBlockAccessLists,
        /// Channel for sending the response.
        response: oneshot::Sender<RequestResult<BlockAccessLists>>,
    },
    /// Request for blob cells.
    GetCells {
        /// Peer associated with the request (presumably the requester — confirm with caller).
        peer_id: PeerId,
        /// The requested cells.
        request: GetCells,
        /// Channel for sending the response.
        response: oneshot::Sender<RequestResult<Cells>>,
    },
}
541
542#[cfg(test)]
543mod tests {
544 use super::*;
545 use alloy_eips::{
546 eip4844::{BlobAndProofV1, BlobAndProofV2, BlobCellsAndProofsV1},
547 eip7594::{BlobTransactionSidecarVariant, Cell},
548 };
549 use alloy_primitives::{TxHash, B128, B256};
550 use reth_network_api::test_utils::PeersHandle;
551 use reth_storage_api::noop::NoopProvider;
552 use reth_transaction_pool::blobstore::{BlobStoreCleanupStat, BlobStoreError};
553 use std::sync::{
554 atomic::{AtomicUsize, Ordering},
555 Arc,
556 };
557 use tokio::sync::mpsc;
558
559 #[derive(Debug, Default)]
560 struct CountingBlobStore {
561 get_cells_calls: Arc<AtomicUsize>,
562 }
563
564 impl BlobStore for CountingBlobStore {
565 fn insert(
566 &self,
567 _tx: B256,
568 _data: BlobTransactionSidecarVariant,
569 ) -> Result<(), BlobStoreError> {
570 Ok(())
571 }
572
573 fn insert_all(
574 &self,
575 _txs: Vec<(B256, BlobTransactionSidecarVariant)>,
576 ) -> Result<(), BlobStoreError> {
577 Ok(())
578 }
579
580 fn delete(&self, _tx: B256) -> Result<(), BlobStoreError> {
581 Ok(())
582 }
583
584 fn delete_all(&self, _txs: Vec<B256>) -> Result<(), BlobStoreError> {
585 Ok(())
586 }
587
588 fn cleanup(&self) -> BlobStoreCleanupStat {
589 BlobStoreCleanupStat::default()
590 }
591
592 fn get(
593 &self,
594 _tx: B256,
595 ) -> Result<Option<Arc<BlobTransactionSidecarVariant>>, BlobStoreError> {
596 Ok(None)
597 }
598
599 fn contains(&self, _tx: B256) -> Result<bool, BlobStoreError> {
600 Ok(false)
601 }
602
603 fn get_all(
604 &self,
605 _txs: Vec<B256>,
606 ) -> Result<Vec<(B256, Arc<BlobTransactionSidecarVariant>)>, BlobStoreError> {
607 Ok(vec![])
608 }
609
610 fn get_exact(
611 &self,
612 txs: Vec<B256>,
613 ) -> Result<Vec<Arc<BlobTransactionSidecarVariant>>, BlobStoreError> {
614 if txs.is_empty() {
615 return Ok(vec![])
616 }
617
618 Err(BlobStoreError::MissingSidecar(txs[0]))
619 }
620
621 fn get_by_versioned_hashes_v1(
622 &self,
623 versioned_hashes: &[B256],
624 ) -> Result<Vec<Option<BlobAndProofV1>>, BlobStoreError> {
625 Ok(vec![None; versioned_hashes.len()])
626 }
627
628 fn get_by_versioned_hashes_v2(
629 &self,
630 _versioned_hashes: &[B256],
631 ) -> Result<Option<Vec<BlobAndProofV2>>, BlobStoreError> {
632 Ok(None)
633 }
634
635 fn get_by_versioned_hashes_v3(
636 &self,
637 versioned_hashes: &[B256],
638 ) -> Result<Vec<Option<BlobAndProofV2>>, BlobStoreError> {
639 Ok(vec![None; versioned_hashes.len()])
640 }
641
642 fn get_by_versioned_hashes_v4(
643 &self,
644 versioned_hashes: &[B256],
645 _indices_bitarray: B128,
646 ) -> Result<Vec<Option<BlobCellsAndProofsV1>>, BlobStoreError> {
647 Ok(vec![None; versioned_hashes.len()])
648 }
649
650 fn get_cells(
651 &self,
652 _tx_hash: TxHash,
653 _indices_bitarray: B128,
654 ) -> Result<Option<Vec<Cell>>, BlobStoreError> {
655 self.get_cells_calls.fetch_add(1, Ordering::Relaxed);
656 Ok(None)
657 }
658
659 fn data_size_hint(&self) -> Option<usize> {
660 Some(0)
661 }
662
663 fn blobs_len(&self) -> usize {
664 0
665 }
666 }
667
668 #[tokio::test]
669 async fn get_cells_request_limits_blob_store_lookups() {
670 let (peers_tx, _) = mpsc::unbounded_channel();
671 let (_incoming_tx, incoming_rx) = mpsc::channel(1);
672 let get_cells_calls = Arc::new(AtomicUsize::new(0));
673 let blob_store = CountingBlobStore { get_cells_calls: Arc::clone(&get_cells_calls) };
674 let handler = EthRequestHandler::<NoopProvider>::new(
675 NoopProvider::default(),
676 PeersHandle::new(peers_tx),
677 incoming_rx,
678 )
679 .with_blob_store(Box::new(blob_store));
680 let (response, rx) = oneshot::channel();
681 let request =
682 GetCells { hashes: vec![B256::ZERO; MAX_CELLS_SERVE + 1], cell_mask: B128::default() };
683
684 handler.on_cells_request(PeerId::default(), request, response);
685
686 let cells = rx.await.unwrap().unwrap();
687 assert!(cells.hashes.is_empty());
688 assert_eq!(get_cells_calls.load(Ordering::Relaxed), MAX_CELLS_SERVE);
689 }
690}