Skip to main content

reth_network/
eth_requests.rs

1//! Blocks/Headers management for the p2p network.
2
3use crate::{
4    budget::DEFAULT_BUDGET_TRY_DRAIN_DOWNLOADERS, metered_poll_nested_stream_with_budget,
5    metrics::EthRequestHandlerMetrics,
6};
7use alloy_consensus::{BlockHeader, ReceiptWithBloom};
8use alloy_eips::BlockHashOrNumber;
9use alloy_rlp::Encodable;
10use futures::StreamExt;
11use reth_eth_wire::{
12    BlockAccessLists, BlockBodies, BlockHeaders, Cells, EthNetworkPrimitives, GetBlockAccessLists,
13    GetBlockBodies, GetBlockHeaders, GetCells, GetNodeData, GetReceipts, GetReceipts70,
14    HeadersDirection, NetworkPrimitives, NodeData, Receipts, Receipts69, Receipts70,
15};
16use reth_network_api::test_utils::PeersHandle;
17use reth_network_p2p::error::RequestResult;
18use reth_network_peers::PeerId;
19use reth_primitives_traits::Block;
20use reth_storage_api::{BalProvider, BlockReader, GetBlockAccessListLimit, HeaderProvider};
21use reth_transaction_pool::{blobstore::NoopBlobStore, BlobStore};
22use std::{
23    future::Future,
24    pin::Pin,
25    task::{Context, Poll},
26    time::Duration,
27};
28use tokio::sync::{mpsc::Receiver, oneshot};
29use tokio_stream::wrappers::ReceiverStream;
30
31// Limits: <https://github.com/ethereum/go-ethereum/blob/b0d44338bbcefee044f1f635a84487cbbd8f0538/eth/protocols/eth/handler.go#L34-L56>
32
33/// Maximum number of receipts to serve.
34///
35/// Used to limit lookups.
36pub const MAX_RECEIPTS_SERVE: usize = 1024;
37
38/// Maximum number of block headers to serve.
39///
40/// Used to limit lookups.
41pub const MAX_HEADERS_SERVE: usize = 1024;
42
43/// Maximum number of block headers to serve.
44///
45/// Used to limit lookups. With 24KB block sizes nowadays, the practical limit will always be
46/// `SOFT_RESPONSE_LIMIT`.
47pub const MAX_BODIES_SERVE: usize = 1024;
48
49/// Maximum number of block access lists to serve.
50///
51/// Used to limit lookups.
52pub const MAX_BLOCK_ACCESS_LISTS_SERVE: usize = 1024;
53
54/// Maximum number of cell lookups to serve.
55///
56/// Used to limit lookups.
57pub const MAX_CELLS_SERVE: usize = 1024;
58
59/// Maximum size of replies to data retrievals: 2MB
60pub const SOFT_RESPONSE_LIMIT: usize = 2 * 1024 * 1024;
61
62/// Manages eth related requests on top of the p2p network.
63///
64/// This can be spawned to another task and is supposed to be run as background service.
65#[derive(Debug)]
66#[must_use = "Manager does nothing unless polled."]
67pub struct EthRequestHandler<C, N: NetworkPrimitives = EthNetworkPrimitives> {
68    /// The client type that can interact with the chain.
69    client: C,
70    /// Blob store used for serving blob cell requests.
71    blob_store: Box<dyn BlobStore>,
72    /// Used for reporting peers.
73    // TODO use to report spammers
74    #[expect(dead_code)]
75    peers: PeersHandle,
76    /// Incoming request from the [`NetworkManager`](crate::NetworkManager).
77    incoming_requests: ReceiverStream<IncomingEthRequest<N>>,
78    /// Metrics for the eth request handler.
79    metrics: EthRequestHandlerMetrics,
80}
81
82// === impl EthRequestHandler ===
83impl<C, N: NetworkPrimitives> EthRequestHandler<C, N> {
84    /// Create a new instance
85    pub fn new(client: C, peers: PeersHandle, incoming: Receiver<IncomingEthRequest<N>>) -> Self {
86        Self {
87            client,
88            blob_store: Box::<NoopBlobStore>::default(),
89            peers,
90            incoming_requests: ReceiverStream::new(incoming),
91            metrics: Default::default(),
92        }
93    }
94
95    /// Set blob store for the request handler
96    pub fn with_blob_store(mut self, blob_store: Box<dyn BlobStore>) -> Self {
97        self.blob_store = blob_store;
98        self
99    }
100}
101
102impl<C, N> EthRequestHandler<C, N>
103where
104    N: NetworkPrimitives,
105    C: BlockReader,
106{
107    /// Returns the list of requested headers
108    fn get_headers_response(&self, request: GetBlockHeaders) -> Vec<C::Header> {
109        let GetBlockHeaders { start_block, limit, skip, direction } = request;
110
111        let mut headers = Vec::new();
112
113        let mut block: BlockHashOrNumber = match start_block {
114            BlockHashOrNumber::Hash(start) => start.into(),
115            BlockHashOrNumber::Number(num) => {
116                let Some(hash) = self.client.block_hash(num).unwrap_or_default() else {
117                    return headers
118                };
119                hash.into()
120            }
121        };
122
123        let skip = skip as u64;
124        let mut total_bytes = 0;
125
126        for _ in 0..limit {
127            if let Some(header) = self.client.header_by_hash_or_number(block).unwrap_or_default() {
128                let number = header.number();
129                let parent_hash = header.parent_hash();
130
131                total_bytes += header.length();
132                headers.push(header);
133
134                if headers.len() >= MAX_HEADERS_SERVE || total_bytes > SOFT_RESPONSE_LIMIT {
135                    break
136                }
137
138                match direction {
139                    HeadersDirection::Rising => {
140                        if let Some(next) = number.checked_add(1).and_then(|n| n.checked_add(skip))
141                        {
142                            block = next.into()
143                        } else {
144                            break
145                        }
146                    }
147                    HeadersDirection::Falling => {
148                        if skip > 0 {
149                            // prevent under flows for block.number == 0 and `block.number - skip <
150                            // 0`
151                            if let Some(next) =
152                                number.checked_sub(1).and_then(|num| num.checked_sub(skip))
153                            {
154                                block = next.into()
155                            } else {
156                                break
157                            }
158                        } else {
159                            block = parent_hash.into()
160                        }
161                    }
162                }
163            } else {
164                break
165            }
166        }
167
168        headers
169    }
170
171    fn on_headers_request(
172        &self,
173        _peer_id: PeerId,
174        request: GetBlockHeaders,
175        response: oneshot::Sender<RequestResult<BlockHeaders<C::Header>>>,
176    ) {
177        self.metrics.eth_headers_requests_received_total.increment(1);
178        let headers = self.get_headers_response(request);
179        let _ = response.send(Ok(BlockHeaders(headers)));
180    }
181
182    fn on_bodies_request(
183        &self,
184        _peer_id: PeerId,
185        request: GetBlockBodies,
186        response: oneshot::Sender<RequestResult<BlockBodies<<C::Block as Block>::Body>>>,
187    ) {
188        self.metrics.eth_bodies_requests_received_total.increment(1);
189        let mut bodies = Vec::new();
190
191        let mut total_bytes = 0;
192
193        for hash in request {
194            if let Some(block) = self.client.block_by_hash(hash).unwrap_or_default() {
195                let body = block.into_body();
196                total_bytes += body.length();
197                bodies.push(body);
198
199                if bodies.len() >= MAX_BODIES_SERVE || total_bytes > SOFT_RESPONSE_LIMIT {
200                    break
201                }
202            } else {
203                break
204            }
205        }
206
207        let _ = response.send(Ok(BlockBodies(bodies)));
208    }
209
210    fn on_receipts_request(
211        &self,
212        _peer_id: PeerId,
213        request: GetReceipts,
214        response: oneshot::Sender<RequestResult<Receipts<C::Receipt>>>,
215    ) {
216        self.metrics.eth_receipts_requests_received_total.increment(1);
217
218        let receipts = self.get_receipts_response(request, |receipts_by_block| {
219            receipts_by_block.into_iter().map(ReceiptWithBloom::from).collect::<Vec<_>>()
220        });
221
222        let _ = response.send(Ok(Receipts(receipts)));
223    }
224
225    fn on_receipts69_request(
226        &self,
227        _peer_id: PeerId,
228        request: GetReceipts,
229        response: oneshot::Sender<RequestResult<Receipts69<C::Receipt>>>,
230    ) {
231        self.metrics.eth_receipts_requests_received_total.increment(1);
232
233        let receipts = self.get_receipts_response(request, |receipts_by_block| {
234            // skip bloom filter for eth69
235            receipts_by_block
236        });
237
238        let _ = response.send(Ok(Receipts69(receipts)));
239    }
240
241    /// Handles partial responses for [`GetReceipts70`] queries.
242    ///
243    /// This will adhere to the soft limit but allow filling the last vec partially.
244    fn on_receipts70_request(
245        &self,
246        _peer_id: PeerId,
247        request: GetReceipts70,
248        response: oneshot::Sender<RequestResult<Receipts70<C::Receipt>>>,
249    ) {
250        self.metrics.eth_receipts_requests_received_total.increment(1);
251
252        let GetReceipts70 { first_block_receipt_index, block_hashes } = request;
253
254        let mut receipts = Vec::new();
255        let mut total_bytes = 0usize;
256        let mut last_block_incomplete = false;
257
258        for (idx, hash) in block_hashes.into_iter().enumerate() {
259            if idx >= MAX_RECEIPTS_SERVE {
260                break
261            }
262
263            let Some(mut block_receipts) =
264                self.client.receipts_by_block(BlockHashOrNumber::Hash(hash)).unwrap_or_default()
265            else {
266                break
267            };
268
269            if idx == 0 && first_block_receipt_index > 0 {
270                let skip = first_block_receipt_index as usize;
271                if skip >= block_receipts.len() {
272                    block_receipts.clear();
273                } else {
274                    block_receipts.drain(0..skip);
275                }
276            }
277
278            let block_size = block_receipts.length();
279
280            if total_bytes + block_size <= SOFT_RESPONSE_LIMIT {
281                total_bytes += block_size;
282                receipts.push(block_receipts);
283                continue;
284            }
285
286            let mut partial_block = Vec::new();
287            for receipt in block_receipts {
288                let receipt_size = receipt.length();
289                if total_bytes + receipt_size > SOFT_RESPONSE_LIMIT {
290                    break;
291                }
292                total_bytes += receipt_size;
293                partial_block.push(receipt);
294            }
295
296            receipts.push(partial_block);
297            last_block_incomplete = true;
298            break;
299        }
300
301        let _ = response.send(Ok(Receipts70 { last_block_incomplete, receipts }));
302    }
303
304    #[inline]
305    fn get_receipts_response<T, F>(&self, request: GetReceipts, transform_fn: F) -> Vec<Vec<T>>
306    where
307        F: Fn(Vec<C::Receipt>) -> Vec<T>,
308        T: Encodable,
309    {
310        let mut receipts = Vec::new();
311        let mut total_bytes = 0;
312
313        for hash in request {
314            if let Some(receipts_by_block) =
315                self.client.receipts_by_block(BlockHashOrNumber::Hash(hash)).unwrap_or_default()
316            {
317                let transformed_receipts = transform_fn(receipts_by_block);
318                total_bytes += transformed_receipts.length();
319                receipts.push(transformed_receipts);
320
321                if receipts.len() >= MAX_RECEIPTS_SERVE || total_bytes > SOFT_RESPONSE_LIMIT {
322                    break
323                }
324            } else {
325                break
326            }
327        }
328
329        receipts
330    }
331
332    fn on_cells_request(
333        &self,
334        _peer_id: PeerId,
335        request: GetCells,
336        response: oneshot::Sender<RequestResult<Cells>>,
337    ) {
338        let mut cells_response = Cells { cell_mask: request.cell_mask, ..Default::default() };
339
340        for hash in request.hashes.into_iter().take(MAX_CELLS_SERVE) {
341            let Some(cells) =
342                self.blob_store.get_cells(hash, request.cell_mask).unwrap_or_default()
343            else {
344                continue;
345            };
346
347            cells_response.hashes.push(hash);
348            cells_response.cells.push(cells);
349
350            if cells_response.length() > SOFT_RESPONSE_LIMIT {
351                break
352            }
353        }
354
355        let _ = response.send(Ok(cells_response));
356    }
357}
358
359impl<C, N> EthRequestHandler<C, N>
360where
361    N: NetworkPrimitives,
362    C: BalProvider,
363{
364    /// Handles [`GetBlockAccessLists`] queries.
365    ///
366    /// EIP-8159 defines the final `BlockAccessLists` response semantics:
367    /// <https://eips.ethereum.org/EIPS/eip-8159>
368    fn on_block_access_lists_request(
369        &self,
370        _peer_id: PeerId,
371        mut request: GetBlockAccessLists,
372        response: oneshot::Sender<RequestResult<BlockAccessLists>>,
373    ) {
374        self.metrics.eth_block_access_lists_requests_received_total.increment(1);
375        request.0.truncate(MAX_BLOCK_ACCESS_LISTS_SERVE);
376
377        let limit = GetBlockAccessListLimit::ResponseSizeSoftLimit(SOFT_RESPONSE_LIMIT);
378        let access_lists =
379            self.client.bal_store().get_by_hashes_with_limit(&request.0, limit).unwrap_or_default();
380        let _ = response.send(Ok(BlockAccessLists(access_lists)));
381    }
382}
383
384/// An endless future.
385///
386/// This should be spawned or used as part of `tokio::select!`.
387impl<C, N> Future for EthRequestHandler<C, N>
388where
389    N: NetworkPrimitives,
390    C: BalProvider
391        + BlockReader<Block = N::Block, Receipt = N::Receipt>
392        + HeaderProvider<Header = N::BlockHeader>
393        + Unpin,
394{
395    type Output = ();
396
397    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
398        let this = self.get_mut();
399
400        let mut acc = Duration::ZERO;
401        let maybe_more_incoming_requests = metered_poll_nested_stream_with_budget!(
402            acc,
403            "net::eth",
404            "Incoming eth requests stream",
405            DEFAULT_BUDGET_TRY_DRAIN_DOWNLOADERS,
406            this.incoming_requests.poll_next_unpin(cx),
407            |incoming| {
408                match incoming {
409                    IncomingEthRequest::GetBlockHeaders { peer_id, request, response } => {
410                        this.on_headers_request(peer_id, request, response)
411                    }
412                    IncomingEthRequest::GetBlockBodies { peer_id, request, response } => {
413                        this.on_bodies_request(peer_id, request, response)
414                    }
415                    IncomingEthRequest::GetNodeData { .. } => {
416                        this.metrics.eth_node_data_requests_received_total.increment(1);
417                    }
418                    IncomingEthRequest::GetReceipts { peer_id, request, response } => {
419                        this.on_receipts_request(peer_id, request, response)
420                    }
421                    IncomingEthRequest::GetReceipts69 { peer_id, request, response } => {
422                        this.on_receipts69_request(peer_id, request, response)
423                    }
424                    IncomingEthRequest::GetReceipts70 { peer_id, request, response } => {
425                        this.on_receipts70_request(peer_id, request, response)
426                    }
427                    IncomingEthRequest::GetBlockAccessLists { peer_id, request, response } => {
428                        this.on_block_access_lists_request(peer_id, request, response)
429                    }
430                    IncomingEthRequest::GetCells { peer_id, request, response } => {
431                        this.on_cells_request(peer_id, request, response)
432                    }
433                }
434            },
435        );
436
437        this.metrics.acc_duration_poll_eth_req_handler.set(acc.as_secs_f64());
438
439        // stream is fully drained and import futures pending
440        if maybe_more_incoming_requests {
441            // make sure we're woken up again
442            cx.waker().wake_by_ref();
443        }
444
445        Poll::Pending
446    }
447}
448
449/// All `eth` request related to blocks delegated by the network.
450#[derive(Debug)]
451pub enum IncomingEthRequest<N: NetworkPrimitives = EthNetworkPrimitives> {
452    /// Request Block headers from the peer.
453    ///
454    /// The response should be sent through the channel.
455    GetBlockHeaders {
456        /// The ID of the peer to request block headers from.
457        peer_id: PeerId,
458        /// The specific block headers requested.
459        request: GetBlockHeaders,
460        /// The channel sender for the response containing block headers.
461        response: oneshot::Sender<RequestResult<BlockHeaders<N::BlockHeader>>>,
462    },
463    /// Request Block bodies from the peer.
464    ///
465    /// The response should be sent through the channel.
466    GetBlockBodies {
467        /// The ID of the peer to request block bodies from.
468        peer_id: PeerId,
469        /// The specific block bodies requested.
470        request: GetBlockBodies,
471        /// The channel sender for the response containing block bodies.
472        response: oneshot::Sender<RequestResult<BlockBodies<N::BlockBody>>>,
473    },
474    /// Request Node Data from the peer.
475    ///
476    /// The response should be sent through the channel.
477    GetNodeData {
478        /// The ID of the peer to request node data from.
479        peer_id: PeerId,
480        /// The specific node data requested.
481        request: GetNodeData,
482        /// The channel sender for the response containing node data.
483        response: oneshot::Sender<RequestResult<NodeData>>,
484    },
485    /// Request Receipts from the peer.
486    ///
487    /// The response should be sent through the channel.
488    GetReceipts {
489        /// The ID of the peer to request receipts from.
490        peer_id: PeerId,
491        /// The specific receipts requested.
492        request: GetReceipts,
493        /// The channel sender for the response containing receipts.
494        response: oneshot::Sender<RequestResult<Receipts<N::Receipt>>>,
495    },
496    /// Request Receipts from the peer without bloom filter.
497    ///
498    /// The response should be sent through the channel.
499    GetReceipts69 {
500        /// The ID of the peer to request receipts from.
501        peer_id: PeerId,
502        /// The specific receipts requested.
503        request: GetReceipts,
504        /// The channel sender for the response containing Receipts69.
505        response: oneshot::Sender<RequestResult<Receipts69<N::Receipt>>>,
506    },
507    /// Request Receipts from the peer using eth/70.
508    ///
509    /// The response should be sent through the channel.
510    GetReceipts70 {
511        /// The ID of the peer to request receipts from.
512        peer_id: PeerId,
513        /// The specific receipts requested including the `firstBlockReceiptIndex`.
514        request: GetReceipts70,
515        /// The channel sender for the response containing Receipts70.
516        response: oneshot::Sender<RequestResult<Receipts70<N::Receipt>>>,
517    },
518    /// Request Block Access Lists from the peer.
519    ///
520    /// The response should be sent through the channel.
521    GetBlockAccessLists {
522        /// The ID of the peer to request block access lists from.
523        peer_id: PeerId,
524        /// The requested block hashes.
525        request: GetBlockAccessLists,
526        /// The channel sender for the response containing block access lists.
527        response: oneshot::Sender<RequestResult<BlockAccessLists>>,
528    },
529    /// Request Cells from the peer.
530    ///
531    /// The response should be sent through the channel.
532    GetCells {
533        /// The ID of the peer to request cells from.
534        peer_id: PeerId,
535        /// The requested block hashes.
536        request: GetCells,
537        /// The channel sender for the response containing cells.
538        response: oneshot::Sender<RequestResult<Cells>>,
539    },
540}
541
542#[cfg(test)]
543mod tests {
544    use super::*;
545    use alloy_eips::{
546        eip4844::{BlobAndProofV1, BlobAndProofV2, BlobCellsAndProofsV1},
547        eip7594::{BlobTransactionSidecarVariant, Cell},
548    };
549    use alloy_primitives::{TxHash, B128, B256};
550    use reth_network_api::test_utils::PeersHandle;
551    use reth_storage_api::noop::NoopProvider;
552    use reth_transaction_pool::blobstore::{BlobStoreCleanupStat, BlobStoreError};
553    use std::sync::{
554        atomic::{AtomicUsize, Ordering},
555        Arc,
556    };
557    use tokio::sync::mpsc;
558
559    #[derive(Debug, Default)]
560    struct CountingBlobStore {
561        get_cells_calls: Arc<AtomicUsize>,
562    }
563
564    impl BlobStore for CountingBlobStore {
565        fn insert(
566            &self,
567            _tx: B256,
568            _data: BlobTransactionSidecarVariant,
569        ) -> Result<(), BlobStoreError> {
570            Ok(())
571        }
572
573        fn insert_all(
574            &self,
575            _txs: Vec<(B256, BlobTransactionSidecarVariant)>,
576        ) -> Result<(), BlobStoreError> {
577            Ok(())
578        }
579
580        fn delete(&self, _tx: B256) -> Result<(), BlobStoreError> {
581            Ok(())
582        }
583
584        fn delete_all(&self, _txs: Vec<B256>) -> Result<(), BlobStoreError> {
585            Ok(())
586        }
587
588        fn cleanup(&self) -> BlobStoreCleanupStat {
589            BlobStoreCleanupStat::default()
590        }
591
592        fn get(
593            &self,
594            _tx: B256,
595        ) -> Result<Option<Arc<BlobTransactionSidecarVariant>>, BlobStoreError> {
596            Ok(None)
597        }
598
599        fn contains(&self, _tx: B256) -> Result<bool, BlobStoreError> {
600            Ok(false)
601        }
602
603        fn get_all(
604            &self,
605            _txs: Vec<B256>,
606        ) -> Result<Vec<(B256, Arc<BlobTransactionSidecarVariant>)>, BlobStoreError> {
607            Ok(vec![])
608        }
609
610        fn get_exact(
611            &self,
612            txs: Vec<B256>,
613        ) -> Result<Vec<Arc<BlobTransactionSidecarVariant>>, BlobStoreError> {
614            if txs.is_empty() {
615                return Ok(vec![])
616            }
617
618            Err(BlobStoreError::MissingSidecar(txs[0]))
619        }
620
621        fn get_by_versioned_hashes_v1(
622            &self,
623            versioned_hashes: &[B256],
624        ) -> Result<Vec<Option<BlobAndProofV1>>, BlobStoreError> {
625            Ok(vec![None; versioned_hashes.len()])
626        }
627
628        fn get_by_versioned_hashes_v2(
629            &self,
630            _versioned_hashes: &[B256],
631        ) -> Result<Option<Vec<BlobAndProofV2>>, BlobStoreError> {
632            Ok(None)
633        }
634
635        fn get_by_versioned_hashes_v3(
636            &self,
637            versioned_hashes: &[B256],
638        ) -> Result<Vec<Option<BlobAndProofV2>>, BlobStoreError> {
639            Ok(vec![None; versioned_hashes.len()])
640        }
641
642        fn get_by_versioned_hashes_v4(
643            &self,
644            versioned_hashes: &[B256],
645            _indices_bitarray: B128,
646        ) -> Result<Vec<Option<BlobCellsAndProofsV1>>, BlobStoreError> {
647            Ok(vec![None; versioned_hashes.len()])
648        }
649
650        fn get_cells(
651            &self,
652            _tx_hash: TxHash,
653            _indices_bitarray: B128,
654        ) -> Result<Option<Vec<Cell>>, BlobStoreError> {
655            self.get_cells_calls.fetch_add(1, Ordering::Relaxed);
656            Ok(None)
657        }
658
659        fn data_size_hint(&self) -> Option<usize> {
660            Some(0)
661        }
662
663        fn blobs_len(&self) -> usize {
664            0
665        }
666    }
667
668    #[tokio::test]
669    async fn get_cells_request_limits_blob_store_lookups() {
670        let (peers_tx, _) = mpsc::unbounded_channel();
671        let (_incoming_tx, incoming_rx) = mpsc::channel(1);
672        let get_cells_calls = Arc::new(AtomicUsize::new(0));
673        let blob_store = CountingBlobStore { get_cells_calls: Arc::clone(&get_cells_calls) };
674        let handler = EthRequestHandler::<NoopProvider>::new(
675            NoopProvider::default(),
676            PeersHandle::new(peers_tx),
677            incoming_rx,
678        )
679        .with_blob_store(Box::new(blob_store));
680        let (response, rx) = oneshot::channel();
681        let request =
682            GetCells { hashes: vec![B256::ZERO; MAX_CELLS_SERVE + 1], cell_mask: B128::default() };
683
684        handler.on_cells_request(PeerId::default(), request, response);
685
686        let cells = rx.await.unwrap().unwrap();
687        assert!(cells.hashes.is_empty());
688        assert_eq!(get_cells_calls.load(Ordering::Relaxed), MAX_CELLS_SERVE);
689    }
690}