1use crate::types::Receipts69;
7use alloy_consensus::{BlockHeader, ReceiptWithBloom};
8use alloy_primitives::{Bytes, B256};
9use futures::FutureExt;
10use reth_eth_wire::{
11 message::RequestPair, BlockBodies, BlockHeaders, BlockRangeUpdate, EthMessage,
12 EthNetworkPrimitives, GetBlockBodies, GetBlockHeaders, NetworkPrimitives, NewBlock,
13 NewBlockHashes, NewBlockPayload, NewPooledTransactionHashes, NodeData, PooledTransactions,
14 Receipts, SharedTransactions, Transactions,
15};
16use reth_eth_wire_types::RawCapabilityMessage;
17use reth_network_api::PeerRequest;
18use reth_network_p2p::error::{RequestError, RequestResult};
19use reth_primitives_traits::Block;
20use std::{
21 sync::Arc,
22 task::{ready, Context, Poll},
23};
24use tokio::sync::oneshot;
25
26#[derive(Debug, Clone)]
28pub struct NewBlockMessage<P = NewBlock<reth_ethereum_primitives::Block>> {
29 pub hash: B256,
31 pub block: Arc<P>,
33}
34
35impl<P: NewBlockPayload> NewBlockMessage<P> {
38 pub fn number(&self) -> u64 {
40 self.block.block().header().number()
41 }
42}
43
44#[derive(Debug)]
47pub enum PeerMessage<N: NetworkPrimitives = EthNetworkPrimitives> {
48 NewBlockHashes(NewBlockHashes),
50 NewBlock(NewBlockMessage<N::NewBlockPayload>),
52 ReceivedTransaction(Transactions<N::BroadcastedTransaction>),
54 SendTransactions(SharedTransactions<N::BroadcastedTransaction>),
56 PooledTransactions(NewPooledTransactionHashes),
58 EthRequest(PeerRequest<N>),
60 BlockRangeUpdated(BlockRangeUpdate),
62 Other(RawCapabilityMessage),
66}
67
68#[derive(Debug, Clone, PartialEq, Eq)]
70pub enum BlockRequest {
71 GetBlockHeaders(GetBlockHeaders),
75
76 GetBlockBodies(GetBlockBodies),
80}
81
82#[derive(Debug)]
84pub enum PeerResponse<N: NetworkPrimitives = EthNetworkPrimitives> {
85 BlockHeaders {
87 response: oneshot::Receiver<RequestResult<BlockHeaders<N::BlockHeader>>>,
89 },
90 BlockBodies {
92 response: oneshot::Receiver<RequestResult<BlockBodies<N::BlockBody>>>,
94 },
95 PooledTransactions {
97 response: oneshot::Receiver<RequestResult<PooledTransactions<N::PooledTransaction>>>,
99 },
100 NodeData {
102 response: oneshot::Receiver<RequestResult<NodeData>>,
104 },
105 Receipts {
107 response: oneshot::Receiver<RequestResult<Receipts<N::Receipt>>>,
109 },
110 Receipts69 {
116 response: oneshot::Receiver<RequestResult<Receipts69<N::Receipt>>>,
118 },
119}
120
121impl<N: NetworkPrimitives> PeerResponse<N> {
124 pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<PeerResponseResult<N>> {
126 macro_rules! poll_request {
127 ($response:ident, $item:ident, $cx:ident) => {
128 match ready!($response.poll_unpin($cx)) {
129 Ok(res) => PeerResponseResult::$item(res.map(|item| item.0)),
130 Err(err) => PeerResponseResult::$item(Err(err.into())),
131 }
132 };
133 }
134
135 let res = match self {
136 Self::BlockHeaders { response } => {
137 poll_request!(response, BlockHeaders, cx)
138 }
139 Self::BlockBodies { response } => {
140 poll_request!(response, BlockBodies, cx)
141 }
142 Self::PooledTransactions { response } => {
143 poll_request!(response, PooledTransactions, cx)
144 }
145 Self::NodeData { response } => {
146 poll_request!(response, NodeData, cx)
147 }
148 Self::Receipts { response } => {
149 poll_request!(response, Receipts, cx)
150 }
151 Self::Receipts69 { response } => {
152 poll_request!(response, Receipts69, cx)
153 }
154 };
155 Poll::Ready(res)
156 }
157}
158
159#[derive(Debug)]
161pub enum PeerResponseResult<N: NetworkPrimitives = EthNetworkPrimitives> {
162 BlockHeaders(RequestResult<Vec<N::BlockHeader>>),
164 BlockBodies(RequestResult<Vec<N::BlockBody>>),
166 PooledTransactions(RequestResult<Vec<N::PooledTransaction>>),
168 NodeData(RequestResult<Vec<Bytes>>),
170 Receipts(RequestResult<Vec<Vec<ReceiptWithBloom<N::Receipt>>>>),
172 Receipts69(RequestResult<Vec<Vec<N::Receipt>>>),
174}
175
176impl<N: NetworkPrimitives> PeerResponseResult<N> {
179 pub fn try_into_message(self, id: u64) -> RequestResult<EthMessage<N>> {
181 macro_rules! to_message {
182 ($response:ident, $item:ident, $request_id:ident) => {
183 match $response {
184 Ok(res) => {
185 let request = RequestPair { request_id: $request_id, message: $item(res) };
186 Ok(EthMessage::$item(request))
187 }
188 Err(err) => Err(err),
189 }
190 };
191 }
192 match self {
193 Self::BlockHeaders(resp) => {
194 to_message!(resp, BlockHeaders, id)
195 }
196 Self::BlockBodies(resp) => {
197 to_message!(resp, BlockBodies, id)
198 }
199 Self::PooledTransactions(resp) => {
200 to_message!(resp, PooledTransactions, id)
201 }
202 Self::NodeData(resp) => {
203 to_message!(resp, NodeData, id)
204 }
205 Self::Receipts(resp) => {
206 to_message!(resp, Receipts, id)
207 }
208 Self::Receipts69(resp) => {
209 to_message!(resp, Receipts69, id)
210 }
211 }
212 }
213
214 pub fn err(&self) -> Option<&RequestError> {
216 match self {
217 Self::BlockHeaders(res) => res.as_ref().err(),
218 Self::BlockBodies(res) => res.as_ref().err(),
219 Self::PooledTransactions(res) => res.as_ref().err(),
220 Self::NodeData(res) => res.as_ref().err(),
221 Self::Receipts(res) => res.as_ref().err(),
222 Self::Receipts69(res) => res.as_ref().err(),
223 }
224 }
225
226 pub fn is_err(&self) -> bool {
228 self.err().is_some()
229 }
230}