reth_network/
eth_requests.rs

1//! Blocks/Headers management for the p2p network.
2
3use crate::{
4    budget::DEFAULT_BUDGET_TRY_DRAIN_DOWNLOADERS, metered_poll_nested_stream_with_budget,
5    metrics::EthRequestHandlerMetrics,
6};
7use alloy_consensus::{BlockHeader, ReceiptWithBloom};
8use alloy_eips::BlockHashOrNumber;
9use alloy_rlp::Encodable;
10use futures::StreamExt;
11use reth_eth_wire::{
12    BlockBodies, BlockHeaders, EthNetworkPrimitives, GetBlockBodies, GetBlockHeaders, GetNodeData,
13    GetReceipts, GetReceipts70, HeadersDirection, NetworkPrimitives, NodeData, Receipts,
14    Receipts69, Receipts70,
15};
16use reth_network_api::test_utils::PeersHandle;
17use reth_network_p2p::error::RequestResult;
18use reth_network_peers::PeerId;
19use reth_primitives_traits::Block;
20use reth_storage_api::{BlockReader, HeaderProvider};
21use std::{
22    future::Future,
23    pin::Pin,
24    task::{Context, Poll},
25    time::Duration,
26};
27use tokio::sync::{mpsc::Receiver, oneshot};
28use tokio_stream::wrappers::ReceiverStream;
29
30// Limits: <https://github.com/ethereum/go-ethereum/blob/b0d44338bbcefee044f1f635a84487cbbd8f0538/eth/protocols/eth/handler.go#L34-L56>
31
/// Maximum number of receipts to serve.
///
/// Used to limit lookups. The handlers in this file count per-block receipt lists (one entry per
/// requested block hash) against this limit, not individual receipts.
pub const MAX_RECEIPTS_SERVE: usize = 1024;

/// Maximum number of block headers to serve.
///
/// Used to limit lookups.
pub const MAX_HEADERS_SERVE: usize = 1024;

/// Maximum number of block bodies to serve.
///
/// Used to limit lookups. With 24KB block sizes nowadays, the practical limit will always be
/// `SOFT_RESPONSE_LIMIT`.
pub const MAX_BODIES_SERVE: usize = 1024;

