1use crate::types::{BlockAccessLists, Receipts69, Receipts70};
7use alloy_consensus::{BlockHeader, ReceiptWithBloom};
8use alloy_primitives::{Bytes, B256};
9use futures::FutureExt;
10use reth_eth_wire::{
11 message::RequestPair, BlockBodies, BlockHeaders, BlockRangeUpdate, EthMessage,
12 EthNetworkPrimitives, GetBlockBodies, GetBlockHeaders, GetReceipts, NetworkPrimitives,
13 NewBlock, NewBlockHashes, NewBlockPayload, NewPooledTransactionHashes, NodeData,
14 PooledTransactions, Receipts, SharedTransactions, Transactions,
15};
16use reth_eth_wire_types::RawCapabilityMessage;
17use reth_network_api::PeerRequest;
18use reth_network_p2p::error::{RequestError, RequestResult};
19use reth_primitives_traits::Block;
20use std::{
21 sync::Arc,
22 task::{ready, Context, Poll},
23};
24use tokio::sync::oneshot;
25
#[derive(Debug, Clone)]
/// A new-block announcement: a hash paired with a shared block payload.
///
/// The payload type `P` defaults to `NewBlock<reth_ethereum_primitives::Block>`.
pub struct NewBlockMessage<P = NewBlock<reth_ethereum_primitives::Block>> {
    /// Hash carried alongside the payload.
    ///
    /// NOTE(review): presumably the hash of the announced block; confirm against the code that
    /// constructs this message.
    pub hash: B256,
    /// The payload, held behind an [`Arc`] so cloning the message only bumps a reference count.
    pub block: Arc<P>,
}
34
35impl<P: NewBlockPayload> NewBlockMessage<P> {
38 pub fn number(&self) -> u64 {
40 self.block.block().header().number()
41 }
42}
43
#[derive(Debug)]
/// Messages exchanged with a peer, generic over the network primitive types `N`.
///
/// NOTE(review): whether each variant is inbound, outbound or both is not visible in this file;
/// confirm against the session code that produces and consumes these messages.
pub enum PeerMessage<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// A [`NewBlockHashes`] announcement.
    NewBlockHashes(NewBlockHashes),
    /// A full new-block announcement, see [`NewBlockMessage`].
    NewBlock(NewBlockMessage<N::NewBlockPayload>),
    /// Broadcast transactions; going by the variant name, ones received from the peer.
    ReceivedTransaction(Transactions<N::BroadcastedTransaction>),
    /// Shared transactions; going by the variant name, ones to be sent to the peer.
    SendTransactions(SharedTransactions<N::BroadcastedTransaction>),
    /// A [`NewPooledTransactionHashes`] announcement.
    PooledTransactions(NewPooledTransactionHashes),
    /// An `eth` request, carried as a [`PeerRequest`].
    EthRequest(PeerRequest<N>),
    /// A [`BlockRangeUpdate`] message.
    BlockRangeUpdated(BlockRangeUpdate),
    /// A raw capability message, carried as a [`RawCapabilityMessage`].
    Other(RawCapabilityMessage),
}
67
#[derive(Debug, Clone, PartialEq, Eq)]
/// The block-related request kinds: headers, bodies and receipts.
pub enum BlockRequest {
    /// A request for block headers, carrying the [`GetBlockHeaders`] message.
    GetBlockHeaders(GetBlockHeaders),

    /// A request for block bodies, carrying the [`GetBlockBodies`] message.
    GetBlockBodies(GetBlockBodies),

