1use crate::types::{BlockAccessLists, Receipts69, Receipts70};
7use alloy_consensus::{BlockHeader, ReceiptWithBloom};
8use alloy_primitives::{Bytes, B256};
9use futures::FutureExt;
10use reth_eth_wire::{
11 message::RequestPair, BlockBodies, BlockHeaders, BlockRangeUpdate, EthMessage,
12 EthNetworkPrimitives, GetBlockBodies, GetBlockHeaders, GetReceipts, NetworkPrimitives,
13 NewBlock, NewBlockHashes, NewBlockPayload, NewPooledTransactionHashes, NodeData,
14 PooledTransactions, Receipts, SharedTransactions, Transactions,
15};
16use reth_eth_wire_types::RawCapabilityMessage;
17use reth_network_api::PeerRequest;
18use reth_network_p2p::error::{RequestError, RequestResult};
19use reth_primitives_traits::Block;
20use std::{
21 sync::Arc,
22 task::{ready, Context, Poll},
23};
24use tokio::sync::oneshot;
25
26#[derive(Debug, Clone)]
28pub struct NewBlockMessage<P = NewBlock<reth_ethereum_primitives::Block>> {
29 pub hash: B256,
31 pub block: Arc<P>,
33}
34
35impl<P: NewBlockPayload> NewBlockMessage<P> {
38 pub fn number(&self) -> u64 {
40 self.block.block().header().number()
41 }
42}
43
44#[derive(Debug)]
47pub enum PeerMessage<N: NetworkPrimitives = EthNetworkPrimitives> {
48 NewBlockHashes(NewBlockHashes),
50 NewBlock(NewBlockMessage<N::NewBlockPayload>),
52 ReceivedTransaction(Transactions<N::BroadcastedTransaction>),
54 SendTransactions(SharedTransactions<N::BroadcastedTransaction>),
56 PooledTransactions(NewPooledTransactionHashes),
58 EthRequest(PeerRequest<N>),
60 BlockRangeUpdated(BlockRangeUpdate),
62 Other(RawCapabilityMessage),
66}
67
68impl<N: NetworkPrimitives> PeerMessage<N> {
69 pub const fn message_kind(&self) -> &'static str {
71 match self {
72 Self::NewBlockHashes(_) => "NewBlockHashes",
73 Self::NewBlock(_) => "NewBlock",
74 Self::ReceivedTransaction(_) => "ReceivedTransaction",
75 Self::SendTransactions(_) => "SendTransactions",
76 Self::PooledTransactions(_) => "PooledTransactions",
77 Self::EthRequest(_) => "EthRequest",
78 Self::BlockRangeUpdated(_) => "BlockRangeUpdated",
79 Self::Other(_) => "Other",
80 }
81 }
82
83 pub fn message_item_count(&self) -> usize {
85 match self {
86 Self::NewBlockHashes(msg) => msg.len(),
87 Self::ReceivedTransaction(msg) => msg.len(),
88 Self::SendTransactions(msg) => msg.len(),
89 Self::PooledTransactions(msg) => msg.len(),
90 Self::NewBlock(_) |
91 Self::EthRequest(_) |
92 Self::BlockRangeUpdated(_) |
93 Self::Other(_) => 1,
94 }
95 }
96}
97
98#[derive(Debug, Clone, PartialEq, Eq)]
100pub enum BlockRequest {
101 GetBlockHeaders(GetBlockHeaders),
105
106 GetBlockBodies(GetBlockBodies),
110
111 GetReceipts(GetReceipts),
115}
116
117#[derive(Debug)]
119pub enum PeerResponse<N: NetworkPrimitives = EthNetworkPrimitives> {
120 BlockHeaders {
122 response: oneshot::Receiver<RequestResult<BlockHeaders<N::BlockHeader>>>,
124 },
125 BlockBodies {
127 response: oneshot::Receiver<RequestResult<BlockBodies<N::BlockBody>>>,
129 },
130 PooledTransactions {
132 response: oneshot::Receiver<RequestResult<PooledTransactions<N::PooledTransaction>>>,
134 },
135 NodeData {
137 response: oneshot::Receiver<RequestResult<NodeData>>,
139 },
140 Receipts {
142 response: oneshot::Receiver<RequestResult<Receipts<N::Receipt>>>,
144 },
145 Receipts69 {
151 response: oneshot::Receiver<RequestResult<Receipts69<N::Receipt>>>,
153 },
154 Receipts70 {
156 response: oneshot::Receiver<RequestResult<Receipts70<N::Receipt>>>,
158 },
159 BlockAccessLists {
161 response: oneshot::Receiver<RequestResult<BlockAccessLists>>,
163 },
164}
165
166impl<N: NetworkPrimitives> PeerResponse<N> {
169 pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<PeerResponseResult<N>> {
171 macro_rules! poll_request {
172 ($response:ident, $item:ident, $cx:ident) => {
173 match ready!($response.poll_unpin($cx)) {
174 Ok(res) => PeerResponseResult::$item(res.map(|item| item.0)),
175 Err(err) => PeerResponseResult::$item(Err(err.into())),
176 }
177 };
178 }
179
180 let res = match self {
181 Self::BlockHeaders { response } => {
182 poll_request!(response, BlockHeaders, cx)
183 }
184 Self::BlockBodies { response } => {
185 poll_request!(response, BlockBodies, cx)
186 }
187 Self::PooledTransactions { response } => {
188 poll_request!(response, PooledTransactions, cx)
189 }
190 Self::NodeData { response } => {
191 poll_request!(response, NodeData, cx)
192 }
193 Self::Receipts { response } => {
194 poll_request!(response, Receipts, cx)
195 }
196 Self::Receipts69 { response } => {
197 poll_request!(response, Receipts69, cx)
198 }
199 Self::Receipts70 { response } => match ready!(response.poll_unpin(cx)) {
200 Ok(res) => PeerResponseResult::Receipts70(res),
201 Err(err) => PeerResponseResult::Receipts70(Err(err.into())),
202 },
203 Self::BlockAccessLists { response } => match ready!(response.poll_unpin(cx)) {
204 Ok(res) => PeerResponseResult::BlockAccessLists(res),
205 Err(err) => PeerResponseResult::BlockAccessLists(Err(err.into())),
206 },
207 };
208 Poll::Ready(res)
209 }
210}
211
212#[derive(Debug)]
214pub enum PeerResponseResult<N: NetworkPrimitives = EthNetworkPrimitives> {
215 BlockHeaders(RequestResult<Vec<N::BlockHeader>>),
217 BlockBodies(RequestResult<Vec<N::BlockBody>>),
219 PooledTransactions(RequestResult<Vec<N::PooledTransaction>>),
221 NodeData(RequestResult<Vec<Bytes>>),
223 Receipts(RequestResult<Vec<Vec<ReceiptWithBloom<N::Receipt>>>>),
225 Receipts69(RequestResult<Vec<Vec<N::Receipt>>>),
227 Receipts70(RequestResult<Receipts70<N::Receipt>>),
229 BlockAccessLists(RequestResult<BlockAccessLists>),
231}
232
233impl<N: NetworkPrimitives> PeerResponseResult<N> {
236 pub fn try_into_message(self, id: u64) -> RequestResult<EthMessage<N>> {
238 macro_rules! to_message {
239 ($response:ident, $item:ident, $request_id:ident) => {
240 match $response {
241 Ok(res) => {
242 let request = RequestPair { request_id: $request_id, message: $item(res) };
243 Ok(EthMessage::$item(request))
244 }
245 Err(err) => Err(err),
246 }
247 };
248 }
249 match self {
250 Self::BlockHeaders(resp) => {
251 to_message!(resp, BlockHeaders, id)
252 }
253 Self::BlockBodies(resp) => {
254 to_message!(resp, BlockBodies, id)
255 }
256 Self::PooledTransactions(resp) => {
257 to_message!(resp, PooledTransactions, id)
258 }
259 Self::NodeData(resp) => {
260 to_message!(resp, NodeData, id)
261 }
262 Self::Receipts(resp) => {
263 to_message!(resp, Receipts, id)
264 }
265 Self::Receipts69(resp) => {
266 to_message!(resp, Receipts69, id)
267 }
268 Self::Receipts70(resp) => match resp {
269 Ok(res) => {
270 let request = RequestPair { request_id: id, message: res };
271 Ok(EthMessage::Receipts70(request))
272 }
273 Err(err) => Err(err),
274 },
275 Self::BlockAccessLists(resp) => match resp {
276 Ok(res) => {
277 let request = RequestPair { request_id: id, message: res };
278 Ok(EthMessage::BlockAccessLists(request))
279 }
280 Err(err) => Err(err),
281 },
282 }
283 }
284
285 pub fn err(&self) -> Option<&RequestError> {
287 match self {
288 Self::BlockHeaders(res) => res.as_ref().err(),
289 Self::BlockBodies(res) => res.as_ref().err(),
290 Self::PooledTransactions(res) => res.as_ref().err(),
291 Self::NodeData(res) => res.as_ref().err(),
292 Self::Receipts(res) => res.as_ref().err(),
293 Self::Receipts69(res) => res.as_ref().err(),
294 Self::Receipts70(res) => res.as_ref().err(),
295 Self::BlockAccessLists(res) => res.as_ref().err(),
296 }
297 }
298
299 pub fn is_err(&self) -> bool {
301 self.err().is_some()
302 }
303}