Skip to main content

reth_network/
eth_requests.rs

1//! Blocks/Headers management for the p2p network.
2
3use crate::{
4    budget::DEFAULT_BUDGET_TRY_DRAIN_DOWNLOADERS, metered_poll_nested_stream_with_budget,
5    metrics::EthRequestHandlerMetrics,
6};
7use alloy_consensus::{BlockHeader, ReceiptWithBloom};
8use alloy_eips::BlockHashOrNumber;
9use alloy_primitives::Bytes;
10use alloy_rlp::Encodable;
11use futures::StreamExt;
12use reth_eth_wire::{
13    BlockAccessLists, BlockBodies, BlockHeaders, EthNetworkPrimitives, GetBlockAccessLists,
14    GetBlockBodies, GetBlockHeaders, GetNodeData, GetReceipts, GetReceipts70, HeadersDirection,
15    NetworkPrimitives, NodeData, Receipts, Receipts69, Receipts70,
16};
17use reth_network_api::test_utils::PeersHandle;
18use reth_network_p2p::error::RequestResult;
19use reth_network_peers::PeerId;
20use reth_primitives_traits::Block;
21use reth_storage_api::{BlockReader, HeaderProvider};
22use std::{
23    future::Future,
24    pin::Pin,
25    task::{Context, Poll},
26    time::Duration,
27};
28use tokio::sync::{mpsc::Receiver, oneshot};
29use tokio_stream::wrappers::ReceiverStream;
30
31// Limits: <https://github.com/ethereum/go-ethereum/blob/b0d44338bbcefee044f1f635a84487cbbd8f0538/eth/protocols/eth/handler.go#L34-L56>
32
33/// Maximum number of receipts to serve.
34///
35/// Used to limit lookups.
36pub const MAX_RECEIPTS_SERVE: usize = 1024;
37
38/// Maximum number of block headers to serve.
39///
40/// Used to limit lookups.
41pub const MAX_HEADERS_SERVE: usize = 1024;
42
43/// Maximum number of block headers to serve.
44///
45/// Used to limit lookups. With 24KB block sizes nowadays, the practical limit will always be
46/// `SOFT_RESPONSE_LIMIT`.
47pub const MAX_BODIES_SERVE: usize = 1024;
48
49/// Maximum size of replies to data retrievals: 2MB
50pub const SOFT_RESPONSE_LIMIT: usize = 2 * 1024 * 1024;
51
52/// Manages eth related requests on top of the p2p network.
53///
54/// This can be spawned to another task and is supposed to be run as background service.
55#[derive(Debug)]
56#[must_use = "Manager does nothing unless polled."]
57pub struct EthRequestHandler<C, N: NetworkPrimitives = EthNetworkPrimitives> {
58    /// The client type that can interact with the chain.
59    client: C,
60    /// Used for reporting peers.
61    // TODO use to report spammers
62    #[expect(dead_code)]
63    peers: PeersHandle,
64    /// Incoming request from the [`NetworkManager`](crate::NetworkManager).
65    incoming_requests: ReceiverStream<IncomingEthRequest<N>>,
66    /// Metrics for the eth request handler.
67    metrics: EthRequestHandlerMetrics,
68}
69
70// === impl EthRequestHandler ===
71impl<C, N: NetworkPrimitives> EthRequestHandler<C, N> {
72    /// Create a new instance
73    pub fn new(client: C, peers: PeersHandle, incoming: Receiver<IncomingEthRequest<N>>) -> Self {
74        Self {
75            client,
76            peers,
77            incoming_requests: ReceiverStream::new(incoming),
78            metrics: Default::default(),
79        }
80    }
81}
82
83impl<C, N> EthRequestHandler<C, N>
84where
85    N: NetworkPrimitives,
86    C: BlockReader,
87{
88    /// Returns the list of requested headers
89    fn get_headers_response(&self, request: GetBlockHeaders) -> Vec<C::Header> {
90        let GetBlockHeaders { start_block, limit, skip, direction } = request;
91
92        let mut headers = Vec::new();
93
94        let mut block: BlockHashOrNumber = match start_block {
95            BlockHashOrNumber::Hash(start) => start.into(),
96            BlockHashOrNumber::Number(num) => {
97                let Some(hash) = self.client.block_hash(num).unwrap_or_default() else {
98                    return headers
99                };
100                hash.into()
101            }
102        };
103
104        let skip = skip as u64;
105        let mut total_bytes = 0;
106
107        for _ in 0..limit {
108            if let Some(header) = self.client.header_by_hash_or_number(block).unwrap_or_default() {
109                let number = header.number();
110                let parent_hash = header.parent_hash();
111
112                total_bytes += header.length();
113                headers.push(header);
114
115                if headers.len() >= MAX_HEADERS_SERVE || total_bytes > SOFT_RESPONSE_LIMIT {
116                    break
117                }
118
119                match direction {
120                    HeadersDirection::Rising => {
121                        if let Some(next) = number.checked_add(1).and_then(|n| n.checked_add(skip))
122                        {
123                            block = next.into()
124                        } else {
125                            break
126                        }
127                    }
128                    HeadersDirection::Falling => {
129                        if skip > 0 {
130                            // prevent under flows for block.number == 0 and `block.number - skip <
131                            // 0`
132                            if let Some(next) =
133                                number.checked_sub(1).and_then(|num| num.checked_sub(skip))
134                            {
135                                block = next.into()
136                            } else {
137                                break
138                            }
139                        } else {
140                            block = parent_hash.into()
141                        }
142                    }
143                }
144            } else {
145                break
146            }
147        }
148
149        headers
150    }
151
152    fn on_headers_request(
153        &self,
154        _peer_id: PeerId,
155        request: GetBlockHeaders,
156        response: oneshot::Sender<RequestResult<BlockHeaders<C::Header>>>,
157    ) {
158        self.metrics.eth_headers_requests_received_total.increment(1);
159        let headers = self.get_headers_response(request);
160        let _ = response.send(Ok(BlockHeaders(headers)));
161    }
162
163    fn on_bodies_request(
164        &self,
165        _peer_id: PeerId,
166        request: GetBlockBodies,
167        response: oneshot::Sender<RequestResult<BlockBodies<<C::Block as Block>::Body>>>,
168    ) {
169        self.metrics.eth_bodies_requests_received_total.increment(1);
170        let mut bodies = Vec::new();
171
172        let mut total_bytes = 0;
173
174        for hash in request {
175            if let Some(block) = self.client.block_by_hash(hash).unwrap_or_default() {
176                let body = block.into_body();
177                total_bytes += body.length();
178                bodies.push(body);
179
180                if bodies.len() >= MAX_BODIES_SERVE || total_bytes > SOFT_RESPONSE_LIMIT {
181                    break
182                }
183            } else {
184                break
185            }
186        }
187
188        let _ = response.send(Ok(BlockBodies(bodies)));
189    }
190
191    fn on_receipts_request(
192        &self,
193        _peer_id: PeerId,
194        request: GetReceipts,
195        response: oneshot::Sender<RequestResult<Receipts<C::Receipt>>>,
196    ) {
197        self.metrics.eth_receipts_requests_received_total.increment(1);
198
199        let receipts = self.get_receipts_response(request, |receipts_by_block| {
200            receipts_by_block.into_iter().map(ReceiptWithBloom::from).collect::<Vec<_>>()
201        });
202
203        let _ = response.send(Ok(Receipts(receipts)));
204    }
205
206    fn on_receipts69_request(
207        &self,
208        _peer_id: PeerId,
209        request: GetReceipts,
210        response: oneshot::Sender<RequestResult<Receipts69<C::Receipt>>>,
211    ) {
212        self.metrics.eth_receipts_requests_received_total.increment(1);
213
214        let receipts = self.get_receipts_response(request, |receipts_by_block| {
215            // skip bloom filter for eth69
216            receipts_by_block
217        });
218
219        let _ = response.send(Ok(Receipts69(receipts)));
220    }
221
222    /// Handles partial responses for [`GetReceipts70`] queries.
223    ///
224    /// This will adhere to the soft limit but allow filling the last vec partially.
225    fn on_receipts70_request(
226        &self,
227        _peer_id: PeerId,
228        request: GetReceipts70,
229        response: oneshot::Sender<RequestResult<Receipts70<C::Receipt>>>,
230    ) {
231        self.metrics.eth_receipts_requests_received_total.increment(1);
232
233        let GetReceipts70 { first_block_receipt_index, block_hashes } = request;
234
235        let mut receipts = Vec::new();
236        let mut total_bytes = 0usize;
237        let mut last_block_incomplete = false;
238
239        for (idx, hash) in block_hashes.into_iter().enumerate() {
240            if idx >= MAX_RECEIPTS_SERVE {
241                break
242            }
243
244            let Some(mut block_receipts) =
245                self.client.receipts_by_block(BlockHashOrNumber::Hash(hash)).unwrap_or_default()
246            else {
247                break
248            };
249
250            if idx == 0 && first_block_receipt_index > 0 {
251                let skip = first_block_receipt_index as usize;
252                if skip >= block_receipts.len() {
253                    block_receipts.clear();
254                } else {
255                    block_receipts.drain(0..skip);
256                }
257            }
258
259            let block_size = block_receipts.length();
260
261            if total_bytes + block_size <= SOFT_RESPONSE_LIMIT {
262                total_bytes += block_size;
263                receipts.push(block_receipts);
264                continue;
265            }
266
267            let mut partial_block = Vec::new();
268            for receipt in block_receipts {
269                let receipt_size = receipt.length();
270                if total_bytes + receipt_size > SOFT_RESPONSE_LIMIT {
271                    break;
272                }
273                total_bytes += receipt_size;
274                partial_block.push(receipt);
275            }
276
277            receipts.push(partial_block);
278            last_block_incomplete = true;
279            break;
280        }
281
282        let _ = response.send(Ok(Receipts70 { last_block_incomplete, receipts }));
283    }
284
285    /// Handles [`GetBlockAccessLists`] queries.
286    ///
287    /// For now this returns one empty BAL per requested hash.
288    fn on_block_access_lists_request(
289        &self,
290        _peer_id: PeerId,
291        request: GetBlockAccessLists,
292        response: oneshot::Sender<RequestResult<BlockAccessLists>>,
293    ) {
294        let access_lists = request.0.into_iter().map(|_| Bytes::new()).collect();
295        let _ = response.send(Ok(BlockAccessLists(access_lists)));
296    }
297
298    #[inline]
299    fn get_receipts_response<T, F>(&self, request: GetReceipts, transform_fn: F) -> Vec<Vec<T>>
300    where
301        F: Fn(Vec<C::Receipt>) -> Vec<T>,
302        T: Encodable,
303    {
304        let mut receipts = Vec::new();
305        let mut total_bytes = 0;
306
307        for hash in request {
308            if let Some(receipts_by_block) =
309                self.client.receipts_by_block(BlockHashOrNumber::Hash(hash)).unwrap_or_default()
310            {
311                let transformed_receipts = transform_fn(receipts_by_block);
312                total_bytes += transformed_receipts.length();
313                receipts.push(transformed_receipts);
314
315                if receipts.len() >= MAX_RECEIPTS_SERVE || total_bytes > SOFT_RESPONSE_LIMIT {
316                    break
317                }
318            } else {
319                break
320            }
321        }
322
323        receipts
324    }
325}
326
327/// An endless future.
328///
329/// This should be spawned or used as part of `tokio::select!`.
330impl<C, N> Future for EthRequestHandler<C, N>
331where
332    N: NetworkPrimitives,
333    C: BlockReader<Block = N::Block, Receipt = N::Receipt>
334        + HeaderProvider<Header = N::BlockHeader>
335        + Unpin,
336{
337    type Output = ();
338
339    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
340        let this = self.get_mut();
341
342        let mut acc = Duration::ZERO;
343        let maybe_more_incoming_requests = metered_poll_nested_stream_with_budget!(
344            acc,
345            "net::eth",
346            "Incoming eth requests stream",
347            DEFAULT_BUDGET_TRY_DRAIN_DOWNLOADERS,
348            this.incoming_requests.poll_next_unpin(cx),
349            |incoming| {
350                match incoming {
351                    IncomingEthRequest::GetBlockHeaders { peer_id, request, response } => {
352                        this.on_headers_request(peer_id, request, response)
353                    }
354                    IncomingEthRequest::GetBlockBodies { peer_id, request, response } => {
355                        this.on_bodies_request(peer_id, request, response)
356                    }
357                    IncomingEthRequest::GetNodeData { .. } => {
358                        this.metrics.eth_node_data_requests_received_total.increment(1);
359                    }
360                    IncomingEthRequest::GetReceipts { peer_id, request, response } => {
361                        this.on_receipts_request(peer_id, request, response)
362                    }
363                    IncomingEthRequest::GetReceipts69 { peer_id, request, response } => {
364                        this.on_receipts69_request(peer_id, request, response)
365                    }
366                    IncomingEthRequest::GetReceipts70 { peer_id, request, response } => {
367                        this.on_receipts70_request(peer_id, request, response)
368                    }
369                    IncomingEthRequest::GetBlockAccessLists { peer_id, request, response } => {
370                        this.on_block_access_lists_request(peer_id, request, response)
371                    }
372                }
373            },
374        );
375
376        this.metrics.acc_duration_poll_eth_req_handler.set(acc.as_secs_f64());
377
378        // stream is fully drained and import futures pending
379        if maybe_more_incoming_requests {
380            // make sure we're woken up again
381            cx.waker().wake_by_ref();
382        }
383
384        Poll::Pending
385    }
386}
387
388/// All `eth` request related to blocks delegated by the network.
389#[derive(Debug)]
390pub enum IncomingEthRequest<N: NetworkPrimitives = EthNetworkPrimitives> {
391    /// Request Block headers from the peer.
392    ///
393    /// The response should be sent through the channel.
394    GetBlockHeaders {
395        /// The ID of the peer to request block headers from.
396        peer_id: PeerId,
397        /// The specific block headers requested.
398        request: GetBlockHeaders,
399        /// The channel sender for the response containing block headers.
400        response: oneshot::Sender<RequestResult<BlockHeaders<N::BlockHeader>>>,
401    },
402    /// Request Block bodies from the peer.
403    ///
404    /// The response should be sent through the channel.
405    GetBlockBodies {
406        /// The ID of the peer to request block bodies from.
407        peer_id: PeerId,
408        /// The specific block bodies requested.
409        request: GetBlockBodies,
410        /// The channel sender for the response containing block bodies.
411        response: oneshot::Sender<RequestResult<BlockBodies<N::BlockBody>>>,
412    },
413    /// Request Node Data from the peer.
414    ///
415    /// The response should be sent through the channel.
416    GetNodeData {
417        /// The ID of the peer to request node data from.
418        peer_id: PeerId,
419        /// The specific node data requested.
420        request: GetNodeData,
421        /// The channel sender for the response containing node data.
422        response: oneshot::Sender<RequestResult<NodeData>>,
423    },
424    /// Request Receipts from the peer.
425    ///
426    /// The response should be sent through the channel.
427    GetReceipts {
428        /// The ID of the peer to request receipts from.
429        peer_id: PeerId,
430        /// The specific receipts requested.
431        request: GetReceipts,
432        /// The channel sender for the response containing receipts.
433        response: oneshot::Sender<RequestResult<Receipts<N::Receipt>>>,
434    },
435    /// Request Receipts from the peer without bloom filter.
436    ///
437    /// The response should be sent through the channel.
438    GetReceipts69 {
439        /// The ID of the peer to request receipts from.
440        peer_id: PeerId,
441        /// The specific receipts requested.
442        request: GetReceipts,
443        /// The channel sender for the response containing Receipts69.
444        response: oneshot::Sender<RequestResult<Receipts69<N::Receipt>>>,
445    },
446    /// Request Receipts from the peer using eth/70.
447    ///
448    /// The response should be sent through the channel.
449    GetReceipts70 {
450        /// The ID of the peer to request receipts from.
451        peer_id: PeerId,
452        /// The specific receipts requested including the `firstBlockReceiptIndex`.
453        request: GetReceipts70,
454        /// The channel sender for the response containing Receipts70.
455        response: oneshot::Sender<RequestResult<Receipts70<N::Receipt>>>,
456    },
457    /// Request Block Access Lists from the peer.
458    ///
459    /// The response should be sent through the channel.
460    GetBlockAccessLists {
461        /// The ID of the peer to request block access lists from.
462        peer_id: PeerId,
463        /// The requested block hashes.
464        request: GetBlockAccessLists,
465        /// The channel sender for the response containing block access lists.
466        response: oneshot::Sender<RequestResult<BlockAccessLists>>,
467    },
468}