reth_network/
eth_requests.rs

1//! Blocks/Headers management for the p2p network.
2
3use crate::{
4    budget::DEFAULT_BUDGET_TRY_DRAIN_DOWNLOADERS, metered_poll_nested_stream_with_budget,
5    metrics::EthRequestHandlerMetrics,
6};
7use alloy_consensus::{BlockHeader, ReceiptWithBloom};
8use alloy_eips::BlockHashOrNumber;
9use alloy_rlp::Encodable;
10use futures::StreamExt;
11use reth_eth_wire::{
12    BlockBodies, BlockHeaders, EthNetworkPrimitives, GetBlockBodies, GetBlockHeaders, GetNodeData,
13    GetReceipts, HeadersDirection, NetworkPrimitives, NodeData, Receipts,
14};
15use reth_network_api::test_utils::PeersHandle;
16use reth_network_p2p::error::RequestResult;
17use reth_network_peers::PeerId;
18use reth_primitives_traits::Block;
19use reth_storage_api::{BlockReader, HeaderProvider};
20use std::{
21    future::Future,
22    pin::Pin,
23    task::{Context, Poll},
24    time::Duration,
25};
26use tokio::sync::{mpsc::Receiver, oneshot};
27use tokio_stream::wrappers::ReceiverStream;
28
29// Limits: <https://github.com/ethereum/go-ethereum/blob/b0d44338bbcefee044f1f635a84487cbbd8f0538/eth/protocols/eth/handler.go#L34-L56>
30
/// Maximum number of per-block receipt lists to serve in a single response.
///
/// Used to limit lookups.
pub const MAX_RECEIPTS_SERVE: usize = 1024;

/// Maximum number of block headers to serve in a single response.
///
/// Used to limit lookups.
pub const MAX_HEADERS_SERVE: usize = 1024;

/// Maximum number of block bodies to serve in a single response.
///
/// Used to limit lookups. With 24KB block sizes nowadays, the practical limit will always be
/// [`SOFT_RESPONSE_LIMIT`].
pub const MAX_BODIES_SERVE: usize = 1024;

