Skip to main content

reth_network/
message.rs

1//! Capability messaging
2//!
3//! An `RLPx` stream is multiplexed via the prepended message-id of a framed message.
4//! Capabilities are exchanged via the `RLPx` `Hello` message as pairs of `(id, version)`, <https://github.com/ethereum/devp2p/blob/master/rlpx.md#capability-messaging>
5
6use crate::types::{BlockAccessLists, Receipts69, Receipts70};
7use alloy_consensus::{BlockHeader, ReceiptWithBloom};
8use alloy_primitives::{Bytes, B256};
9use futures::FutureExt;
10use reth_eth_wire::{
11    message::RequestPair, BlockBodies, BlockHeaders, BlockRangeUpdate, Cells, EthMessage,
12    EthNetworkPrimitives, GetBlockAccessLists, GetBlockBodies, GetBlockHeaders, GetReceipts,
13    NetworkPrimitives, NewBlock, NewBlockHashes, NewBlockPayload, NewPooledTransactionHashes,
14    NodeData, PooledTransactions, Receipts, SharedTransactions, Transactions,
15};
16use reth_eth_wire_types::RawCapabilityMessage;
17use reth_network_api::PeerRequest;
18use reth_network_p2p::error::{RequestError, RequestResult};
19use reth_primitives_traits::Block;
20use std::{
21    sync::Arc,
22    task::{ready, Context, Poll},
23};
24use tokio::sync::oneshot;
25
26/// Internal form of a `NewBlock` message
27#[derive(Debug, Clone)]
28pub struct NewBlockMessage<P = NewBlock<reth_ethereum_primitives::Block>> {
29    /// Hash of the block
30    pub hash: B256,
31    /// Raw received message
32    pub block: Arc<P>,
33}
34
35// === impl NewBlockMessage ===
36
37impl<P: NewBlockPayload> NewBlockMessage<P> {
38    /// Returns the block number of the block
39    pub fn number(&self) -> u64 {
40        self.block.block().header().number()
41    }
42}
43
44/// All Bi-directional eth-message variants that can be sent to a session or received from a
45/// session.
46#[derive(Debug)]
47pub enum PeerMessage<N: NetworkPrimitives = EthNetworkPrimitives> {
48    /// Announce new block hashes
49    NewBlockHashes(NewBlockHashes),
50    /// Broadcast new block.
51    NewBlock(NewBlockMessage<N::NewBlockPayload>),
52    /// Received transactions _from_ the peer
53    ReceivedTransaction(Transactions<N::BroadcastedTransaction>),
54    /// Broadcast transactions _from_ local _to_ a peer.
55    SendTransactions(SharedTransactions<N::BroadcastedTransaction>),
56    /// Send new pooled transactions
57    PooledTransactions(NewPooledTransactionHashes),
58    /// All `eth` request variants.
59    EthRequest(PeerRequest<N>),
60    /// Announces when `BlockRange` is updated.
61    BlockRangeUpdated(BlockRangeUpdate),
62    /// Any other or manually crafted eth message.
63    ///
64    /// Caution: It is expected that this is a valid `eth_` capability message.
65    Other(RawCapabilityMessage),
66}
67
68impl<N: NetworkPrimitives> PeerMessage<N> {
69    /// Returns a static string identifying the message variant for logging.
70    pub const fn message_kind(&self) -> &'static str {
71        match self {
72            Self::NewBlockHashes(_) => "NewBlockHashes",
73            Self::NewBlock(_) => "NewBlock",
74            Self::ReceivedTransaction(_) => "ReceivedTransaction",
75            Self::SendTransactions(_) => "SendTransactions",
76            Self::PooledTransactions(_) => "PooledTransactions",
77            Self::EthRequest(_) => "EthRequest",
78            Self::BlockRangeUpdated(_) => "BlockRangeUpdated",
79            Self::Other(_) => "Other",
80        }
81    }
82
83    /// Returns `true` if this message is a broadcast (block/transaction announcement or
84    /// propagation) rather than a request/response.
85    pub const fn is_broadcast(&self) -> bool {
86        matches!(
87            self,
88            Self::NewBlockHashes(_) |
89                Self::NewBlock(_) |
90                Self::SendTransactions(_) |
91                Self::PooledTransactions(_)
92        )
93    }
94
95    /// Returns the number of items in the message payload, if applicable.
96    pub fn message_item_count(&self) -> usize {
97        match self {
98            Self::NewBlockHashes(msg) => msg.len(),
99            Self::ReceivedTransaction(msg) => msg.len(),
100            Self::SendTransactions(msg) => msg.len(),
101            Self::PooledTransactions(msg) => msg.len(),
102            Self::NewBlock(_) |
103            Self::EthRequest(_) |
104            Self::BlockRangeUpdated(_) |
105            Self::Other(_) => 1,
106        }
107    }
108}
109
110/// Request Variants that only target block related data.
111#[derive(Debug, Clone, PartialEq, Eq)]
112pub enum BlockRequest {
113    /// Requests block headers from the peer.
114    ///
115    /// The response should be sent through the channel.
116    GetBlockHeaders(GetBlockHeaders),
117
118    /// Requests block bodies from the peer.
119    ///
120    /// The response should be sent through the channel.
121    GetBlockBodies(GetBlockBodies),
122    /// Requests block access lists from the peer.
123    ///
124    /// The response should be sent through the channel.
125    GetBlockAccessLists(GetBlockAccessLists),
126
127    /// Requests receipts from the peer.
128    ///
129    /// The response should be sent through the channel.
130    GetReceipts(GetReceipts),
131}
132
133/// Corresponding variant for [`PeerRequest`].
134#[derive(Debug)]
135pub enum PeerResponse<N: NetworkPrimitives = EthNetworkPrimitives> {
136    /// Represents a response to a request for block headers.
137    BlockHeaders {
138        /// The receiver channel for the response to a block headers request.
139        response: oneshot::Receiver<RequestResult<BlockHeaders<N::BlockHeader>>>,
140    },
141    /// Represents a response to a request for block bodies.
142    BlockBodies {
143        /// The receiver channel for the response to a block bodies request.
144        response: oneshot::Receiver<RequestResult<BlockBodies<N::BlockBody>>>,
145    },
146    /// Represents a response to a request for pooled transactions.
147    PooledTransactions {
148        /// The receiver channel for the response to a pooled transactions request.
149        response: oneshot::Receiver<RequestResult<PooledTransactions<N::PooledTransaction>>>,
150    },
151    /// Represents a response to a request for `NodeData`.
152    NodeData {
153        /// The receiver channel for the response to a `NodeData` request.
154        response: oneshot::Receiver<RequestResult<NodeData>>,
155    },
156    /// Represents a response to a request for receipts.
157    Receipts {
158        /// The receiver channel for the response to a receipts request.
159        response: oneshot::Receiver<RequestResult<Receipts<N::Receipt>>>,
160    },
161    /// Represents a response to a request for receipts.
162    ///
163    /// This is a variant of `Receipts` that was introduced in `eth/69`.
164    /// The difference is that this variant does not require the inclusion of bloom filters in the
165    /// response, making it more lightweight.
166    Receipts69 {
167        /// The receiver channel for the response to a receipts request.
168        response: oneshot::Receiver<RequestResult<Receipts69<N::Receipt>>>,
169    },
170    /// Represents a response to a request for receipts using eth/70.
171    Receipts70 {
172        /// The receiver channel for the response to a receipts request.
173        response: oneshot::Receiver<RequestResult<Receipts70<N::Receipt>>>,
174    },
175    /// Represents a response to a request for block access lists.
176    BlockAccessLists {
177        /// The receiver channel for the response to a block access lists request.
178        response: oneshot::Receiver<RequestResult<BlockAccessLists>>,
179    },
180    ///
181    /// Represents a response to a request for cells.
182    Cells {
183        /// The receiver channel for the response to a cells request.
184        response: oneshot::Receiver<RequestResult<Cells>>,
185    },
186}
187
188// === impl PeerResponse ===
189
190impl<N: NetworkPrimitives> PeerResponse<N> {
191    /// Polls the type to completion.
192    pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<PeerResponseResult<N>> {
193        macro_rules! poll_request {
194            ($response:ident, $item:ident, $cx:ident) => {
195                match ready!($response.poll_unpin($cx)) {
196                    Ok(res) => PeerResponseResult::$item(res.map(|item| item.0)),
197                    Err(err) => PeerResponseResult::$item(Err(err.into())),
198                }
199            };
200        }
201
202        let res = match self {
203            Self::BlockHeaders { response } => {
204                poll_request!(response, BlockHeaders, cx)
205            }
206            Self::BlockBodies { response } => {
207                poll_request!(response, BlockBodies, cx)
208            }
209            Self::PooledTransactions { response } => {
210                poll_request!(response, PooledTransactions, cx)
211            }
212            Self::NodeData { response } => {
213                poll_request!(response, NodeData, cx)
214            }
215            Self::Receipts { response } => {
216                poll_request!(response, Receipts, cx)
217            }
218            Self::Receipts69 { response } => {
219                poll_request!(response, Receipts69, cx)
220            }
221            Self::Receipts70 { response } => match ready!(response.poll_unpin(cx)) {
222                Ok(res) => PeerResponseResult::Receipts70(res),
223                Err(err) => PeerResponseResult::Receipts70(Err(err.into())),
224            },
225            Self::BlockAccessLists { response } => match ready!(response.poll_unpin(cx)) {
226                Ok(res) => PeerResponseResult::BlockAccessLists(res),
227                Err(err) => PeerResponseResult::BlockAccessLists(Err(err.into())),
228            },
229            Self::Cells { response } => match ready!(response.poll_unpin(cx)) {
230                Ok(res) => PeerResponseResult::Cells(res),
231                Err(err) => PeerResponseResult::Cells(Err(err.into())),
232            },
233        };
234        Poll::Ready(res)
235    }
236}
237
238/// All response variants for [`PeerResponse`]
239#[derive(Debug)]
240pub enum PeerResponseResult<N: NetworkPrimitives = EthNetworkPrimitives> {
241    /// Represents a result containing block headers or an error.
242    BlockHeaders(RequestResult<Vec<N::BlockHeader>>),
243    /// Represents a result containing block bodies or an error.
244    BlockBodies(RequestResult<Vec<N::BlockBody>>),
245    /// Represents a result containing pooled transactions or an error.
246    PooledTransactions(RequestResult<Vec<N::PooledTransaction>>),
247    /// Represents a result containing node data or an error.
248    NodeData(RequestResult<Vec<Bytes>>),
249    /// Represents a result containing receipts or an error.
250    Receipts(RequestResult<Vec<Vec<ReceiptWithBloom<N::Receipt>>>>),
251    /// Represents a result containing receipts or an error for eth/69.
252    Receipts69(RequestResult<Vec<Vec<N::Receipt>>>),
253    /// Represents a result containing receipts or an error for eth/70.
254    Receipts70(RequestResult<Receipts70<N::Receipt>>),
255    /// Represents a result containing block access lists or an error.
256    BlockAccessLists(RequestResult<BlockAccessLists>),
257    /// Represents a result containing cells or an error.
258    Cells(RequestResult<Cells>),
259}
260
261// === impl PeerResponseResult ===
262
263impl<N: NetworkPrimitives> PeerResponseResult<N> {
264    /// Converts this response into an [`EthMessage`]
265    pub fn try_into_message(self, id: u64) -> RequestResult<EthMessage<N>> {
266        macro_rules! to_message {
267            ($response:ident, $item:ident, $request_id:ident) => {
268                match $response {
269                    Ok(res) => {
270                        let request = RequestPair { request_id: $request_id, message: $item(res) };
271                        Ok(EthMessage::$item(request))
272                    }
273                    Err(err) => Err(err),
274                }
275            };
276        }
277        match self {
278            Self::BlockHeaders(resp) => {
279                to_message!(resp, BlockHeaders, id)
280            }
281            Self::BlockBodies(resp) => {
282                to_message!(resp, BlockBodies, id)
283            }
284            Self::PooledTransactions(resp) => {
285                to_message!(resp, PooledTransactions, id)
286            }
287            Self::NodeData(resp) => {
288                to_message!(resp, NodeData, id)
289            }
290            Self::Receipts(resp) => {
291                to_message!(resp, Receipts, id)
292            }
293            Self::Receipts69(resp) => {
294                to_message!(resp, Receipts69, id)
295            }
296            Self::Receipts70(resp) => match resp {
297                Ok(res) => {
298                    let request = RequestPair { request_id: id, message: res };
299                    Ok(EthMessage::Receipts70(request))
300                }
301                Err(err) => Err(err),
302            },
303            Self::BlockAccessLists(resp) => match resp {
304                Ok(res) => {
305                    let request = RequestPair { request_id: id, message: res };
306                    Ok(EthMessage::BlockAccessLists(request))
307                }
308                Err(err) => Err(err),
309            },
310            Self::Cells(resp) => match resp {
311                Ok(res) => {
312                    let request = RequestPair { request_id: id, message: res };
313                    Ok(EthMessage::Cells(request))
314                }
315                Err(err) => Err(err),
316            },
317        }
318    }
319
320    /// Returns the `Err` value if the result is an error.
321    pub fn err(&self) -> Option<&RequestError> {
322        match self {
323            Self::BlockHeaders(res) => res.as_ref().err(),
324            Self::BlockBodies(res) => res.as_ref().err(),
325            Self::PooledTransactions(res) => res.as_ref().err(),
326            Self::NodeData(res) => res.as_ref().err(),
327            Self::Receipts(res) => res.as_ref().err(),
328            Self::Receipts69(res) => res.as_ref().err(),
329            Self::Receipts70(res) => res.as_ref().err(),
330            Self::BlockAccessLists(res) => res.as_ref().err(),
331            Self::Cells(res) => res.as_ref().err(),
332        }
333    }
334
335    /// Returns whether this result is an error.
336    pub fn is_err(&self) -> bool {
337        self.err().is_some()
338    }
339}