Skip to main content

reth_network/
eth_requests.rs

1//! Blocks/Headers management for the p2p network.
2
3use crate::{
4    budget::DEFAULT_BUDGET_TRY_DRAIN_DOWNLOADERS, metered_poll_nested_stream_with_budget,
5    metrics::EthRequestHandlerMetrics,
6};
7use alloy_consensus::{BlockHeader, ReceiptWithBloom};
8use alloy_eips::BlockHashOrNumber;
9use alloy_rlp::Encodable;
10use futures::StreamExt;
11use reth_eth_wire::{
12    BlockAccessLists, BlockBodies, BlockHeaders, Cells, EthNetworkPrimitives, GetBlockAccessLists,
13    GetBlockBodies, GetBlockHeaders, GetCells, GetNodeData, GetReceipts, GetReceipts70,
14    HeadersDirection, NetworkPrimitives, NodeData, Receipts, Receipts69, Receipts70,
15};
16use reth_network_api::test_utils::PeersHandle;
17use reth_network_p2p::error::RequestResult;
18use reth_network_peers::PeerId;
19use reth_primitives_traits::Block;
20use reth_storage_api::{BalProvider, BlockReader, GetBlockAccessListLimit, HeaderProvider};
21use std::{
22    future::Future,
23    pin::Pin,
24    task::{Context, Poll},
25    time::Duration,
26};
27use tokio::sync::{mpsc::Receiver, oneshot};
28use tokio_stream::wrappers::ReceiverStream;
29
30// Limits: <https://github.com/ethereum/go-ethereum/blob/b0d44338bbcefee044f1f635a84487cbbd8f0538/eth/protocols/eth/handler.go#L34-L56>
31
32/// Maximum number of receipts to serve.
33///
34/// Used to limit lookups.
35pub const MAX_RECEIPTS_SERVE: usize = 1024;
36
37/// Maximum number of block headers to serve.
38///
39/// Used to limit lookups.
40pub const MAX_HEADERS_SERVE: usize = 1024;
41
42/// Maximum number of block headers to serve.
43///
44/// Used to limit lookups. With 24KB block sizes nowadays, the practical limit will always be
45/// `SOFT_RESPONSE_LIMIT`.
46pub const MAX_BODIES_SERVE: usize = 1024;
47
48/// Maximum number of block access lists to serve.
49///
50/// Used to limit lookups.
51pub const MAX_BLOCK_ACCESS_LISTS_SERVE: usize = 1024;
52
53/// Maximum size of replies to data retrievals: 2MB
54pub const SOFT_RESPONSE_LIMIT: usize = 2 * 1024 * 1024;
55
56/// Manages eth related requests on top of the p2p network.
57///
58/// This can be spawned to another task and is supposed to be run as background service.
59#[derive(Debug)]
60#[must_use = "Manager does nothing unless polled."]
61pub struct EthRequestHandler<C, N: NetworkPrimitives = EthNetworkPrimitives> {
62    /// The client type that can interact with the chain.
63    client: C,
64    /// Used for reporting peers.
65    // TODO use to report spammers
66    #[expect(dead_code)]
67    peers: PeersHandle,
68    /// Incoming request from the [`NetworkManager`](crate::NetworkManager).
69    incoming_requests: ReceiverStream<IncomingEthRequest<N>>,
70    /// Metrics for the eth request handler.
71    metrics: EthRequestHandlerMetrics,
72}
73
74// === impl EthRequestHandler ===
75impl<C, N: NetworkPrimitives> EthRequestHandler<C, N> {
76    /// Create a new instance
77    pub fn new(client: C, peers: PeersHandle, incoming: Receiver<IncomingEthRequest<N>>) -> Self {
78        Self {
79            client,
80            peers,
81            incoming_requests: ReceiverStream::new(incoming),
82            metrics: Default::default(),
83        }
84    }
85}
86
87impl<C, N> EthRequestHandler<C, N>
88where
89    N: NetworkPrimitives,
90    C: BlockReader,
91{
92    /// Returns the list of requested headers
93    fn get_headers_response(&self, request: GetBlockHeaders) -> Vec<C::Header> {
94        let GetBlockHeaders { start_block, limit, skip, direction } = request;
95
96        let mut headers = Vec::new();
97
98        let mut block: BlockHashOrNumber = match start_block {
99            BlockHashOrNumber::Hash(start) => start.into(),
100            BlockHashOrNumber::Number(num) => {
101                let Some(hash) = self.client.block_hash(num).unwrap_or_default() else {
102                    return headers
103                };
104                hash.into()
105            }
106        };
107
108        let skip = skip as u64;
109        let mut total_bytes = 0;
110
111        for _ in 0..limit {
112            if let Some(header) = self.client.header_by_hash_or_number(block).unwrap_or_default() {
113                let number = header.number();
114                let parent_hash = header.parent_hash();
115
116                total_bytes += header.length();
117                headers.push(header);
118
119                if headers.len() >= MAX_HEADERS_SERVE || total_bytes > SOFT_RESPONSE_LIMIT {
120                    break
121                }
122
123                match direction {
124                    HeadersDirection::Rising => {
125                        if let Some(next) = number.checked_add(1).and_then(|n| n.checked_add(skip))
126                        {
127                            block = next.into()
128                        } else {
129                            break
130                        }
131                    }
132                    HeadersDirection::Falling => {
133                        if skip > 0 {
134                            // prevent under flows for block.number == 0 and `block.number - skip <
135                            // 0`
136                            if let Some(next) =
137                                number.checked_sub(1).and_then(|num| num.checked_sub(skip))
138                            {
139                                block = next.into()
140                            } else {
141                                break
142                            }
143                        } else {
144                            block = parent_hash.into()
145                        }
146                    }
147                }
148            } else {
149                break
150            }
151        }
152
153        headers
154    }
155
156    fn on_headers_request(
157        &self,
158        _peer_id: PeerId,
159        request: GetBlockHeaders,
160        response: oneshot::Sender<RequestResult<BlockHeaders<C::Header>>>,
161    ) {
162        self.metrics.eth_headers_requests_received_total.increment(1);
163        let headers = self.get_headers_response(request);
164        let _ = response.send(Ok(BlockHeaders(headers)));
165    }
166
167    fn on_bodies_request(
168        &self,
169        _peer_id: PeerId,
170        request: GetBlockBodies,
171        response: oneshot::Sender<RequestResult<BlockBodies<<C::Block as Block>::Body>>>,
172    ) {
173        self.metrics.eth_bodies_requests_received_total.increment(1);
174        let mut bodies = Vec::new();
175
176        let mut total_bytes = 0;
177
178        for hash in request {
179            if let Some(block) = self.client.block_by_hash(hash).unwrap_or_default() {
180                let body = block.into_body();
181                total_bytes += body.length();
182                bodies.push(body);
183
184                if bodies.len() >= MAX_BODIES_SERVE || total_bytes > SOFT_RESPONSE_LIMIT {
185                    break
186                }
187            } else {
188                break
189            }
190        }
191
192        let _ = response.send(Ok(BlockBodies(bodies)));
193    }
194
195    fn on_receipts_request(
196        &self,
197        _peer_id: PeerId,
198        request: GetReceipts,
199        response: oneshot::Sender<RequestResult<Receipts<C::Receipt>>>,
200    ) {
201        self.metrics.eth_receipts_requests_received_total.increment(1);
202
203        let receipts = self.get_receipts_response(request, |receipts_by_block| {
204            receipts_by_block.into_iter().map(ReceiptWithBloom::from).collect::<Vec<_>>()
205        });
206
207        let _ = response.send(Ok(Receipts(receipts)));
208    }
209
210    fn on_receipts69_request(
211        &self,
212        _peer_id: PeerId,
213        request: GetReceipts,
214        response: oneshot::Sender<RequestResult<Receipts69<C::Receipt>>>,
215    ) {
216        self.metrics.eth_receipts_requests_received_total.increment(1);
217
218        let receipts = self.get_receipts_response(request, |receipts_by_block| {
219            // skip bloom filter for eth69
220            receipts_by_block
221        });
222
223        let _ = response.send(Ok(Receipts69(receipts)));
224    }
225
226    /// Handles partial responses for [`GetReceipts70`] queries.
227    ///
228    /// This will adhere to the soft limit but allow filling the last vec partially.
229    fn on_receipts70_request(
230        &self,
231        _peer_id: PeerId,
232        request: GetReceipts70,
233        response: oneshot::Sender<RequestResult<Receipts70<C::Receipt>>>,
234    ) {
235        self.metrics.eth_receipts_requests_received_total.increment(1);
236
237        let GetReceipts70 { first_block_receipt_index, block_hashes } = request;
238
239        let mut receipts = Vec::new();
240        let mut total_bytes = 0usize;
241        let mut last_block_incomplete = false;
242
243        for (idx, hash) in block_hashes.into_iter().enumerate() {
244            if idx >= MAX_RECEIPTS_SERVE {
245                break
246            }
247
248            let Some(mut block_receipts) =
249                self.client.receipts_by_block(BlockHashOrNumber::Hash(hash)).unwrap_or_default()
250            else {
251                break
252            };
253
254            if idx == 0 && first_block_receipt_index > 0 {
255                let skip = first_block_receipt_index as usize;
256                if skip >= block_receipts.len() {
257                    block_receipts.clear();
258                } else {
259                    block_receipts.drain(0..skip);
260                }
261            }
262
263            let block_size = block_receipts.length();
264
265            if total_bytes + block_size <= SOFT_RESPONSE_LIMIT {
266                total_bytes += block_size;
267                receipts.push(block_receipts);
268                continue;
269            }
270
271            let mut partial_block = Vec::new();
272            for receipt in block_receipts {
273                let receipt_size = receipt.length();
274                if total_bytes + receipt_size > SOFT_RESPONSE_LIMIT {
275                    break;
276                }
277                total_bytes += receipt_size;
278                partial_block.push(receipt);
279            }
280
281            receipts.push(partial_block);
282            last_block_incomplete = true;
283            break;
284        }
285
286        let _ = response.send(Ok(Receipts70 { last_block_incomplete, receipts }));
287    }
288
289    #[inline]
290    fn get_receipts_response<T, F>(&self, request: GetReceipts, transform_fn: F) -> Vec<Vec<T>>
291    where
292        F: Fn(Vec<C::Receipt>) -> Vec<T>,
293        T: Encodable,
294    {
295        let mut receipts = Vec::new();
296        let mut total_bytes = 0;
297
298        for hash in request {
299            if let Some(receipts_by_block) =
300                self.client.receipts_by_block(BlockHashOrNumber::Hash(hash)).unwrap_or_default()
301            {
302                let transformed_receipts = transform_fn(receipts_by_block);
303                total_bytes += transformed_receipts.length();
304                receipts.push(transformed_receipts);
305
306                if receipts.len() >= MAX_RECEIPTS_SERVE || total_bytes > SOFT_RESPONSE_LIMIT {
307                    break
308                }
309            } else {
310                break
311            }
312        }
313
314        receipts
315    }
316
317    fn on_cells_request(
318        &self,
319        _peer_id: PeerId,
320        _request: GetCells,
321        response: oneshot::Sender<RequestResult<Cells>>,
322    ) {
323        let _ = response.send(Ok(Cells::default()));
324    }
325}
326
327impl<C, N> EthRequestHandler<C, N>
328where
329    N: NetworkPrimitives,
330    C: BalProvider,
331{
332    /// Handles [`GetBlockAccessLists`] queries.
333    ///
334    /// EIP-8159 defines the final `BlockAccessLists` response semantics:
335    /// <https://eips.ethereum.org/EIPS/eip-8159>
336    fn on_block_access_lists_request(
337        &self,
338        _peer_id: PeerId,
339        mut request: GetBlockAccessLists,
340        response: oneshot::Sender<RequestResult<BlockAccessLists>>,
341    ) {
342        request.0.truncate(MAX_BLOCK_ACCESS_LISTS_SERVE);
343
344        let limit = GetBlockAccessListLimit::ResponseSizeSoftLimit(SOFT_RESPONSE_LIMIT);
345        let access_lists =
346            self.client.bal_store().get_by_hashes_with_limit(&request.0, limit).unwrap_or_default();
347        let _ = response.send(Ok(BlockAccessLists(access_lists)));
348    }
349}
350
351/// An endless future.
352///
353/// This should be spawned or used as part of `tokio::select!`.
354impl<C, N> Future for EthRequestHandler<C, N>
355where
356    N: NetworkPrimitives,
357    C: BalProvider
358        + BlockReader<Block = N::Block, Receipt = N::Receipt>
359        + HeaderProvider<Header = N::BlockHeader>
360        + Unpin,
361{
362    type Output = ();
363
364    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
365        let this = self.get_mut();
366
367        let mut acc = Duration::ZERO;
368        let maybe_more_incoming_requests = metered_poll_nested_stream_with_budget!(
369            acc,
370            "net::eth",
371            "Incoming eth requests stream",
372            DEFAULT_BUDGET_TRY_DRAIN_DOWNLOADERS,
373            this.incoming_requests.poll_next_unpin(cx),
374            |incoming| {
375                match incoming {
376                    IncomingEthRequest::GetBlockHeaders { peer_id, request, response } => {
377                        this.on_headers_request(peer_id, request, response)
378                    }
379                    IncomingEthRequest::GetBlockBodies { peer_id, request, response } => {
380                        this.on_bodies_request(peer_id, request, response)
381                    }
382                    IncomingEthRequest::GetNodeData { .. } => {
383                        this.metrics.eth_node_data_requests_received_total.increment(1);
384                    }
385                    IncomingEthRequest::GetReceipts { peer_id, request, response } => {
386                        this.on_receipts_request(peer_id, request, response)
387                    }
388                    IncomingEthRequest::GetReceipts69 { peer_id, request, response } => {
389                        this.on_receipts69_request(peer_id, request, response)
390                    }
391                    IncomingEthRequest::GetReceipts70 { peer_id, request, response } => {
392                        this.on_receipts70_request(peer_id, request, response)
393                    }
394                    IncomingEthRequest::GetBlockAccessLists { peer_id, request, response } => {
395                        this.on_block_access_lists_request(peer_id, request, response)
396                    }
397                    IncomingEthRequest::GetCells { peer_id, request, response } => {
398                        this.on_cells_request(peer_id, request, response)
399                    }
400                }
401            },
402        );
403
404        this.metrics.acc_duration_poll_eth_req_handler.set(acc.as_secs_f64());
405
406        // stream is fully drained and import futures pending
407        if maybe_more_incoming_requests {
408            // make sure we're woken up again
409            cx.waker().wake_by_ref();
410        }
411
412        Poll::Pending
413    }
414}
415
416/// All `eth` request related to blocks delegated by the network.
417#[derive(Debug)]
418pub enum IncomingEthRequest<N: NetworkPrimitives = EthNetworkPrimitives> {
419    /// Request Block headers from the peer.
420    ///
421    /// The response should be sent through the channel.
422    GetBlockHeaders {
423        /// The ID of the peer to request block headers from.
424        peer_id: PeerId,
425        /// The specific block headers requested.
426        request: GetBlockHeaders,
427        /// The channel sender for the response containing block headers.
428        response: oneshot::Sender<RequestResult<BlockHeaders<N::BlockHeader>>>,
429    },
430    /// Request Block bodies from the peer.
431    ///
432    /// The response should be sent through the channel.
433    GetBlockBodies {
434        /// The ID of the peer to request block bodies from.
435        peer_id: PeerId,
436        /// The specific block bodies requested.
437        request: GetBlockBodies,
438        /// The channel sender for the response containing block bodies.
439        response: oneshot::Sender<RequestResult<BlockBodies<N::BlockBody>>>,
440    },
441    /// Request Node Data from the peer.
442    ///
443    /// The response should be sent through the channel.
444    GetNodeData {
445        /// The ID of the peer to request node data from.
446        peer_id: PeerId,
447        /// The specific node data requested.
448        request: GetNodeData,
449        /// The channel sender for the response containing node data.
450        response: oneshot::Sender<RequestResult<NodeData>>,
451    },
452    /// Request Receipts from the peer.
453    ///
454    /// The response should be sent through the channel.
455    GetReceipts {
456        /// The ID of the peer to request receipts from.
457        peer_id: PeerId,
458        /// The specific receipts requested.
459        request: GetReceipts,
460        /// The channel sender for the response containing receipts.
461        response: oneshot::Sender<RequestResult<Receipts<N::Receipt>>>,
462    },
463    /// Request Receipts from the peer without bloom filter.
464    ///
465    /// The response should be sent through the channel.
466    GetReceipts69 {
467        /// The ID of the peer to request receipts from.
468        peer_id: PeerId,
469        /// The specific receipts requested.
470        request: GetReceipts,
471        /// The channel sender for the response containing Receipts69.
472        response: oneshot::Sender<RequestResult<Receipts69<N::Receipt>>>,
473    },
474    /// Request Receipts from the peer using eth/70.
475    ///
476    /// The response should be sent through the channel.
477    GetReceipts70 {
478        /// The ID of the peer to request receipts from.
479        peer_id: PeerId,
480        /// The specific receipts requested including the `firstBlockReceiptIndex`.
481        request: GetReceipts70,
482        /// The channel sender for the response containing Receipts70.
483        response: oneshot::Sender<RequestResult<Receipts70<N::Receipt>>>,
484    },
485    /// Request Block Access Lists from the peer.
486    ///
487    /// The response should be sent through the channel.
488    GetBlockAccessLists {
489        /// The ID of the peer to request block access lists from.
490        peer_id: PeerId,
491        /// The requested block hashes.
492        request: GetBlockAccessLists,
493        /// The channel sender for the response containing block access lists.
494        response: oneshot::Sender<RequestResult<BlockAccessLists>>,
495    },
496    /// Request Cells from the peer.
497    ///
498    /// The response should be sent through the channel.
499    GetCells {
500        /// The ID of the peer to request cells from.
501        peer_id: PeerId,
502        /// The requested block hashes.
503        request: GetCells,
504        /// The channel sender for the response containing cells.
505        response: oneshot::Sender<RequestResult<Cells>>,
506    },
507}