1use alloy_consensus::{BlockHeader, ReceiptWithBloom};
7use alloy_primitives::{Bytes, B256};
8use futures::FutureExt;
9use reth_eth_wire::{
10 message::RequestPair, BlockBodies, BlockHeaders, EthMessage, EthNetworkPrimitives,
11 GetBlockBodies, GetBlockHeaders, NetworkPrimitives, NewBlock, NewBlockHashes,
12 NewPooledTransactionHashes, NodeData, PooledTransactions, Receipts, SharedTransactions,
13 Transactions,
14};
15use reth_eth_wire_types::RawCapabilityMessage;
16use reth_network_api::PeerRequest;
17use reth_network_p2p::error::{RequestError, RequestResult};
18use std::{
19 sync::Arc,
20 task::{ready, Context, Poll},
21};
22use tokio::sync::oneshot;
23
24#[derive(Debug, Clone)]
26pub struct NewBlockMessage<B = reth_ethereum_primitives::Block> {
27 pub hash: B256,
29 pub block: Arc<NewBlock<B>>,
31}
32
33impl<B: reth_primitives_traits::Block> NewBlockMessage<B> {
36 pub fn number(&self) -> u64 {
38 self.block.block.header().number()
39 }
40}
41
42#[derive(Debug)]
45pub enum PeerMessage<N: NetworkPrimitives = EthNetworkPrimitives> {
46 NewBlockHashes(NewBlockHashes),
48 NewBlock(NewBlockMessage<N::Block>),
50 ReceivedTransaction(Transactions<N::BroadcastedTransaction>),
52 SendTransactions(SharedTransactions<N::BroadcastedTransaction>),
54 PooledTransactions(NewPooledTransactionHashes),
56 EthRequest(PeerRequest<N>),
58 Other(RawCapabilityMessage),
62}
63
64#[derive(Debug, Clone, PartialEq, Eq)]
66pub enum BlockRequest {
67 GetBlockHeaders(GetBlockHeaders),
71
72 GetBlockBodies(GetBlockBodies),
76}
77
78#[derive(Debug)]
80pub enum PeerResponse<N: NetworkPrimitives = EthNetworkPrimitives> {
81 BlockHeaders {
83 response: oneshot::Receiver<RequestResult<BlockHeaders<N::BlockHeader>>>,
85 },
86 BlockBodies {
88 response: oneshot::Receiver<RequestResult<BlockBodies<N::BlockBody>>>,
90 },
91 PooledTransactions {
93 response: oneshot::Receiver<RequestResult<PooledTransactions<N::PooledTransaction>>>,
95 },
96 NodeData {
98 response: oneshot::Receiver<RequestResult<NodeData>>,
100 },
101 Receipts {
103 response: oneshot::Receiver<RequestResult<Receipts<N::Receipt>>>,
105 },
106}
107
108impl<N: NetworkPrimitives> PeerResponse<N> {
111 pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<PeerResponseResult<N>> {
113 macro_rules! poll_request {
114 ($response:ident, $item:ident, $cx:ident) => {
115 match ready!($response.poll_unpin($cx)) {
116 Ok(res) => PeerResponseResult::$item(res.map(|item| item.0)),
117 Err(err) => PeerResponseResult::$item(Err(err.into())),
118 }
119 };
120 }
121
122 let res = match self {
123 Self::BlockHeaders { response } => {
124 poll_request!(response, BlockHeaders, cx)
125 }
126 Self::BlockBodies { response } => {
127 poll_request!(response, BlockBodies, cx)
128 }
129 Self::PooledTransactions { response } => {
130 poll_request!(response, PooledTransactions, cx)
131 }
132 Self::NodeData { response } => {
133 poll_request!(response, NodeData, cx)
134 }
135 Self::Receipts { response } => {
136 poll_request!(response, Receipts, cx)
137 }
138 };
139 Poll::Ready(res)
140 }
141}
142
143#[derive(Debug)]
145pub enum PeerResponseResult<N: NetworkPrimitives = EthNetworkPrimitives> {
146 BlockHeaders(RequestResult<Vec<N::BlockHeader>>),
148 BlockBodies(RequestResult<Vec<N::BlockBody>>),
150 PooledTransactions(RequestResult<Vec<N::PooledTransaction>>),
152 NodeData(RequestResult<Vec<Bytes>>),
154 Receipts(RequestResult<Vec<Vec<ReceiptWithBloom<N::Receipt>>>>),
156}
157
158impl<N: NetworkPrimitives> PeerResponseResult<N> {
161 pub fn try_into_message(self, id: u64) -> RequestResult<EthMessage<N>> {
163 macro_rules! to_message {
164 ($response:ident, $item:ident, $request_id:ident) => {
165 match $response {
166 Ok(res) => {
167 let request = RequestPair { request_id: $request_id, message: $item(res) };
168 Ok(EthMessage::$item(request))
169 }
170 Err(err) => Err(err),
171 }
172 };
173 }
174 match self {
175 Self::BlockHeaders(resp) => {
176 to_message!(resp, BlockHeaders, id)
177 }
178 Self::BlockBodies(resp) => {
179 to_message!(resp, BlockBodies, id)
180 }
181 Self::PooledTransactions(resp) => {
182 to_message!(resp, PooledTransactions, id)
183 }
184 Self::NodeData(resp) => {
185 to_message!(resp, NodeData, id)
186 }
187 Self::Receipts(resp) => {
188 to_message!(resp, Receipts, id)
189 }
190 }
191 }
192
193 pub fn err(&self) -> Option<&RequestError> {
195 match self {
196 Self::BlockHeaders(res) => res.as_ref().err(),
197 Self::BlockBodies(res) => res.as_ref().err(),
198 Self::PooledTransactions(res) => res.as_ref().err(),
199 Self::NodeData(res) => res.as_ref().err(),
200 Self::Receipts(res) => res.as_ref().err(),
201 }
202 }
203
204 pub fn is_err(&self) -> bool {
206 self.err().is_some()
207 }
208}