/// Maximum size of replies to data retrievals: 2MB
pub const SOFT_RESPONSE_LIMIT: usize = 2 * 1024 * 1024;
49
50/// Manages eth related requests on top of the p2p network.
51///
52/// This can be spawned to another task and is supposed to be run as background service.
53#[derive(Debug)]
54#[must_use = "Manager does nothing unless polled."]
55pub struct EthRequestHandler<C, N: NetworkPrimitives = EthNetworkPrimitives> {
56    /// The client type that can interact with the chain.
57    client: C,
58    /// Used for reporting peers.
59    // TODO use to report spammers
60    #[allow(dead_code)]
61    peers: PeersHandle,
62    /// Incoming request from the [`NetworkManager`](crate::NetworkManager).
63    incoming_requests: ReceiverStream<IncomingEthRequest<N>>,
64    /// Metrics for the eth request handler.
65    metrics: EthRequestHandlerMetrics,
66}
67
68// === impl EthRequestHandler ===
69impl<C, N: NetworkPrimitives> EthRequestHandler<C, N> {
70    /// Create a new instance
71    pub fn new(client: C, peers: PeersHandle, incoming: Receiver<IncomingEthRequest<N>>) -> Self {
72        Self {
73            client,
74            peers,
75            incoming_requests: ReceiverStream::new(incoming),
76            metrics: Default::default(),
77        }
78    }
79}
80
81impl<C, N> EthRequestHandler<C, N>
82where
83    N: NetworkPrimitives,
84    C: BlockReader,
85{
86    /// Returns the list of requested headers
87    fn get_headers_response(&self, request: GetBlockHeaders) -> Vec<C::Header> {
88        let GetBlockHeaders { start_block, limit, skip, direction } = request;
89
90        let mut headers = Vec::new();
91
92        let mut block: BlockHashOrNumber = match start_block {
93            BlockHashOrNumber::Hash(start) => start.into(),
94            BlockHashOrNumber::Number(num) => {
95                let Some(hash) = self.client.block_hash(num).unwrap_or_default() else {
96                    return headers
97                };
98                hash.into()
99            }
100        };
101
102        let skip = skip as u64;
103        let mut total_bytes = 0;
104
105        for _ in 0..limit {
106            if let Some(header) = self.client.header_by_hash_or_number(block).unwrap_or_default() {
107                match direction {
108                    HeadersDirection::Rising => {
109                        if let Some(next) = (header.number() + 1).checked_add(skip) {
110                            block = next.into()
111                        } else {
112                            break
113                        }
114                    }
115                    HeadersDirection::Falling => {
116                        if skip > 0 {
117                            // prevent under flows for block.number == 0 and `block.number - skip <
118                            // 0`
119                            if let Some(next) =
120                                header.number().checked_sub(1).and_then(|num| num.checked_sub(skip))
121                            {
122                                block = next.into()
123                            } else {
124                                break
125                            }
126                        } else {
127                            block = header.parent_hash().into()
128                        }
129                    }
130                }
131
132                total_bytes += header.length();
133                headers.push(header);
134
135                if headers.len() >= MAX_HEADERS_SERVE || total_bytes > SOFT_RESPONSE_LIMIT {
136                    break
137                }
138            } else {
139                break
140            }
141        }
142
143        headers
144    }
145
146    fn on_headers_request(
147        &self,
148        _peer_id: PeerId,
149        request: GetBlockHeaders,
150        response: oneshot::Sender<RequestResult<BlockHeaders<C::Header>>>,
151    ) {
152        self.metrics.eth_headers_requests_received_total.increment(1);
153        let headers = self.get_headers_response(request);
154        let _ = response.send(Ok(BlockHeaders(headers)));
155    }
156
157    fn on_bodies_request(
158        &self,
159        _peer_id: PeerId,
160        request: GetBlockBodies,
161        response: oneshot::Sender<RequestResult<BlockBodies<<C::Block as Block>::Body>>>,
162    ) {
163        self.metrics.eth_bodies_requests_received_total.increment(1);
164        let mut bodies = Vec::new();
165
166        let mut total_bytes = 0;
167
168        for hash in request.0 {
169            if let Some(block) = self.client.block_by_hash(hash).unwrap_or_default() {
170                let body = block.into_body();
171                total_bytes += body.length();
172                bodies.push(body);
173
174                if bodies.len() >= MAX_BODIES_SERVE || total_bytes > SOFT_RESPONSE_LIMIT {
175                    break
176                }
177            } else {
178                break
179            }
180        }
181
182        let _ = response.send(Ok(BlockBodies(bodies)));
183    }
184
185    fn on_receipts_request(
186        &self,
187        _peer_id: PeerId,
188        request: GetReceipts,
189        response: oneshot::Sender<RequestResult<Receipts<C::Receipt>>>,
190    ) {
191        self.metrics.eth_receipts_requests_received_total.increment(1);
192
193        let mut receipts = Vec::new();
194
195        let mut total_bytes = 0;
196
197        for hash in request.0 {
198            if let Some(receipts_by_block) =
199                self.client.receipts_by_block(BlockHashOrNumber::Hash(hash)).unwrap_or_default()
200            {
201                let receipt =
202                    receipts_by_block.into_iter().map(ReceiptWithBloom::from).collect::<Vec<_>>();
203
204                total_bytes += receipt.length();
205                receipts.push(receipt);
206
207                if receipts.len() >= MAX_RECEIPTS_SERVE || total_bytes > SOFT_RESPONSE_LIMIT {
208                    break
209                }
210            } else {
211                break
212            }
213        }
214
215        let _ = response.send(Ok(Receipts(receipts)));
216    }
217}
218
219/// An endless future.
220///
221/// This should be spawned or used as part of `tokio::select!`.
222impl<C, N> Future for EthRequestHandler<C, N>
223where
224    N: NetworkPrimitives,
225    C: BlockReader<Block = N::Block, Receipt = N::Receipt>
226        + HeaderProvider<Header = N::BlockHeader>
227        + Unpin,
228{
229    type Output = ();
230
231    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
232        let this = self.get_mut();
233
234        let mut acc = Duration::ZERO;
235        let maybe_more_incoming_requests = metered_poll_nested_stream_with_budget!(
236            acc,
237            "net::eth",
238            "Incoming eth requests stream",
239            DEFAULT_BUDGET_TRY_DRAIN_DOWNLOADERS,
240            this.incoming_requests.poll_next_unpin(cx),
241            |incoming| {
242                match incoming {
243                    IncomingEthRequest::GetBlockHeaders { peer_id, request, response } => {
244                        this.on_headers_request(peer_id, request, response)
245                    }
246                    IncomingEthRequest::GetBlockBodies { peer_id, request, response } => {
247                        this.on_bodies_request(peer_id, request, response)
248                    }
249                    IncomingEthRequest::GetNodeData { .. } => {
250                        this.metrics.eth_node_data_requests_received_total.increment(1);
251                    }
252                    IncomingEthRequest::GetReceipts { peer_id, request, response } => {
253                        this.on_receipts_request(peer_id, request, response)
254                    }
255                }
256            },
257        );
258
259        this.metrics.acc_duration_poll_eth_req_handler.set(acc.as_secs_f64());
260
261        // stream is fully drained and import futures pending
262        if maybe_more_incoming_requests {
263            // make sure we're woken up again
264            cx.waker().wake_by_ref();
265        }
266
267        Poll::Pending
268    }
269}
270
271/// All `eth` request related to blocks delegated by the network.
272#[derive(Debug)]
273pub enum IncomingEthRequest<N: NetworkPrimitives = EthNetworkPrimitives> {
274    /// Request Block headers from the peer.
275    ///
276    /// The response should be sent through the channel.
277    GetBlockHeaders {
278        /// The ID of the peer to request block headers from.
279        peer_id: PeerId,
280        /// The specific block headers requested.
281        request: GetBlockHeaders,
282        /// The channel sender for the response containing block headers.
283        response: oneshot::Sender<RequestResult<BlockHeaders<N::BlockHeader>>>,
284    },
285    /// Request Block bodies from the peer.
286    ///
287    /// The response should be sent through the channel.
288    GetBlockBodies {
289        /// The ID of the peer to request block bodies from.
290        peer_id: PeerId,
291        /// The specific block bodies requested.
292        request: GetBlockBodies,
293        /// The channel sender for the response containing block bodies.
294        response: oneshot::Sender<RequestResult<BlockBodies<N::BlockBody>>>,
295    },
296    /// Request Node Data from the peer.
297    ///
298    /// The response should be sent through the channel.
299    GetNodeData {
300        /// The ID of the peer to request node data from.
301        peer_id: PeerId,
302        /// The specific node data requested.
303        request: GetNodeData,
304        /// The channel sender for the response containing node data.
305        response: oneshot::Sender<RequestResult<NodeData>>,
306    },
307    /// Request Receipts from the peer.
308    ///
309    /// The response should be sent through the channel.
310    GetReceipts {
311        /// The ID of the peer to request receipts from.
312        peer_id: PeerId,
313        /// The specific receipts requested.
314        request: GetReceipts,
315        /// The channel sender for the response containing receipts.
316        response: oneshot::Sender<RequestResult<Receipts<N::Receipt>>>,
317    },
318}