reth_network/
eth_requests.rs

1//! Blocks/Headers management for the p2p network.
2
3use crate::{
4    budget::DEFAULT_BUDGET_TRY_DRAIN_DOWNLOADERS, metered_poll_nested_stream_with_budget,
5    metrics::EthRequestHandlerMetrics,
6};
7use alloy_consensus::{BlockHeader, ReceiptWithBloom};
8use alloy_eips::BlockHashOrNumber;
9use alloy_rlp::Encodable;
10use futures::StreamExt;
11use reth_eth_wire::{
12    BlockBodies, BlockHeaders, EthNetworkPrimitives, GetBlockBodies, GetBlockHeaders, GetNodeData,
13    GetReceipts, HeadersDirection, NetworkPrimitives, NodeData, Receipts, Receipts69,
14};
15use reth_network_api::test_utils::PeersHandle;
16use reth_network_p2p::error::RequestResult;
17use reth_network_peers::PeerId;
18use reth_primitives_traits::Block;
19use reth_storage_api::{BlockReader, HeaderProvider};
20use std::{
21    future::Future,
22    pin::Pin,
23    task::{Context, Poll},
24    time::Duration,
25};
26use tokio::sync::{mpsc::Receiver, oneshot};
27use tokio_stream::wrappers::ReceiverStream;
28
29// Limits: <https://github.com/ethereum/go-ethereum/blob/b0d44338bbcefee044f1f635a84487cbbd8f0538/eth/protocols/eth/handler.go#L34-L56>
30
31/// Maximum number of receipts to serve.
32///
33/// Used to limit lookups.
34pub const MAX_RECEIPTS_SERVE: usize = 1024;
35
36/// Maximum number of block headers to serve.
37///
38/// Used to limit lookups.
39pub const MAX_HEADERS_SERVE: usize = 1024;
40
41/// Maximum number of block headers to serve.
42///
43/// Used to limit lookups. With 24KB block sizes nowadays, the practical limit will always be
44/// `SOFT_RESPONSE_LIMIT`.
45pub const MAX_BODIES_SERVE: usize = 1024;
46
47/// Maximum size of replies to data retrievals: 2MB
48pub const SOFT_RESPONSE_LIMIT: usize = 2 * 1024 * 1024;
49
50/// Manages eth related requests on top of the p2p network.
51///
52/// This can be spawned to another task and is supposed to be run as background service.
53#[derive(Debug)]
54#[must_use = "Manager does nothing unless polled."]
55pub struct EthRequestHandler<C, N: NetworkPrimitives = EthNetworkPrimitives> {
56    /// The client type that can interact with the chain.
57    client: C,
58    /// Used for reporting peers.
59    // TODO use to report spammers
60    #[expect(dead_code)]
61    peers: PeersHandle,
62    /// Incoming request from the [`NetworkManager`](crate::NetworkManager).
63    incoming_requests: ReceiverStream<IncomingEthRequest<N>>,
64    /// Metrics for the eth request handler.
65    metrics: EthRequestHandlerMetrics,
66}
67
68// === impl EthRequestHandler ===
69impl<C, N: NetworkPrimitives> EthRequestHandler<C, N> {
70    /// Create a new instance
71    pub fn new(client: C, peers: PeersHandle, incoming: Receiver<IncomingEthRequest<N>>) -> Self {
72        Self {
73            client,
74            peers,
75            incoming_requests: ReceiverStream::new(incoming),
76            metrics: Default::default(),
77        }
78    }
79}
80
81impl<C, N> EthRequestHandler<C, N>
82where
83    N: NetworkPrimitives,
84    C: BlockReader,
85{
86    /// Returns the list of requested headers
87    fn get_headers_response(&self, request: GetBlockHeaders) -> Vec<C::Header> {
88        let GetBlockHeaders { start_block, limit, skip, direction } = request;
89
90        let mut headers = Vec::new();
91
92        let mut block: BlockHashOrNumber = match start_block {
93            BlockHashOrNumber::Hash(start) => start.into(),
94            BlockHashOrNumber::Number(num) => {
95                let Some(hash) = self.client.block_hash(num).unwrap_or_default() else {
96                    return headers
97                };
98                hash.into()
99            }
100        };
101
102        let skip = skip as u64;
103        let mut total_bytes = 0;
104
105        for _ in 0..limit {
106            if let Some(header) = self.client.header_by_hash_or_number(block).unwrap_or_default() {
107                let number = header.number();
108                let parent_hash = header.parent_hash();
109
110                total_bytes += header.length();
111                headers.push(header);
112
113                if headers.len() >= MAX_HEADERS_SERVE || total_bytes > SOFT_RESPONSE_LIMIT {
114                    break
115                }
116
117                match direction {
118                    HeadersDirection::Rising => {
119                        if let Some(next) = number.checked_add(1).and_then(|n| n.checked_add(skip))
120                        {
121                            block = next.into()
122                        } else {
123                            break
124                        }
125                    }
126                    HeadersDirection::Falling => {
127                        if skip > 0 {
128                            // prevent under flows for block.number == 0 and `block.number - skip <
129                            // 0`
130                            if let Some(next) =
131                                number.checked_sub(1).and_then(|num| num.checked_sub(skip))
132                            {
133                                block = next.into()
134                            } else {
135                                break
136                            }
137                        } else {
138                            block = parent_hash.into()
139                        }
140                    }
141                }
142            } else {
143                break
144            }
145        }
146
147        headers
148    }
149
150    fn on_headers_request(
151        &self,
152        _peer_id: PeerId,
153        request: GetBlockHeaders,
154        response: oneshot::Sender<RequestResult<BlockHeaders<C::Header>>>,
155    ) {
156        self.metrics.eth_headers_requests_received_total.increment(1);
157        let headers = self.get_headers_response(request);
158        let _ = response.send(Ok(BlockHeaders(headers)));
159    }
160
161    fn on_bodies_request(
162        &self,
163        _peer_id: PeerId,
164        request: GetBlockBodies,
165        response: oneshot::Sender<RequestResult<BlockBodies<<C::Block as Block>::Body>>>,
166    ) {
167        self.metrics.eth_bodies_requests_received_total.increment(1);
168        let mut bodies = Vec::new();
169
170        let mut total_bytes = 0;
171
172        for hash in request.0 {
173            if let Some(block) = self.client.block_by_hash(hash).unwrap_or_default() {
174                let body = block.into_body();
175                total_bytes += body.length();
176                bodies.push(body);
177
178                if bodies.len() >= MAX_BODIES_SERVE || total_bytes > SOFT_RESPONSE_LIMIT {
179                    break
180                }
181            } else {
182                break
183            }
184        }
185
186        let _ = response.send(Ok(BlockBodies(bodies)));
187    }
188
189    fn on_receipts_request(
190        &self,
191        _peer_id: PeerId,
192        request: GetReceipts,
193        response: oneshot::Sender<RequestResult<Receipts<C::Receipt>>>,
194    ) {
195        self.metrics.eth_receipts_requests_received_total.increment(1);
196
197        let receipts = self.get_receipts_response(request, |receipts_by_block| {
198            receipts_by_block.into_iter().map(ReceiptWithBloom::from).collect::<Vec<_>>()
199        });
200
201        let _ = response.send(Ok(Receipts(receipts)));
202    }
203
204    fn on_receipts69_request(
205        &self,
206        _peer_id: PeerId,
207        request: GetReceipts,
208        response: oneshot::Sender<RequestResult<Receipts69<C::Receipt>>>,
209    ) {
210        self.metrics.eth_receipts_requests_received_total.increment(1);
211
212        let receipts = self.get_receipts_response(request, |receipts_by_block| {
213            // skip bloom filter for eth69
214            receipts_by_block
215        });
216
217        let _ = response.send(Ok(Receipts69(receipts)));
218    }
219
220    #[inline]
221    fn get_receipts_response<T, F>(&self, request: GetReceipts, transform_fn: F) -> Vec<Vec<T>>
222    where
223        F: Fn(Vec<C::Receipt>) -> Vec<T>,
224        T: Encodable,
225    {
226        let mut receipts = Vec::new();
227        let mut total_bytes = 0;
228
229        for hash in request.0 {
230            if let Some(receipts_by_block) =
231                self.client.receipts_by_block(BlockHashOrNumber::Hash(hash)).unwrap_or_default()
232            {
233                let transformed_receipts = transform_fn(receipts_by_block);
234                total_bytes += transformed_receipts.length();
235                receipts.push(transformed_receipts);
236
237                if receipts.len() >= MAX_RECEIPTS_SERVE || total_bytes > SOFT_RESPONSE_LIMIT {
238                    break
239                }
240            } else {
241                break
242            }
243        }
244
245        receipts
246    }
247}
248
249/// An endless future.
250///
251/// This should be spawned or used as part of `tokio::select!`.
252impl<C, N> Future for EthRequestHandler<C, N>
253where
254    N: NetworkPrimitives,
255    C: BlockReader<Block = N::Block, Receipt = N::Receipt>
256        + HeaderProvider<Header = N::BlockHeader>
257        + Unpin,
258{
259    type Output = ();
260
261    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
262        let this = self.get_mut();
263
264        let mut acc = Duration::ZERO;
265        let maybe_more_incoming_requests = metered_poll_nested_stream_with_budget!(
266            acc,
267            "net::eth",
268            "Incoming eth requests stream",
269            DEFAULT_BUDGET_TRY_DRAIN_DOWNLOADERS,
270            this.incoming_requests.poll_next_unpin(cx),
271            |incoming| {
272                match incoming {
273                    IncomingEthRequest::GetBlockHeaders { peer_id, request, response } => {
274                        this.on_headers_request(peer_id, request, response)
275                    }
276                    IncomingEthRequest::GetBlockBodies { peer_id, request, response } => {
277                        this.on_bodies_request(peer_id, request, response)
278                    }
279                    IncomingEthRequest::GetNodeData { .. } => {
280                        this.metrics.eth_node_data_requests_received_total.increment(1);
281                    }
282                    IncomingEthRequest::GetReceipts { peer_id, request, response } => {
283                        this.on_receipts_request(peer_id, request, response)
284                    }
285                    IncomingEthRequest::GetReceipts69 { peer_id, request, response } => {
286                        this.on_receipts69_request(peer_id, request, response)
287                    }
288                }
289            },
290        );
291
292        this.metrics.acc_duration_poll_eth_req_handler.set(acc.as_secs_f64());
293
294        // stream is fully drained and import futures pending
295        if maybe_more_incoming_requests {
296            // make sure we're woken up again
297            cx.waker().wake_by_ref();
298        }
299
300        Poll::Pending
301    }
302}
303
304/// All `eth` request related to blocks delegated by the network.
305#[derive(Debug)]
306pub enum IncomingEthRequest<N: NetworkPrimitives = EthNetworkPrimitives> {
307    /// Request Block headers from the peer.
308    ///
309    /// The response should be sent through the channel.
310    GetBlockHeaders {
311        /// The ID of the peer to request block headers from.
312        peer_id: PeerId,
313        /// The specific block headers requested.
314        request: GetBlockHeaders,
315        /// The channel sender for the response containing block headers.
316        response: oneshot::Sender<RequestResult<BlockHeaders<N::BlockHeader>>>,
317    },
318    /// Request Block bodies from the peer.
319    ///
320    /// The response should be sent through the channel.
321    GetBlockBodies {
322        /// The ID of the peer to request block bodies from.
323        peer_id: PeerId,
324        /// The specific block bodies requested.
325        request: GetBlockBodies,
326        /// The channel sender for the response containing block bodies.
327        response: oneshot::Sender<RequestResult<BlockBodies<N::BlockBody>>>,
328    },
329    /// Request Node Data from the peer.
330    ///
331    /// The response should be sent through the channel.
332    GetNodeData {
333        /// The ID of the peer to request node data from.
334        peer_id: PeerId,
335        /// The specific node data requested.
336        request: GetNodeData,
337        /// The channel sender for the response containing node data.
338        response: oneshot::Sender<RequestResult<NodeData>>,
339    },
340    /// Request Receipts from the peer.
341    ///
342    /// The response should be sent through the channel.
343    GetReceipts {
344        /// The ID of the peer to request receipts from.
345        peer_id: PeerId,
346        /// The specific receipts requested.
347        request: GetReceipts,
348        /// The channel sender for the response containing receipts.
349        response: oneshot::Sender<RequestResult<Receipts<N::Receipt>>>,
350    },
351    /// Request Receipts from the peer without bloom filter.
352    ///
353    /// The response should be sent through the channel.
354    GetReceipts69 {
355        /// The ID of the peer to request receipts from.
356        peer_id: PeerId,
357        /// The specific receipts requested.
358        request: GetReceipts,
359        /// The channel sender for the response containing Receipts69.
360        response: oneshot::Sender<RequestResult<Receipts69<N::Receipt>>>,
361    },
362}