1use crate::types::{BlockAccessLists, Receipts69, Receipts70};
7use alloy_consensus::{BlockHeader, ReceiptWithBloom};
8use alloy_primitives::{Bytes, B256};
9use futures::FutureExt;
10use reth_eth_wire::{
11 message::RequestPair, BlockBodies, BlockHeaders, BlockRangeUpdate, Cells, EthMessage,
12 EthNetworkPrimitives, GetBlockAccessLists, GetBlockBodies, GetBlockHeaders, GetReceipts,
13 NetworkPrimitives, NewBlock, NewBlockHashes, NewBlockPayload, NewPooledTransactionHashes,
14 NodeData, PooledTransactions, Receipts, SharedTransactions, Transactions,
15};
16use reth_eth_wire_types::RawCapabilityMessage;
17use reth_network_api::PeerRequest;
18use reth_network_p2p::error::{RequestError, RequestResult};
19use reth_primitives_traits::Block;
20use std::{
21 sync::Arc,
22 task::{ready, Context, Poll},
23};
24use tokio::sync::oneshot;
25
/// A new-block announcement bundled with a block hash.
///
/// `P` is the announcement payload type; it defaults to [`NewBlock`] over the Ethereum block
/// type.
#[derive(Debug, Clone)]
pub struct NewBlockMessage<P = NewBlock<reth_ethereum_primitives::Block>> {
    /// Hash carried alongside the payload.
    ///
    /// NOTE(review): presumably the hash of the announced block; this type neither computes
    /// nor verifies it.
    pub hash: B256,
    /// The announcement payload, held behind an [`Arc`] so that clones of this message share
    /// the same allocation.
    pub block: Arc<P>,
}
34
35impl<P: NewBlockPayload> NewBlockMessage<P> {
38 pub fn number(&self) -> u64 {
40 self.block.block().header().number()
41 }
42}
43
/// Messages exchanged with a peer session, grouped by kind.
///
/// The variants for which [`PeerMessage::is_broadcast`] returns `true` are
/// `NewBlockHashes`, `NewBlock`, `SendTransactions` and `PooledTransactions`.
#[derive(Debug)]
pub enum PeerMessage<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// Announcement of new block hashes.
    NewBlockHashes(NewBlockHashes),
    /// Announcement of a full new block, together with its hash.
    NewBlock(NewBlockMessage<N::NewBlockPayload>),
    /// A batch of broadcast-type transactions.
    ///
    /// NOTE(review): judging by the name, these were received from the peer — confirm against
    /// the session code.
    ReceivedTransaction(Transactions<N::BroadcastedTransaction>),
    /// A batch of shared transactions; classified as a broadcast by `is_broadcast`.
    ///
    /// NOTE(review): judging by the name, these are sent from the local node to the peer.
    SendTransactions(SharedTransactions<N::BroadcastedTransaction>),
    /// Announcement of pooled transaction hashes.
    PooledTransactions(NewPooledTransactionHashes),
    /// A request directed at the peer, wrapping any [`PeerRequest`] variant.
    EthRequest(PeerRequest<N>),
    /// Carries a [`BlockRangeUpdate`].
    BlockRangeUpdated(BlockRangeUpdate),
    /// Any other message, kept in raw capability-message form.
    ///
    /// NOTE(review): nothing in this type validates the raw payload; callers presumably must
    /// ensure it is a well-formed capability message.
    Other(RawCapabilityMessage),
}
67
68impl<N: NetworkPrimitives> PeerMessage<N> {
69 pub const fn message_kind(&self) -> &'static str {
71 match self {
72 Self::NewBlockHashes(_) => "NewBlockHashes",
73 Self::NewBlock(_) => "NewBlock",
74 Self::ReceivedTransaction(_) => "ReceivedTransaction",
75 Self::SendTransactions(_) => "SendTransactions",
76 Self::PooledTransactions(_) => "PooledTransactions",
77 Self::EthRequest(_) => "EthRequest",
78 Self::BlockRangeUpdated(_) => "BlockRangeUpdated",
79 Self::Other(_) => "Other",
80 }
81 }
82
83 pub const fn is_broadcast(&self) -> bool {
86 matches!(
87 self,
88 Self::NewBlockHashes(_) |
89 Self::NewBlock(_) |
90 Self::SendTransactions(_) |
91 Self::PooledTransactions(_)
92 )
93 }
94
95 pub fn message_item_count(&self) -> usize {
97 match self {
98 Self::NewBlockHashes(msg) => msg.len(),
99 Self::ReceivedTransaction(msg) => msg.len(),
100 Self::SendTransactions(msg) => msg.len(),
101 Self::PooledTransactions(msg) => msg.len(),
102 Self::NewBlock(_) |
103 Self::EthRequest(_) |
104 Self::BlockRangeUpdated(_) |
105 Self::Other(_) => 1,
106 }
107 }
108}
109
/// Request variants that target block-related data.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BlockRequest {
    /// Request for block headers, carrying a [`GetBlockHeaders`] message.
    GetBlockHeaders(GetBlockHeaders),

    /// Request for block bodies, carrying a [`GetBlockBodies`] message.
    GetBlockBodies(GetBlockBodies),
    /// Request for block access lists, carrying a [`GetBlockAccessLists`] message.
    GetBlockAccessLists(GetBlockAccessLists),

    /// Request for receipts, carrying a [`GetReceipts`] message.
    GetReceipts(GetReceipts),
}
132
/// A pending response to a peer request.
///
/// Each variant holds the receiving half of a oneshot channel on which the response message
/// (or a request error) is delivered. [`PeerResponse::poll`] drives the channel and turns the
/// outcome into a [`PeerResponseResult`].
#[derive(Debug)]
pub enum PeerResponse<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// Pending response carrying block headers.
    BlockHeaders {
        /// Channel on which the [`BlockHeaders`] message is delivered.
        response: oneshot::Receiver<RequestResult<BlockHeaders<N::BlockHeader>>>,
    },
    /// Pending response carrying block bodies.
    BlockBodies {
        /// Channel on which the [`BlockBodies`] message is delivered.
        response: oneshot::Receiver<RequestResult<BlockBodies<N::BlockBody>>>,
    },
    /// Pending response carrying pooled transactions.
    PooledTransactions {
        /// Channel on which the [`PooledTransactions`] message is delivered.
        response: oneshot::Receiver<RequestResult<PooledTransactions<N::PooledTransaction>>>,
    },
    /// Pending response carrying node data.
    NodeData {
        /// Channel on which the [`NodeData`] message is delivered.
        response: oneshot::Receiver<RequestResult<NodeData>>,
    },
    /// Pending response carrying receipts in the [`Receipts`] message form.
    Receipts {
        /// Channel on which the [`Receipts`] message is delivered.
        response: oneshot::Receiver<RequestResult<Receipts<N::Receipt>>>,
    },
    /// Pending response carrying receipts in the [`Receipts69`] message form.
    ///
    /// NOTE(review): the name suggests the eth/69 protocol version — confirm.
    Receipts69 {
        /// Channel on which the [`Receipts69`] message is delivered.
        response: oneshot::Receiver<RequestResult<Receipts69<N::Receipt>>>,
    },
    /// Pending response carrying receipts in the [`Receipts70`] message form.
    ///
    /// NOTE(review): the name suggests the eth/70 protocol version — confirm.
    Receipts70 {
        /// Channel on which the [`Receipts70`] message is delivered.
        response: oneshot::Receiver<RequestResult<Receipts70<N::Receipt>>>,
    },
    /// Pending response carrying block access lists.
    BlockAccessLists {
        /// Channel on which the [`BlockAccessLists`] message is delivered.
        response: oneshot::Receiver<RequestResult<BlockAccessLists>>,
    },
    /// Pending response carrying cells.
    Cells {
        /// Channel on which the [`Cells`] message is delivered.
        response: oneshot::Receiver<RequestResult<Cells>>,
    },
}
187
188impl<N: NetworkPrimitives> PeerResponse<N> {
191 pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<PeerResponseResult<N>> {
193 macro_rules! poll_request {
194 ($response:ident, $item:ident, $cx:ident) => {
195 match ready!($response.poll_unpin($cx)) {
196 Ok(res) => PeerResponseResult::$item(res.map(|item| item.0)),
197 Err(err) => PeerResponseResult::$item(Err(err.into())),
198 }
199 };
200 }
201
202 let res = match self {
203 Self::BlockHeaders { response } => {
204 poll_request!(response, BlockHeaders, cx)
205 }
206 Self::BlockBodies { response } => {
207 poll_request!(response, BlockBodies, cx)
208 }
209 Self::PooledTransactions { response } => {
210 poll_request!(response, PooledTransactions, cx)
211 }
212 Self::NodeData { response } => {
213 poll_request!(response, NodeData, cx)
214 }
215 Self::Receipts { response } => {
216 poll_request!(response, Receipts, cx)
217 }
218 Self::Receipts69 { response } => {
219 poll_request!(response, Receipts69, cx)
220 }
221 Self::Receipts70 { response } => match ready!(response.poll_unpin(cx)) {
222 Ok(res) => PeerResponseResult::Receipts70(res),
223 Err(err) => PeerResponseResult::Receipts70(Err(err.into())),
224 },
225 Self::BlockAccessLists { response } => match ready!(response.poll_unpin(cx)) {
226 Ok(res) => PeerResponseResult::BlockAccessLists(res),
227 Err(err) => PeerResponseResult::BlockAccessLists(Err(err.into())),
228 },
229 Self::Cells { response } => match ready!(response.poll_unpin(cx)) {
230 Ok(res) => PeerResponseResult::Cells(res),
231 Err(err) => PeerResponseResult::Cells(Err(err.into())),
232 },
233 };
234 Poll::Ready(res)
235 }
236}
237
/// The resolved outcome of a [`PeerResponse`].
///
/// Each variant holds either the response payload or the request error. For most variants the
/// payload is the inner value of the corresponding wire message; `Receipts70`,
/// `BlockAccessLists` and `Cells` keep the message type itself.
#[derive(Debug)]
pub enum PeerResponseResult<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// Outcome of a block headers request: the returned headers.
    BlockHeaders(RequestResult<Vec<N::BlockHeader>>),
    /// Outcome of a block bodies request: the returned bodies.
    BlockBodies(RequestResult<Vec<N::BlockBody>>),
    /// Outcome of a pooled transactions request: the returned transactions.
    PooledTransactions(RequestResult<Vec<N::PooledTransaction>>),
    /// Outcome of a node data request: the returned byte blobs.
    NodeData(RequestResult<Vec<Bytes>>),
    /// Outcome of a receipts request: nested lists of receipts, each paired with its bloom.
    Receipts(RequestResult<Vec<Vec<ReceiptWithBloom<N::Receipt>>>>),
    /// Outcome of a receipts request in the `Receipts69` form: nested lists of receipts
    /// without the bloom wrapper.
    Receipts69(RequestResult<Vec<Vec<N::Receipt>>>),
    /// Outcome of a receipts request in the `Receipts70` form, kept as the full message.
    Receipts70(RequestResult<Receipts70<N::Receipt>>),
    /// Outcome of a block access lists request, kept as the full message.
    BlockAccessLists(RequestResult<BlockAccessLists>),
    /// Outcome of a cells request, kept as the full message.
    Cells(RequestResult<Cells>),
}
260
261impl<N: NetworkPrimitives> PeerResponseResult<N> {
264 pub fn try_into_message(self, id: u64) -> RequestResult<EthMessage<N>> {
266 macro_rules! to_message {
267 ($response:ident, $item:ident, $request_id:ident) => {
268 match $response {
269 Ok(res) => {
270 let request = RequestPair { request_id: $request_id, message: $item(res) };
271 Ok(EthMessage::$item(request))
272 }
273 Err(err) => Err(err),
274 }
275 };
276 }
277 match self {
278 Self::BlockHeaders(resp) => {
279 to_message!(resp, BlockHeaders, id)
280 }
281 Self::BlockBodies(resp) => {
282 to_message!(resp, BlockBodies, id)
283 }
284 Self::PooledTransactions(resp) => {
285 to_message!(resp, PooledTransactions, id)
286 }
287 Self::NodeData(resp) => {
288 to_message!(resp, NodeData, id)
289 }
290 Self::Receipts(resp) => {
291 to_message!(resp, Receipts, id)
292 }
293 Self::Receipts69(resp) => {
294 to_message!(resp, Receipts69, id)
295 }
296 Self::Receipts70(resp) => match resp {
297 Ok(res) => {
298 let request = RequestPair { request_id: id, message: res };
299 Ok(EthMessage::Receipts70(request))
300 }
301 Err(err) => Err(err),
302 },
303 Self::BlockAccessLists(resp) => match resp {
304 Ok(res) => {
305 let request = RequestPair { request_id: id, message: res };
306 Ok(EthMessage::BlockAccessLists(request))
307 }
308 Err(err) => Err(err),
309 },
310 Self::Cells(resp) => match resp {
311 Ok(res) => {
312 let request = RequestPair { request_id: id, message: res };
313 Ok(EthMessage::Cells(request))
314 }
315 Err(err) => Err(err),
316 },
317 }
318 }
319
320 pub fn err(&self) -> Option<&RequestError> {
322 match self {
323 Self::BlockHeaders(res) => res.as_ref().err(),
324 Self::BlockBodies(res) => res.as_ref().err(),
325 Self::PooledTransactions(res) => res.as_ref().err(),
326 Self::NodeData(res) => res.as_ref().err(),
327 Self::Receipts(res) => res.as_ref().err(),
328 Self::Receipts69(res) => res.as_ref().err(),
329 Self::Receipts70(res) => res.as_ref().err(),
330 Self::BlockAccessLists(res) => res.as_ref().err(),
331 Self::Cells(res) => res.as_ref().err(),
332 }
333 }
334
335 pub fn is_err(&self) -> bool {
337 self.err().is_some()
338 }
339}