    /// A request for receipts, carrying the [`GetReceipts`] message.
    GetReceipts(GetReceipts),
}
86
#[derive(Debug)]
/// A response that has not arrived yet, one variant per response kind.
///
/// Every variant holds the receiving half of a oneshot channel on which the `RequestResult` for
/// that response kind is delivered. Use `PeerResponse::poll` to drive the receiver.
pub enum PeerResponse<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// Pending block headers response.
    BlockHeaders {
        /// Receiver for the [`BlockHeaders`] result.
        response: oneshot::Receiver<RequestResult<BlockHeaders<N::BlockHeader>>>,
    },
    /// Pending block bodies response.
    BlockBodies {
        /// Receiver for the [`BlockBodies`] result.
        response: oneshot::Receiver<RequestResult<BlockBodies<N::BlockBody>>>,
    },
    /// Pending pooled transactions response.
    PooledTransactions {
        /// Receiver for the [`PooledTransactions`] result.
        response: oneshot::Receiver<RequestResult<PooledTransactions<N::PooledTransaction>>>,
    },
    /// Pending node data response.
    NodeData {
        /// Receiver for the [`NodeData`] result.
        response: oneshot::Receiver<RequestResult<NodeData>>,
    },
    /// Pending receipts response.
    Receipts {
        /// Receiver for the [`Receipts`] result.
        response: oneshot::Receiver<RequestResult<Receipts<N::Receipt>>>,
    },
    /// Pending receipts response using the [`Receipts69`] message type.
    Receipts69 {
        /// Receiver for the [`Receipts69`] result.
        response: oneshot::Receiver<RequestResult<Receipts69<N::Receipt>>>,
    },
    /// Pending receipts response using the [`Receipts70`] message type.
    Receipts70 {
        /// Receiver for the [`Receipts70`] result.
        response: oneshot::Receiver<RequestResult<Receipts70<N::Receipt>>>,
    },
    /// Pending block access lists response.
    BlockAccessLists {
        /// Receiver for the [`BlockAccessLists`] result.
        response: oneshot::Receiver<RequestResult<BlockAccessLists>>,
    },
}
135
136impl<N: NetworkPrimitives> PeerResponse<N> {
139 pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<PeerResponseResult<N>> {
141 macro_rules! poll_request {
142 ($response:ident, $item:ident, $cx:ident) => {
143 match ready!($response.poll_unpin($cx)) {
144 Ok(res) => PeerResponseResult::$item(res.map(|item| item.0)),
145 Err(err) => PeerResponseResult::$item(Err(err.into())),
146 }
147 };
148 }
149
150 let res = match self {
151 Self::BlockHeaders { response } => {
152 poll_request!(response, BlockHeaders, cx)
153 }
154 Self::BlockBodies { response } => {
155 poll_request!(response, BlockBodies, cx)
156 }
157 Self::PooledTransactions { response } => {
158 poll_request!(response, PooledTransactions, cx)
159 }
160 Self::NodeData { response } => {
161 poll_request!(response, NodeData, cx)
162 }
163 Self::Receipts { response } => {
164 poll_request!(response, Receipts, cx)
165 }
166 Self::Receipts69 { response } => {
167 poll_request!(response, Receipts69, cx)
168 }
169 Self::Receipts70 { response } => match ready!(response.poll_unpin(cx)) {
170 Ok(res) => PeerResponseResult::Receipts70(res),
171 Err(err) => PeerResponseResult::Receipts70(Err(err.into())),
172 },
173 Self::BlockAccessLists { response } => match ready!(response.poll_unpin(cx)) {
174 Ok(res) => PeerResponseResult::BlockAccessLists(res),
175 Err(err) => PeerResponseResult::BlockAccessLists(Err(err.into())),
176 },
177 };
178 Poll::Ready(res)
179 }
180}
181
#[derive(Debug)]
/// The resolved outcome of a [`PeerResponse`], one variant per response kind.
///
/// Each variant wraps a `RequestResult`, i.e. either the response payload or the request error.
pub enum PeerResponseResult<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// Outcome of a block headers request: the list of headers.
    BlockHeaders(RequestResult<Vec<N::BlockHeader>>),
    /// Outcome of a block bodies request: the list of bodies.
    BlockBodies(RequestResult<Vec<N::BlockBody>>),
    /// Outcome of a pooled transactions request: the list of transactions.
    PooledTransactions(RequestResult<Vec<N::PooledTransaction>>),
    /// Outcome of a node data request: a list of byte blobs.
    NodeData(RequestResult<Vec<Bytes>>),
    /// Outcome of a receipts request: nested lists of receipts wrapped in [`ReceiptWithBloom`].
    Receipts(RequestResult<Vec<Vec<ReceiptWithBloom<N::Receipt>>>>),
    /// Outcome of a receipts request answered with `Receipts69`: nested lists of plain receipts.
    Receipts69(RequestResult<Vec<Vec<N::Receipt>>>),
    /// Outcome of a receipts request answered with [`Receipts70`]; the whole message is kept.
    Receipts70(RequestResult<Receipts70<N::Receipt>>),
    /// Outcome of a block access lists request; the whole message is kept.
    BlockAccessLists(RequestResult<BlockAccessLists>),
}
202
203impl<N: NetworkPrimitives> PeerResponseResult<N> {
206 pub fn try_into_message(self, id: u64) -> RequestResult<EthMessage<N>> {
208 macro_rules! to_message {
209 ($response:ident, $item:ident, $request_id:ident) => {
210 match $response {
211 Ok(res) => {
212 let request = RequestPair { request_id: $request_id, message: $item(res) };
213 Ok(EthMessage::$item(request))
214 }
215 Err(err) => Err(err),
216 }
217 };
218 }
219 match self {
220 Self::BlockHeaders(resp) => {
221 to_message!(resp, BlockHeaders, id)
222 }
223 Self::BlockBodies(resp) => {
224 to_message!(resp, BlockBodies, id)
225 }
226 Self::PooledTransactions(resp) => {
227 to_message!(resp, PooledTransactions, id)
228 }
229 Self::NodeData(resp) => {
230 to_message!(resp, NodeData, id)
231 }
232 Self::Receipts(resp) => {
233 to_message!(resp, Receipts, id)
234 }
235 Self::Receipts69(resp) => {
236 to_message!(resp, Receipts69, id)
237 }
238 Self::Receipts70(resp) => match resp {
239 Ok(res) => {
240 let request = RequestPair { request_id: id, message: res };
241 Ok(EthMessage::Receipts70(request))
242 }
243 Err(err) => Err(err),
244 },
245 Self::BlockAccessLists(resp) => match resp {
246 Ok(res) => {
247 let request = RequestPair { request_id: id, message: res };
248 Ok(EthMessage::BlockAccessLists(request))
249 }
250 Err(err) => Err(err),
251 },
252 }
253 }
254
255 pub fn err(&self) -> Option<&RequestError> {
257 match self {
258 Self::BlockHeaders(res) => res.as_ref().err(),
259 Self::BlockBodies(res) => res.as_ref().err(),
260 Self::PooledTransactions(res) => res.as_ref().err(),
261 Self::NodeData(res) => res.as_ref().err(),
262 Self::Receipts(res) => res.as_ref().err(),
263 Self::Receipts69(res) => res.as_ref().err(),
264 Self::Receipts70(res) => res.as_ref().err(),
265 Self::BlockAccessLists(res) => res.as_ref().err(),
266 }
267 }
268
269 pub fn is_err(&self) -> bool {
271 self.err().is_some()
272 }
273}