1use crate::{
4 budget::DEFAULT_BUDGET_TRY_DRAIN_DOWNLOADERS, metered_poll_nested_stream_with_budget,
5 metrics::EthRequestHandlerMetrics,
6};
7use alloy_consensus::{BlockHeader, ReceiptWithBloom};
8use alloy_eips::BlockHashOrNumber;
9use alloy_rlp::Encodable;
10use futures::StreamExt;
11use reth_eth_wire::{
12 BlockBodies, BlockHeaders, EthNetworkPrimitives, GetBlockBodies, GetBlockHeaders, GetNodeData,
13 GetReceipts, HeadersDirection, NetworkPrimitives, NodeData, Receipts, Receipts69,
14};
15use reth_network_api::test_utils::PeersHandle;
16use reth_network_p2p::error::RequestResult;
17use reth_network_peers::PeerId;
18use reth_primitives_traits::Block;
19use reth_storage_api::{BlockReader, HeaderProvider};
20use std::{
21 future::Future,
22 pin::Pin,
23 task::{Context, Poll},
24 time::Duration,
25};
26use tokio::sync::{mpsc::Receiver, oneshot};
27use tokio_stream::wrappers::ReceiverStream;
28
/// Maximum number of per-block receipt lists returned for a single `GetReceipts` request.
pub const MAX_RECEIPTS_SERVE: usize = 1024;

/// Maximum number of headers returned for a single `GetBlockHeaders` request.
pub const MAX_HEADERS_SERVE: usize = 1024;

/// Maximum number of block bodies returned for a single `GetBlockBodies` request.
pub const MAX_BODIES_SERVE: usize = 1024;

