reth_network/
message.rs

1//! Capability messaging
2//!
3//! An `RLPx` stream is multiplexed via the prepended message-id of a framed message.
4//! Capabilities are exchanged via the `RLPx` `Hello` message as pairs of `(id, version)`, <https://github.com/ethereum/devp2p/blob/master/rlpx.md#capability-messaging>
5
6use crate::types::{Receipts69, Receipts70};
7use alloy_consensus::{BlockHeader, ReceiptWithBloom};
8use alloy_primitives::{Bytes, B256};
9use futures::FutureExt;
10use reth_eth_wire::{
11    message::RequestPair, BlockBodies, BlockHeaders, BlockRangeUpdate, EthMessage,
12    EthNetworkPrimitives, GetBlockBodies, GetBlockHeaders, NetworkPrimitives, NewBlock,
13    NewBlockHashes, NewBlockPayload, NewPooledTransactionHashes, NodeData, PooledTransactions,
14    Receipts, SharedTransactions, Transactions,
15};
16use reth_eth_wire_types::RawCapabilityMessage;
17use reth_network_api::PeerRequest;
18use reth_network_p2p::error::{RequestError, RequestResult};
19use reth_primitives_traits::Block;
20use std::{
21    sync::Arc,
22    task::{ready, Context, Poll},
23};
24use tokio::sync::oneshot;
25
26/// Internal form of a `NewBlock` message
27#[derive(Debug, Clone)]
28pub struct NewBlockMessage<P = NewBlock<reth_ethereum_primitives::Block>> {
29    /// Hash of the block
30    pub hash: B256,
31    /// Raw received message
32    pub block: Arc<P>,
33}
34
35// === impl NewBlockMessage ===
36
37impl<P: NewBlockPayload> NewBlockMessage<P> {
38    /// Returns the block number of the block
39    pub fn number(&self) -> u64 {
40        self.block.block().header().number()
41    }
42}
43
44/// All Bi-directional eth-message variants that can be sent to a session or received from a
45/// session.
46#[derive(Debug)]
47pub enum PeerMessage<N: NetworkPrimitives = EthNetworkPrimitives> {
48    /// Announce new block hashes
49    NewBlockHashes(NewBlockHashes),
50    /// Broadcast new block.
51    NewBlock(NewBlockMessage<N::NewBlockPayload>),
52    /// Received transactions _from_ the peer
53    ReceivedTransaction(Transactions<N::BroadcastedTransaction>),
54    /// Broadcast transactions _from_ local _to_ a peer.
55    SendTransactions(SharedTransactions<N::BroadcastedTransaction>),
56    /// Send new pooled transactions
57    PooledTransactions(NewPooledTransactionHashes),
58    /// All `eth` request variants.
59    EthRequest(PeerRequest<N>),
60    /// Announces when `BlockRange` is updated.
61    BlockRangeUpdated(BlockRangeUpdate),
62    /// Any other or manually crafted eth message.
63    ///
64    /// Caution: It is expected that this is a valid `eth_` capability message.
65    Other(RawCapabilityMessage),
66}
67
68/// Request Variants that only target block related data.
69#[derive(Debug, Clone, PartialEq, Eq)]
70pub enum BlockRequest {
71    /// Requests block headers from the peer.
72    ///
73    /// The response should be sent through the channel.
74    GetBlockHeaders(GetBlockHeaders),
75
76    /// Requests block bodies from the peer.
77    ///
78    /// The response should be sent through the channel.
79    GetBlockBodies(GetBlockBodies),
80}
81
82/// Corresponding variant for [`PeerRequest`].
83#[derive(Debug)]
84pub enum PeerResponse<N: NetworkPrimitives = EthNetworkPrimitives> {
85    /// Represents a response to a request for block headers.
86    BlockHeaders {
87        /// The receiver channel for the response to a block headers request.
88        response: oneshot::Receiver<RequestResult<BlockHeaders<N::BlockHeader>>>,
89    },
90    /// Represents a response to a request for block bodies.
91    BlockBodies {
92        /// The receiver channel for the response to a block bodies request.
93        response: oneshot::Receiver<RequestResult<BlockBodies<N::BlockBody>>>,
94    },
95    /// Represents a response to a request for pooled transactions.
96    PooledTransactions {
97        /// The receiver channel for the response to a pooled transactions request.
98        response: oneshot::Receiver<RequestResult<PooledTransactions<N::PooledTransaction>>>,
99    },
100    /// Represents a response to a request for `NodeData`.
101    NodeData {
102        /// The receiver channel for the response to a `NodeData` request.
103        response: oneshot::Receiver<RequestResult<NodeData>>,
104    },
105    /// Represents a response to a request for receipts.
106    Receipts {
107        /// The receiver channel for the response to a receipts request.
108        response: oneshot::Receiver<RequestResult<Receipts<N::Receipt>>>,
109    },
110    /// Represents a response to a request for receipts.
111    ///
112    /// This is a variant of `Receipts` that was introduced in `eth/69`.
113    /// The difference is that this variant does not require the inclusion of bloom filters in the
114    /// response, making it more lightweight.
115    Receipts69 {
116        /// The receiver channel for the response to a receipts request.
117        response: oneshot::Receiver<RequestResult<Receipts69<N::Receipt>>>,
118    },
119    /// Represents a response to a request for receipts using eth/70.
120    Receipts70 {
121        /// The receiver channel for the response to a receipts request.
122        response: oneshot::Receiver<RequestResult<Receipts70<N::Receipt>>>,
123    },
124}
125
126// === impl PeerResponse ===
127
128impl<N: NetworkPrimitives> PeerResponse<N> {
129    /// Polls the type to completion.
130    pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<PeerResponseResult<N>> {
131        macro_rules! poll_request {
132            ($response:ident, $item:ident, $cx:ident) => {
133                match ready!($response.poll_unpin($cx)) {
134                    Ok(res) => PeerResponseResult::$item(res.map(|item| item.0)),
135                    Err(err) => PeerResponseResult::$item(Err(err.into())),
136                }
137            };
138        }
139
140        let res = match self {
141            Self::BlockHeaders { response } => {
142                poll_request!(response, BlockHeaders, cx)
143            }
144            Self::BlockBodies { response } => {
145                poll_request!(response, BlockBodies, cx)
146            }
147            Self::PooledTransactions { response } => {
148                poll_request!(response, PooledTransactions, cx)
149            }
150            Self::NodeData { response } => {
151                poll_request!(response, NodeData, cx)
152            }
153            Self::Receipts { response } => {
154                poll_request!(response, Receipts, cx)
155            }
156            Self::Receipts69 { response } => {
157                poll_request!(response, Receipts69, cx)
158            }
159            Self::Receipts70 { response } => match ready!(response.poll_unpin(cx)) {
160                Ok(res) => PeerResponseResult::Receipts70(res),
161                Err(err) => PeerResponseResult::Receipts70(Err(err.into())),
162            },
163        };
164        Poll::Ready(res)
165    }
166}
167
168/// All response variants for [`PeerResponse`]
169#[derive(Debug)]
170pub enum PeerResponseResult<N: NetworkPrimitives = EthNetworkPrimitives> {
171    /// Represents a result containing block headers or an error.
172    BlockHeaders(RequestResult<Vec<N::BlockHeader>>),
173    /// Represents a result containing block bodies or an error.
174    BlockBodies(RequestResult<Vec<N::BlockBody>>),
175    /// Represents a result containing pooled transactions or an error.
176    PooledTransactions(RequestResult<Vec<N::PooledTransaction>>),
177    /// Represents a result containing node data or an error.
178    NodeData(RequestResult<Vec<Bytes>>),
179    /// Represents a result containing receipts or an error.
180    Receipts(RequestResult<Vec<Vec<ReceiptWithBloom<N::Receipt>>>>),
181    /// Represents a result containing receipts or an error for eth/69.
182    Receipts69(RequestResult<Vec<Vec<N::Receipt>>>),
183    /// Represents a result containing receipts or an error for eth/70.
184    Receipts70(RequestResult<Receipts70<N::Receipt>>),
185}
186
187// === impl PeerResponseResult ===
188
189impl<N: NetworkPrimitives> PeerResponseResult<N> {
190    /// Converts this response into an [`EthMessage`]
191    pub fn try_into_message(self, id: u64) -> RequestResult<EthMessage<N>> {
192        macro_rules! to_message {
193            ($response:ident, $item:ident, $request_id:ident) => {
194                match $response {
195                    Ok(res) => {
196                        let request = RequestPair { request_id: $request_id, message: $item(res) };
197                        Ok(EthMessage::$item(request))
198                    }
199                    Err(err) => Err(err),
200                }
201            };
202        }
203        match self {
204            Self::BlockHeaders(resp) => {
205                to_message!(resp, BlockHeaders, id)
206            }
207            Self::BlockBodies(resp) => {
208                to_message!(resp, BlockBodies, id)
209            }
210            Self::PooledTransactions(resp) => {
211                to_message!(resp, PooledTransactions, id)
212            }
213            Self::NodeData(resp) => {
214                to_message!(resp, NodeData, id)
215            }
216            Self::Receipts(resp) => {
217                to_message!(resp, Receipts, id)
218            }
219            Self::Receipts69(resp) => {
220                to_message!(resp, Receipts69, id)
221            }
222            Self::Receipts70(resp) => match resp {
223                Ok(res) => {
224                    let request = RequestPair { request_id: id, message: res };
225                    Ok(EthMessage::Receipts70(request))
226                }
227                Err(err) => Err(err),
228            },
229        }
230    }
231
232    /// Returns the `Err` value if the result is an error.
233    pub fn err(&self) -> Option<&RequestError> {
234        match self {
235            Self::BlockHeaders(res) => res.as_ref().err(),
236            Self::BlockBodies(res) => res.as_ref().err(),
237            Self::PooledTransactions(res) => res.as_ref().err(),
238            Self::NodeData(res) => res.as_ref().err(),
239            Self::Receipts(res) => res.as_ref().err(),
240            Self::Receipts69(res) => res.as_ref().err(),
241            Self::Receipts70(res) => res.as_ref().err(),
242        }
243    }
244
245    /// Returns whether this result is an error.
246    pub fn is_err(&self) -> bool {
247        self.err().is_some()
248    }
249}