/// Maximum size of replies to data retrievals: 2MB
///
/// The limit is soft: the headers, bodies and `GetReceipts` handlers compare against it only
/// after appending an item, so a response can exceed it by the size of the last item.
pub const SOFT_RESPONSE_LIMIT: usize = 2 * 1024 * 1024;
50
/// Manages eth related requests on top of the p2p network.
///
/// This can be spawned to another task and is supposed to be run as background service.
///
/// The handler implements [`Future`]: each poll drains `incoming_requests` and answers every
/// request from `client`, sending the result back through the request's response channel.
///
/// Type parameters:
/// - `C`: the chain data client; the request handlers require it to implement `BlockReader`.
/// - `N`: the network primitives that fix the header, body and receipt types of the requests.
#[derive(Debug)]
#[must_use = "Manager does nothing unless polled."]
pub struct EthRequestHandler<C, N: NetworkPrimitives = EthNetworkPrimitives> {
    /// The client type that can interact with the chain.
    client: C,
    /// Used for reporting peers.
    // TODO use to report spammers
    // Not read anywhere yet, hence the `dead_code` expectation.
    #[expect(dead_code)]
    peers: PeersHandle,
    /// Incoming request from the [`NetworkManager`](crate::NetworkManager).
    incoming_requests: ReceiverStream<IncomingEthRequest<N>>,
    /// Metrics for the eth request handler.
    metrics: EthRequestHandlerMetrics,
}
68
69// === impl EthRequestHandler ===
70impl<C, N: NetworkPrimitives> EthRequestHandler<C, N> {
71    /// Create a new instance
72    pub fn new(client: C, peers: PeersHandle, incoming: Receiver<IncomingEthRequest<N>>) -> Self {
73        Self {
74            client,
75            peers,
76            incoming_requests: ReceiverStream::new(incoming),
77            metrics: Default::default(),
78        }
79    }
80}
81
82impl<C, N> EthRequestHandler<C, N>
83where
84    N: NetworkPrimitives,
85    C: BlockReader,
86{
87    /// Returns the list of requested headers
88    fn get_headers_response(&self, request: GetBlockHeaders) -> Vec<C::Header> {
89        let GetBlockHeaders { start_block, limit, skip, direction } = request;
90
91        let mut headers = Vec::new();
92
93        let mut block: BlockHashOrNumber = match start_block {
94            BlockHashOrNumber::Hash(start) => start.into(),
95            BlockHashOrNumber::Number(num) => {
96                let Some(hash) = self.client.block_hash(num).unwrap_or_default() else {
97                    return headers
98                };
99                hash.into()
100            }
101        };
102
103        let skip = skip as u64;
104        let mut total_bytes = 0;
105
106        for _ in 0..limit {
107            if let Some(header) = self.client.header_by_hash_or_number(block).unwrap_or_default() {
108                let number = header.number();
109                let parent_hash = header.parent_hash();
110
111                total_bytes += header.length();
112                headers.push(header);
113
114                if headers.len() >= MAX_HEADERS_SERVE || total_bytes > SOFT_RESPONSE_LIMIT {
115                    break
116                }
117
118                match direction {
119                    HeadersDirection::Rising => {
120                        if let Some(next) = number.checked_add(1).and_then(|n| n.checked_add(skip))
121                        {
122                            block = next.into()
123                        } else {
124                            break
125                        }
126                    }
127                    HeadersDirection::Falling => {
128                        if skip > 0 {
129                            // prevent under flows for block.number == 0 and `block.number - skip <
130                            // 0`
131                            if let Some(next) =
132                                number.checked_sub(1).and_then(|num| num.checked_sub(skip))
133                            {
134                                block = next.into()
135                            } else {
136                                break
137                            }
138                        } else {
139                            block = parent_hash.into()
140                        }
141                    }
142                }
143            } else {
144                break
145            }
146        }
147
148        headers
149    }
150
151    fn on_headers_request(
152        &self,
153        _peer_id: PeerId,
154        request: GetBlockHeaders,
155        response: oneshot::Sender<RequestResult<BlockHeaders<C::Header>>>,
156    ) {
157        self.metrics.eth_headers_requests_received_total.increment(1);
158        let headers = self.get_headers_response(request);
159        let _ = response.send(Ok(BlockHeaders(headers)));
160    }
161
162    fn on_bodies_request(
163        &self,
164        _peer_id: PeerId,
165        request: GetBlockBodies,
166        response: oneshot::Sender<RequestResult<BlockBodies<<C::Block as Block>::Body>>>,
167    ) {
168        self.metrics.eth_bodies_requests_received_total.increment(1);
169        let mut bodies = Vec::new();
170
171        let mut total_bytes = 0;
172
173        for hash in request.0 {
174            if let Some(block) = self.client.block_by_hash(hash).unwrap_or_default() {
175                let body = block.into_body();
176                total_bytes += body.length();
177                bodies.push(body);
178
179                if bodies.len() >= MAX_BODIES_SERVE || total_bytes > SOFT_RESPONSE_LIMIT {
180                    break
181                }
182            } else {
183                break
184            }
185        }
186
187        let _ = response.send(Ok(BlockBodies(bodies)));
188    }
189
190    fn on_receipts_request(
191        &self,
192        _peer_id: PeerId,
193        request: GetReceipts,
194        response: oneshot::Sender<RequestResult<Receipts<C::Receipt>>>,
195    ) {
196        self.metrics.eth_receipts_requests_received_total.increment(1);
197
198        let receipts = self.get_receipts_response(request, |receipts_by_block| {
199            receipts_by_block.into_iter().map(ReceiptWithBloom::from).collect::<Vec<_>>()
200        });
201
202        let _ = response.send(Ok(Receipts(receipts)));
203    }
204
205    fn on_receipts69_request(
206        &self,
207        _peer_id: PeerId,
208        request: GetReceipts,
209        response: oneshot::Sender<RequestResult<Receipts69<C::Receipt>>>,
210    ) {
211        self.metrics.eth_receipts_requests_received_total.increment(1);
212
213        let receipts = self.get_receipts_response(request, |receipts_by_block| {
214            // skip bloom filter for eth69
215            receipts_by_block
216        });
217
218        let _ = response.send(Ok(Receipts69(receipts)));
219    }
220
221    /// Handles partial responses for [`GetReceipts70`] queries.
222    ///
223    /// This will adhere to the soft limit but allow filling the last vec partially.
224    fn on_receipts70_request(
225        &self,
226        _peer_id: PeerId,
227        request: GetReceipts70,
228        response: oneshot::Sender<RequestResult<Receipts70<C::Receipt>>>,
229    ) {
230        self.metrics.eth_receipts_requests_received_total.increment(1);
231
232        let GetReceipts70 { first_block_receipt_index, block_hashes } = request;
233
234        let mut receipts = Vec::new();
235        let mut total_bytes = 0usize;
236        let mut last_block_incomplete = false;
237
238        for (idx, hash) in block_hashes.into_iter().enumerate() {
239            if idx >= MAX_RECEIPTS_SERVE {
240                break
241            }
242
243            let Some(mut block_receipts) =
244                self.client.receipts_by_block(BlockHashOrNumber::Hash(hash)).unwrap_or_default()
245            else {
246                break
247            };
248
249            if idx == 0 && first_block_receipt_index > 0 {
250                let skip = first_block_receipt_index as usize;
251                if skip >= block_receipts.len() {
252                    block_receipts.clear();
253                } else {
254                    block_receipts.drain(0..skip);
255                }
256            }
257
258            let block_size = block_receipts.length();
259
260            if total_bytes + block_size <= SOFT_RESPONSE_LIMIT {
261                total_bytes += block_size;
262                receipts.push(block_receipts);
263                continue;
264            }
265
266            let mut partial_block = Vec::new();
267            for receipt in block_receipts {
268                let receipt_size = receipt.length();
269                if total_bytes + receipt_size > SOFT_RESPONSE_LIMIT {
270                    break;
271                }
272                total_bytes += receipt_size;
273                partial_block.push(receipt);
274            }
275
276            receipts.push(partial_block);
277            last_block_incomplete = true;
278            break;
279        }
280
281        let _ = response.send(Ok(Receipts70 { last_block_incomplete, receipts }));
282    }
283
284    #[inline]
285    fn get_receipts_response<T, F>(&self, request: GetReceipts, transform_fn: F) -> Vec<Vec<T>>
286    where
287        F: Fn(Vec<C::Receipt>) -> Vec<T>,
288        T: Encodable,
289    {
290        let mut receipts = Vec::new();
291        let mut total_bytes = 0;
292
293        for hash in request.0 {
294            if let Some(receipts_by_block) =
295                self.client.receipts_by_block(BlockHashOrNumber::Hash(hash)).unwrap_or_default()
296            {
297                let transformed_receipts = transform_fn(receipts_by_block);
298                total_bytes += transformed_receipts.length();
299                receipts.push(transformed_receipts);
300
301                if receipts.len() >= MAX_RECEIPTS_SERVE || total_bytes > SOFT_RESPONSE_LIMIT {
302                    break
303                }
304            } else {
305                break
306            }
307        }
308
309        receipts
310    }
311}
312
/// An endless future.
///
/// This should be spawned or used as part of `tokio::select!`.
///
/// Each poll hands the incoming request stream to `metered_poll_nested_stream_with_budget!`,
/// which dispatches every received [`IncomingEthRequest`] to the matching handler. The future
/// never resolves: `poll` always returns [`Poll::Pending`].
impl<C, N> Future for EthRequestHandler<C, N>
where
    N: NetworkPrimitives,
    C: BlockReader<Block = N::Block, Receipt = N::Receipt>
        + HeaderProvider<Header = N::BlockHeader>
        + Unpin,
{
    type Output = ();

    /// Serves incoming requests and records the poll duration in the metrics.
    ///
    /// Always returns [`Poll::Pending`].
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();

        // Accumulator handed to the macro; its value is exported as a metric below.
        let mut acc = Duration::ZERO;
        // NOTE(review): the macro is defined elsewhere. It presumably polls the stream until it
        // is pending or the budget is spent, and evaluates to `true` in the latter case — confirm.
        let maybe_more_incoming_requests = metered_poll_nested_stream_with_budget!(
            acc,
            "net::eth",
            "Incoming eth requests stream",
            DEFAULT_BUDGET_TRY_DRAIN_DOWNLOADERS,
            this.incoming_requests.poll_next_unpin(cx),
            |incoming| {
                match incoming {
                    IncomingEthRequest::GetBlockHeaders { peer_id, request, response } => {
                        this.on_headers_request(peer_id, request, response)
                    }
                    IncomingEthRequest::GetBlockBodies { peer_id, request, response } => {
                        this.on_bodies_request(peer_id, request, response)
                    }
                    // Only counted: the `response` sender is dropped without sending a reply.
                    IncomingEthRequest::GetNodeData { .. } => {
                        this.metrics.eth_node_data_requests_received_total.increment(1);
                    }
                    IncomingEthRequest::GetReceipts { peer_id, request, response } => {
                        this.on_receipts_request(peer_id, request, response)
                    }
                    IncomingEthRequest::GetReceipts69 { peer_id, request, response } => {
                        this.on_receipts69_request(peer_id, request, response)
                    }
                    IncomingEthRequest::GetReceipts70 { peer_id, request, response } => {
                        this.on_receipts70_request(peer_id, request, response)
                    }
                }
            },
        );

        this.metrics.acc_duration_poll_eth_req_handler.set(acc.as_secs_f64());

        // More requests may be waiting even though we stop here. In that case the stream has
        // presumably not registered our waker (TODO confirm against the macro), so request
        // another poll ourselves.
        if maybe_more_incoming_requests {
            // make sure we're woken up again
            cx.waker().wake_by_ref();
        }

        Poll::Pending
    }
}
370
/// All `eth` request related to blocks delegated by the network.
///
/// [`EthRequestHandler`] answers these from its local chain client and sends the result through
/// the variant's `response` channel. `peer_id` is presumably the remote peer that issued the
/// request — TODO confirm against the code that constructs these values.
#[derive(Debug)]
pub enum IncomingEthRequest<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// A request for block headers.
    ///
    /// The response should be sent through the channel.
    GetBlockHeaders {
        /// The ID of the peer this request is associated with.
        peer_id: PeerId,
        /// The specific block headers requested.
        request: GetBlockHeaders,
        /// The channel sender for the response containing block headers.
        response: oneshot::Sender<RequestResult<BlockHeaders<N::BlockHeader>>>,
    },
    /// A request for block bodies.
    ///
    /// The response should be sent through the channel.
    GetBlockBodies {
        /// The ID of the peer this request is associated with.
        peer_id: PeerId,
        /// The specific block bodies requested.
        request: GetBlockBodies,
        /// The channel sender for the response containing block bodies.
        response: oneshot::Sender<RequestResult<BlockBodies<N::BlockBody>>>,
    },
    /// A request for node data.
    ///
    /// The response should be sent through the channel. Note that [`EthRequestHandler`] currently
    /// only counts these requests and drops `response` without sending a reply.
    GetNodeData {
        /// The ID of the peer this request is associated with.
        peer_id: PeerId,
        /// The specific node data requested.
        request: GetNodeData,
        /// The channel sender for the response containing node data.
        response: oneshot::Sender<RequestResult<NodeData>>,
    },
    /// A request for receipts, answered with receipts that include the bloom filter.
    ///
    /// The response should be sent through the channel.
    GetReceipts {
        /// The ID of the peer this request is associated with.
        peer_id: PeerId,
        /// The specific receipts requested.
        request: GetReceipts,
        /// The channel sender for the response containing receipts.
        response: oneshot::Sender<RequestResult<Receipts<N::Receipt>>>,
    },
    /// A request for receipts, answered without bloom filter.
    ///
    /// The response should be sent through the channel.
    GetReceipts69 {
        /// The ID of the peer this request is associated with.
        peer_id: PeerId,
        /// The specific receipts requested.
        request: GetReceipts,
        /// The channel sender for the response containing Receipts69.
        response: oneshot::Sender<RequestResult<Receipts69<N::Receipt>>>,
    },
    /// A request for receipts using eth/70.
    ///
    /// The response should be sent through the channel.
    GetReceipts70 {
        /// The ID of the peer this request is associated with.
        peer_id: PeerId,
        /// The specific receipts requested including the `firstBlockReceiptIndex`.
        request: GetReceipts70,
        /// The channel sender for the response containing Receipts70.
        response: oneshot::Sender<RequestResult<Receipts70<N::Receipt>>>,
    },
}