reth_network/
message.rs

//! Capability messaging
//!
//! An `RLPx` stream is multiplexed via the message-id prepended to each framed message.
//! Capabilities are exchanged via the `RLPx` `Hello` message as pairs of `(id, version)`; see
//! <https://github.com/ethereum/devp2p/blob/master/rlpx.md#capability-messaging>
5
6use alloy_consensus::{BlockHeader, ReceiptWithBloom};
7use alloy_primitives::{Bytes, B256};
8use futures::FutureExt;
9use reth_eth_wire::{
10    message::RequestPair, BlockBodies, BlockHeaders, EthMessage, EthNetworkPrimitives,
11    GetBlockBodies, GetBlockHeaders, NetworkPrimitives, NewBlock, NewBlockHashes,
12    NewPooledTransactionHashes, NodeData, PooledTransactions, Receipts, SharedTransactions,
13    Transactions,
14};
15use reth_eth_wire_types::RawCapabilityMessage;
16use reth_network_api::PeerRequest;
17use reth_network_p2p::error::{RequestError, RequestResult};
18use std::{
19    sync::Arc,
20    task::{ready, Context, Poll},
21};
22use tokio::sync::oneshot;
23
24/// Internal form of a `NewBlock` message
25#[derive(Debug, Clone)]
26pub struct NewBlockMessage<B = reth_ethereum_primitives::Block> {
27    /// Hash of the block
28    pub hash: B256,
29    /// Raw received message
30    pub block: Arc<NewBlock<B>>,
31}
32
33// === impl NewBlockMessage ===
34
35impl<B: reth_primitives_traits::Block> NewBlockMessage<B> {
36    /// Returns the block number of the block
37    pub fn number(&self) -> u64 {
38        self.block.block.header().number()
39    }
40}
41
42/// All Bi-directional eth-message variants that can be sent to a session or received from a
43/// session.
44#[derive(Debug)]
45pub enum PeerMessage<N: NetworkPrimitives = EthNetworkPrimitives> {
46    /// Announce new block hashes
47    NewBlockHashes(NewBlockHashes),
48    /// Broadcast new block.
49    NewBlock(NewBlockMessage<N::Block>),
50    /// Received transactions _from_ the peer
51    ReceivedTransaction(Transactions<N::BroadcastedTransaction>),
52    /// Broadcast transactions _from_ local _to_ a peer.
53    SendTransactions(SharedTransactions<N::BroadcastedTransaction>),
54    /// Send new pooled transactions
55    PooledTransactions(NewPooledTransactionHashes),
56    /// All `eth` request variants.
57    EthRequest(PeerRequest<N>),
58    /// Any other or manually crafted eth message.
59    ///
60    /// Caution: It is expected that this is a valid `eth_` capability message.
61    Other(RawCapabilityMessage),
62}
63
64/// Request Variants that only target block related data.
65#[derive(Debug, Clone, PartialEq, Eq)]
66pub enum BlockRequest {
67    /// Requests block headers from the peer.
68    ///
69    /// The response should be sent through the channel.
70    GetBlockHeaders(GetBlockHeaders),
71
72    /// Requests block bodies from the peer.
73    ///
74    /// The response should be sent through the channel.
75    GetBlockBodies(GetBlockBodies),
76}
77
78/// Corresponding variant for [`PeerRequest`].
79#[derive(Debug)]
80pub enum PeerResponse<N: NetworkPrimitives = EthNetworkPrimitives> {
81    /// Represents a response to a request for block headers.
82    BlockHeaders {
83        /// The receiver channel for the response to a block headers request.
84        response: oneshot::Receiver<RequestResult<BlockHeaders<N::BlockHeader>>>,
85    },
86    /// Represents a response to a request for block bodies.
87    BlockBodies {
88        /// The receiver channel for the response to a block bodies request.
89        response: oneshot::Receiver<RequestResult<BlockBodies<N::BlockBody>>>,
90    },
91    /// Represents a response to a request for pooled transactions.
92    PooledTransactions {
93        /// The receiver channel for the response to a pooled transactions request.
94        response: oneshot::Receiver<RequestResult<PooledTransactions<N::PooledTransaction>>>,
95    },
96    /// Represents a response to a request for `NodeData`.
97    NodeData {
98        /// The receiver channel for the response to a `NodeData` request.
99        response: oneshot::Receiver<RequestResult<NodeData>>,
100    },
101    /// Represents a response to a request for receipts.
102    Receipts {
103        /// The receiver channel for the response to a receipts request.
104        response: oneshot::Receiver<RequestResult<Receipts<N::Receipt>>>,
105    },
106}
107
108// === impl PeerResponse ===
109
110impl<N: NetworkPrimitives> PeerResponse<N> {
111    /// Polls the type to completion.
112    pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<PeerResponseResult<N>> {
113        macro_rules! poll_request {
114            ($response:ident, $item:ident, $cx:ident) => {
115                match ready!($response.poll_unpin($cx)) {
116                    Ok(res) => PeerResponseResult::$item(res.map(|item| item.0)),
117                    Err(err) => PeerResponseResult::$item(Err(err.into())),
118                }
119            };
120        }
121
122        let res = match self {
123            Self::BlockHeaders { response } => {
124                poll_request!(response, BlockHeaders, cx)
125            }
126            Self::BlockBodies { response } => {
127                poll_request!(response, BlockBodies, cx)
128            }
129            Self::PooledTransactions { response } => {
130                poll_request!(response, PooledTransactions, cx)
131            }
132            Self::NodeData { response } => {
133                poll_request!(response, NodeData, cx)
134            }
135            Self::Receipts { response } => {
136                poll_request!(response, Receipts, cx)
137            }
138        };
139        Poll::Ready(res)
140    }
141}
142
143/// All response variants for [`PeerResponse`]
144#[derive(Debug)]
145pub enum PeerResponseResult<N: NetworkPrimitives = EthNetworkPrimitives> {
146    /// Represents a result containing block headers or an error.
147    BlockHeaders(RequestResult<Vec<N::BlockHeader>>),
148    /// Represents a result containing block bodies or an error.
149    BlockBodies(RequestResult<Vec<N::BlockBody>>),
150    /// Represents a result containing pooled transactions or an error.
151    PooledTransactions(RequestResult<Vec<N::PooledTransaction>>),
152    /// Represents a result containing node data or an error.
153    NodeData(RequestResult<Vec<Bytes>>),
154    /// Represents a result containing receipts or an error.
155    Receipts(RequestResult<Vec<Vec<ReceiptWithBloom<N::Receipt>>>>),
156}
157
158// === impl PeerResponseResult ===
159
160impl<N: NetworkPrimitives> PeerResponseResult<N> {
161    /// Converts this response into an [`EthMessage`]
162    pub fn try_into_message(self, id: u64) -> RequestResult<EthMessage<N>> {
163        macro_rules! to_message {
164            ($response:ident, $item:ident, $request_id:ident) => {
165                match $response {
166                    Ok(res) => {
167                        let request = RequestPair { request_id: $request_id, message: $item(res) };
168                        Ok(EthMessage::$item(request))
169                    }
170                    Err(err) => Err(err),
171                }
172            };
173        }
174        match self {
175            Self::BlockHeaders(resp) => {
176                to_message!(resp, BlockHeaders, id)
177            }
178            Self::BlockBodies(resp) => {
179                to_message!(resp, BlockBodies, id)
180            }
181            Self::PooledTransactions(resp) => {
182                to_message!(resp, PooledTransactions, id)
183            }
184            Self::NodeData(resp) => {
185                to_message!(resp, NodeData, id)
186            }
187            Self::Receipts(resp) => {
188                to_message!(resp, Receipts, id)
189            }
190        }
191    }
192
193    /// Returns the `Err` value if the result is an error.
194    pub fn err(&self) -> Option<&RequestError> {
195        match self {
196            Self::BlockHeaders(res) => res.as_ref().err(),
197            Self::BlockBodies(res) => res.as_ref().err(),
198            Self::PooledTransactions(res) => res.as_ref().err(),
199            Self::NodeData(res) => res.as_ref().err(),
200            Self::Receipts(res) => res.as_ref().err(),
201        }
202    }
203
204    /// Returns whether this result is an error.
205    pub fn is_err(&self) -> bool {
206        self.err().is_some()
207    }
208}