/// Soft cap, in bytes, on the accumulated encoded length of one response (2 MiB).
///
/// "Soft" because the handlers compare against it only after an item has already been appended,
/// so a response can exceed it by the size of its last item.
pub const SOFT_RESPONSE_LIMIT: usize = 2 * 1024 * 1024;
49
50#[derive(Debug)]
54#[must_use = "Manager does nothing unless polled."]
55pub struct EthRequestHandler<C, N: NetworkPrimitives = EthNetworkPrimitives> {
56 client: C,
58 #[expect(dead_code)]
61 peers: PeersHandle,
62 incoming_requests: ReceiverStream<IncomingEthRequest<N>>,
64 metrics: EthRequestHandlerMetrics,
66}
67
68impl<C, N: NetworkPrimitives> EthRequestHandler<C, N> {
70 pub fn new(client: C, peers: PeersHandle, incoming: Receiver<IncomingEthRequest<N>>) -> Self {
72 Self {
73 client,
74 peers,
75 incoming_requests: ReceiverStream::new(incoming),
76 metrics: Default::default(),
77 }
78 }
79}
80
81impl<C, N> EthRequestHandler<C, N>
82where
83 N: NetworkPrimitives,
84 C: BlockReader,
85{
86 fn get_headers_response(&self, request: GetBlockHeaders) -> Vec<C::Header> {
88 let GetBlockHeaders { start_block, limit, skip, direction } = request;
89
90 let mut headers = Vec::new();
91
92 let mut block: BlockHashOrNumber = match start_block {
93 BlockHashOrNumber::Hash(start) => start.into(),
94 BlockHashOrNumber::Number(num) => {
95 let Some(hash) = self.client.block_hash(num).unwrap_or_default() else {
96 return headers
97 };
98 hash.into()
99 }
100 };
101
102 let skip = skip as u64;
103 let mut total_bytes = 0;
104
105 for _ in 0..limit {
106 if let Some(header) = self.client.header_by_hash_or_number(block).unwrap_or_default() {
107 let number = header.number();
108 let parent_hash = header.parent_hash();
109
110 total_bytes += header.length();
111 headers.push(header);
112
113 if headers.len() >= MAX_HEADERS_SERVE || total_bytes > SOFT_RESPONSE_LIMIT {
114 break
115 }
116
117 match direction {
118 HeadersDirection::Rising => {
119 if let Some(next) = number.checked_add(1).and_then(|n| n.checked_add(skip))
120 {
121 block = next.into()
122 } else {
123 break
124 }
125 }
126 HeadersDirection::Falling => {
127 if skip > 0 {
128 if let Some(next) =
131 number.checked_sub(1).and_then(|num| num.checked_sub(skip))
132 {
133 block = next.into()
134 } else {
135 break
136 }
137 } else {
138 block = parent_hash.into()
139 }
140 }
141 }
142 } else {
143 break
144 }
145 }
146
147 headers
148 }
149
150 fn on_headers_request(
151 &self,
152 _peer_id: PeerId,
153 request: GetBlockHeaders,
154 response: oneshot::Sender<RequestResult<BlockHeaders<C::Header>>>,
155 ) {
156 self.metrics.eth_headers_requests_received_total.increment(1);
157 let headers = self.get_headers_response(request);
158 let _ = response.send(Ok(BlockHeaders(headers)));
159 }
160
161 fn on_bodies_request(
162 &self,
163 _peer_id: PeerId,
164 request: GetBlockBodies,
165 response: oneshot::Sender<RequestResult<BlockBodies<<C::Block as Block>::Body>>>,
166 ) {
167 self.metrics.eth_bodies_requests_received_total.increment(1);
168 let mut bodies = Vec::new();
169
170 let mut total_bytes = 0;
171
172 for hash in request.0 {
173 if let Some(block) = self.client.block_by_hash(hash).unwrap_or_default() {
174 let body = block.into_body();
175 total_bytes += body.length();
176 bodies.push(body);
177
178 if bodies.len() >= MAX_BODIES_SERVE || total_bytes > SOFT_RESPONSE_LIMIT {
179 break
180 }
181 } else {
182 break
183 }
184 }
185
186 let _ = response.send(Ok(BlockBodies(bodies)));
187 }
188
189 fn on_receipts_request(
190 &self,
191 _peer_id: PeerId,
192 request: GetReceipts,
193 response: oneshot::Sender<RequestResult<Receipts<C::Receipt>>>,
194 ) {
195 self.metrics.eth_receipts_requests_received_total.increment(1);
196
197 let receipts = self.get_receipts_response(request, |receipts_by_block| {
198 receipts_by_block.into_iter().map(ReceiptWithBloom::from).collect::<Vec<_>>()
199 });
200
201 let _ = response.send(Ok(Receipts(receipts)));
202 }
203
204 fn on_receipts69_request(
205 &self,
206 _peer_id: PeerId,
207 request: GetReceipts,
208 response: oneshot::Sender<RequestResult<Receipts69<C::Receipt>>>,
209 ) {
210 self.metrics.eth_receipts_requests_received_total.increment(1);
211
212 let receipts = self.get_receipts_response(request, |receipts_by_block| {
213 receipts_by_block
215 });
216
217 let _ = response.send(Ok(Receipts69(receipts)));
218 }
219
220 #[inline]
221 fn get_receipts_response<T, F>(&self, request: GetReceipts, transform_fn: F) -> Vec<Vec<T>>
222 where
223 F: Fn(Vec<C::Receipt>) -> Vec<T>,
224 T: Encodable,
225 {
226 let mut receipts = Vec::new();
227 let mut total_bytes = 0;
228
229 for hash in request.0 {
230 if let Some(receipts_by_block) =
231 self.client.receipts_by_block(BlockHashOrNumber::Hash(hash)).unwrap_or_default()
232 {
233 let transformed_receipts = transform_fn(receipts_by_block);
234 total_bytes += transformed_receipts.length();
235 receipts.push(transformed_receipts);
236
237 if receipts.len() >= MAX_RECEIPTS_SERVE || total_bytes > SOFT_RESPONSE_LIMIT {
238 break
239 }
240 } else {
241 break
242 }
243 }
244
245 receipts
246 }
247}
248
249impl<C, N> Future for EthRequestHandler<C, N>
253where
254 N: NetworkPrimitives,
255 C: BlockReader<Block = N::Block, Receipt = N::Receipt>
256 + HeaderProvider<Header = N::BlockHeader>
257 + Unpin,
258{
259 type Output = ();
260
261 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
262 let this = self.get_mut();
263
264 let mut acc = Duration::ZERO;
265 let maybe_more_incoming_requests = metered_poll_nested_stream_with_budget!(
266 acc,
267 "net::eth",
268 "Incoming eth requests stream",
269 DEFAULT_BUDGET_TRY_DRAIN_DOWNLOADERS,
270 this.incoming_requests.poll_next_unpin(cx),
271 |incoming| {
272 match incoming {
273 IncomingEthRequest::GetBlockHeaders { peer_id, request, response } => {
274 this.on_headers_request(peer_id, request, response)
275 }
276 IncomingEthRequest::GetBlockBodies { peer_id, request, response } => {
277 this.on_bodies_request(peer_id, request, response)
278 }
279 IncomingEthRequest::GetNodeData { .. } => {
280 this.metrics.eth_node_data_requests_received_total.increment(1);
281 }
282 IncomingEthRequest::GetReceipts { peer_id, request, response } => {
283 this.on_receipts_request(peer_id, request, response)
284 }
285 IncomingEthRequest::GetReceipts69 { peer_id, request, response } => {
286 this.on_receipts69_request(peer_id, request, response)
287 }
288 }
289 },
290 );
291
292 this.metrics.acc_duration_poll_eth_req_handler.set(acc.as_secs_f64());
293
294 if maybe_more_incoming_requests {
296 cx.waker().wake_by_ref();
298 }
299
300 Poll::Pending
301 }
302}
303
304#[derive(Debug)]
306pub enum IncomingEthRequest<N: NetworkPrimitives = EthNetworkPrimitives> {
307 GetBlockHeaders {
311 peer_id: PeerId,
313 request: GetBlockHeaders,
315 response: oneshot::Sender<RequestResult<BlockHeaders<N::BlockHeader>>>,
317 },
318 GetBlockBodies {
322 peer_id: PeerId,
324 request: GetBlockBodies,
326 response: oneshot::Sender<RequestResult<BlockBodies<N::BlockBody>>>,
328 },
329 GetNodeData {
333 peer_id: PeerId,
335 request: GetNodeData,
337 response: oneshot::Sender<RequestResult<NodeData>>,
339 },
340 GetReceipts {
344 peer_id: PeerId,
346 request: GetReceipts,
348 response: oneshot::Sender<RequestResult<Receipts<N::Receipt>>>,
350 },
351 GetReceipts69 {
355 peer_id: PeerId,
357 request: GetReceipts,
359 response: oneshot::Sender<RequestResult<Receipts69<N::Receipt>>>,
361 },
362}