Skip to main content

reth_eth_wire_types/
message.rs

//! Implements the Ethereum wire protocol for versions 66 through 72.
//! Defines structs/enums for messages, request-response pairs, and broadcasts.
//! Handles compatibility with [`EthVersion`].
//!
//! Examples include creating, encoding, and decoding protocol messages.
//!
//! Reference: [Ethereum Wire Protocol](https://github.com/ethereum/devp2p/blob/master/caps/eth.md).
8
9use super::{
10    broadcast::NewBlockHashes, BlockAccessLists, BlockBodies, BlockHeaders, GetBlockAccessLists,
11    GetBlockBodies, GetBlockHeaders, GetNodeData, GetPooledTransactions, GetReceipts,
12    GetReceipts70, NewPooledTransactionHashes66, NewPooledTransactionHashes68, NodeData,
13    PooledTransactions, Receipts, Status, StatusEth69, Transactions,
14};
15use crate::{
16    status::StatusMessage, BlockRangeUpdate, Cells, EthNetworkPrimitives, EthVersion, GetCells,
17    NetworkPrimitives, RawCapabilityMessage, Receipts69, Receipts70, SharedTransactions,
18};
19use alloc::{boxed::Box, string::String, sync::Arc};
20use alloy_primitives::{
21    bytes::{Buf, BufMut},
22    Bytes,
23};
24use alloy_rlp::{length_of_length, Decodable, Encodable, Header};
25use core::fmt::Debug;
26
/// Upper bound, in bytes, on the size of a single protocol message (10 MiB).
// Mirrors go-ethereum:
// https://github.com/ethereum/go-ethereum/blob/30602163d5d8321fbc68afdcbbaf2362b2641bde/eth/protocols/eth/protocol.go#L50
pub const MAX_MESSAGE_SIZE: usize = 10 << 20;
30
/// Factor by which `max_message_size` is scaled to obtain the in-memory budget used when
/// decoding `Transactions` and `PooledTransactions` messages.
///
/// A decoded transaction occupies more memory than its RLP encoding (struct overhead plus heap
/// allocations), which adds up when many peers are being served at once. Decoding therefore
/// stops as soon as the cumulative in-memory size of the decoded transactions exceeds
/// `max_message_size * TX_MEMORY_BUDGET_MULTIPLIER`; any transactions after that point are
/// silently dropped.
pub const TX_MEMORY_BUDGET_MULTIPLIER: usize = 2;
39
40/// Error when sending/receiving a message
41#[derive(thiserror::Error, Debug)]
42pub enum MessageError {
43    /// Flags an unrecognized message ID for a given protocol version.
44    #[error("message id {1:?} is invalid for version {0:?}")]
45    Invalid(EthVersion, EthMessageID),
46    /// Expected a Status message but received a different message type.
47    #[error("expected status message but received {0:?}")]
48    ExpectedStatusMessage(EthMessageID),
49    /// Thrown when rlp decoding a message failed.
50    #[error("RLP error: {0}")]
51    RlpError(#[from] alloy_rlp::Error),
52    /// Other message error with custom message
53    #[error("{0}")]
54    Other(String),
55}
56
57/// An `eth` protocol message, containing a message ID and payload.
58#[derive(Clone, Debug, PartialEq, Eq)]
59#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
60pub struct ProtocolMessage<N: NetworkPrimitives = EthNetworkPrimitives> {
61    /// The unique identifier representing the type of the Ethereum message.
62    pub message_type: EthMessageID,
63    /// The content of the message, including specific data based on the message type.
64    #[cfg_attr(
65        feature = "serde",
66        serde(bound = "EthMessage<N>: serde::Serialize + serde::de::DeserializeOwned")
67    )]
68    pub message: EthMessage<N>,
69}
70
71impl<N: NetworkPrimitives> ProtocolMessage<N> {
72    /// Decode only a Status message from RLP bytes.
73    ///
74    /// This is used during the eth handshake where only a Status message is a valid response.
75    /// Returns an error if the message is not a Status message.
76    pub fn decode_status(
77        version: EthVersion,
78        buf: &mut &[u8],
79    ) -> Result<StatusMessage, MessageError> {
80        let message_type = EthMessageID::decode(buf)?;
81
82        if message_type != EthMessageID::Status {
83            return Err(MessageError::ExpectedStatusMessage(message_type))
84        }
85
86        let status = if version < EthVersion::Eth69 {
87            StatusMessage::Legacy(Status::decode(buf)?)
88        } else {
89            StatusMessage::Eth69(StatusEth69::decode(buf)?)
90        };
91
92        Ok(status)
93    }
94
95    /// Create a new `ProtocolMessage` from a message type and message rlp bytes.
96    ///
97    /// This will enforce decoding according to the given [`EthVersion`] of the connection.
98    pub fn decode_message(version: EthVersion, buf: &mut &[u8]) -> Result<Self, MessageError> {
99        Self::decode_message_with_tx_memory_budget(version, buf, usize::MAX)
100    }
101
102    /// Like [`Self::decode_message`], but caps the cumulative in-memory size of decoded
103    /// transactions in `Transactions` and `PooledTransactions` messages. Once exceeded,
104    /// remaining transactions are silently dropped.
105    ///
106    /// Use [`TX_MEMORY_BUDGET_MULTIPLIER`] to derive a reasonable default.
107    pub fn decode_message_with_tx_memory_budget(
108        version: EthVersion,
109        buf: &mut &[u8],
110        tx_memory_budget: usize,
111    ) -> Result<Self, MessageError> {
112        let message_type = EthMessageID::decode(buf)?;
113
114        // For EIP-7642 (https://github.com/ethereum/EIPs/blob/master/EIPS/eip-7642.md):
115        // pre-merge (legacy) status messages include total difficulty, whereas eth/69 omits it.
116        let message = match message_type {
117            EthMessageID::Status => EthMessage::Status(if version < EthVersion::Eth69 {
118                StatusMessage::Legacy(Status::decode(buf)?)
119            } else {
120                StatusMessage::Eth69(StatusEth69::decode(buf)?)
121            }),
122            EthMessageID::NewBlockHashes => {
123                EthMessage::NewBlockHashes(NewBlockHashes::decode(buf)?)
124            }
125            EthMessageID::NewBlock => {
126                EthMessage::NewBlock(Box::new(N::NewBlockPayload::decode(buf)?))
127            }
128            EthMessageID::Transactions => EthMessage::Transactions(
129                Transactions::decode_with_memory_budget(buf, tx_memory_budget)?,
130            ),
131            EthMessageID::NewPooledTransactionHashes => {
132                if version >= EthVersion::Eth68 {
133                    EthMessage::NewPooledTransactionHashes68(NewPooledTransactionHashes68::decode(
134                        buf,
135                    )?)
136                } else {
137                    EthMessage::NewPooledTransactionHashes66(NewPooledTransactionHashes66::decode(
138                        buf,
139                    )?)
140                }
141            }
142            EthMessageID::GetBlockHeaders => EthMessage::GetBlockHeaders(RequestPair::decode(buf)?),
143            EthMessageID::BlockHeaders => EthMessage::BlockHeaders(RequestPair::decode(buf)?),
144            EthMessageID::GetBlockBodies => EthMessage::GetBlockBodies(RequestPair::decode(buf)?),
145            EthMessageID::BlockBodies => EthMessage::BlockBodies(RequestPair::decode(buf)?),
146            EthMessageID::GetPooledTransactions => {
147                EthMessage::GetPooledTransactions(RequestPair::decode(buf)?)
148            }
149            EthMessageID::PooledTransactions => {
150                EthMessage::PooledTransactions(RequestPair::decode_with(buf, |buf| {
151                    PooledTransactions::decode_with_memory_budget(buf, tx_memory_budget)
152                })?)
153            }
154            EthMessageID::GetNodeData => {
155                if version >= EthVersion::Eth67 {
156                    return Err(MessageError::Invalid(version, EthMessageID::GetNodeData))
157                }
158                EthMessage::GetNodeData(RequestPair::decode(buf)?)
159            }
160            EthMessageID::NodeData => {
161                if version >= EthVersion::Eth67 {
162                    return Err(MessageError::Invalid(version, EthMessageID::NodeData))
163                }
164                EthMessage::NodeData(RequestPair::decode(buf)?)
165            }
166            EthMessageID::GetReceipts => {
167                if version >= EthVersion::Eth70 {
168                    EthMessage::GetReceipts70(RequestPair::decode(buf)?)
169                } else {
170                    EthMessage::GetReceipts(RequestPair::decode(buf)?)
171                }
172            }
173            EthMessageID::Receipts => {
174                match version {
175                    v if v >= EthVersion::Eth70 => {
176                        // eth/70 continues to omit bloom filters and adds the
177                        // `lastBlockIncomplete` flag, encoded as
178                        // `[request-id, lastBlockIncomplete, [[receipt₁, receipt₂], ...]]`.
179                        EthMessage::Receipts70(RequestPair::decode(buf)?)
180                    }
181                    EthVersion::Eth69 => {
182                        // with eth69, receipts no longer include the bloom
183                        EthMessage::Receipts69(RequestPair::decode(buf)?)
184                    }
185                    _ => {
186                        // before eth69 we need to decode the bloom  as well
187                        EthMessage::Receipts(RequestPair::decode(buf)?)
188                    }
189                }
190            }
191            EthMessageID::BlockRangeUpdate => {
192                if version < EthVersion::Eth69 {
193                    return Err(MessageError::Invalid(version, EthMessageID::BlockRangeUpdate))
194                }
195                EthMessage::BlockRangeUpdate(BlockRangeUpdate::decode(buf)?)
196            }
197            EthMessageID::GetBlockAccessLists => {
198                if version < EthVersion::Eth71 {
199                    return Err(MessageError::Invalid(version, EthMessageID::GetBlockAccessLists))
200                }
201                EthMessage::GetBlockAccessLists(RequestPair::decode(buf)?)
202            }
203            EthMessageID::BlockAccessLists => {
204                if version < EthVersion::Eth71 {
205                    return Err(MessageError::Invalid(version, EthMessageID::BlockAccessLists))
206                }
207                EthMessage::BlockAccessLists(RequestPair::decode(buf)?)
208            }
209            EthMessageID::Cells => {
210                if version < EthVersion::Eth72 {
211                    return Err(MessageError::Invalid(version, EthMessageID::Cells))
212                }
213                EthMessage::Cells(RequestPair::decode(buf)?)
214            }
215            EthMessageID::GetCells => {
216                if version < EthVersion::Eth72 {
217                    return Err(MessageError::Invalid(version, EthMessageID::GetCells))
218                }
219                EthMessage::GetCells(RequestPair::decode(buf)?)
220            }
221            EthMessageID::Other(_) => {
222                let raw_payload = Bytes::copy_from_slice(buf);
223                buf.advance(raw_payload.len());
224                EthMessage::Other(RawCapabilityMessage::new(
225                    message_type.to_u8() as usize,
226                    raw_payload.into(),
227                ))
228            }
229        };
230        Ok(Self { message_type, message })
231    }
232}
233
234impl<N: NetworkPrimitives> Encodable for ProtocolMessage<N> {
235    /// Encodes the protocol message into bytes. The message type is encoded as a single byte and
236    /// prepended to the message.
237    fn encode(&self, out: &mut dyn BufMut) {
238        self.message_type.encode(out);
239        self.message.encode(out);
240    }
241    fn length(&self) -> usize {
242        self.message_type.length() + self.message.length()
243    }
244}
245
246impl<N: NetworkPrimitives> From<EthMessage<N>> for ProtocolMessage<N> {
247    fn from(message: EthMessage<N>) -> Self {
248        Self { message_type: message.message_id(), message }
249    }
250}
251
252/// Represents messages that can be sent to multiple peers.
253#[derive(Clone, Debug)]
254pub struct ProtocolBroadcastMessage<N: NetworkPrimitives = EthNetworkPrimitives> {
255    /// The unique identifier representing the type of the Ethereum message.
256    pub message_type: EthMessageID,
257    /// The content of the message to be broadcasted, including specific data based on the message
258    /// type.
259    pub message: EthBroadcastMessage<N>,
260}
261
262impl<N: NetworkPrimitives> Encodable for ProtocolBroadcastMessage<N> {
263    /// Encodes the protocol message into bytes. The message type is encoded as a single byte and
264    /// prepended to the message.
265    fn encode(&self, out: &mut dyn BufMut) {
266        self.message_type.encode(out);
267        self.message.encode(out);
268    }
269    fn length(&self) -> usize {
270        self.message_type.length() + self.message.length()
271    }
272}
273
274impl<N: NetworkPrimitives> From<EthBroadcastMessage<N>> for ProtocolBroadcastMessage<N> {
275    fn from(message: EthBroadcastMessage<N>) -> Self {
276        Self { message_type: message.message_id(), message }
277    }
278}
279
280/// Represents a message in the eth wire protocol, versions 66, 67, 68 and 69.
281///
282/// The ethereum wire protocol is a set of messages that are broadcast to the network in two
283/// styles:
284///  * A request message sent by a peer (such as [`GetPooledTransactions`]), and an associated
285///    response message (such as [`PooledTransactions`]).
286///  * A message that is broadcast to the network, without a corresponding request.
287///
288/// The newer `eth/66` is an efficiency upgrade on top of `eth/65`, introducing a request id to
289/// correlate request-response message pairs. This allows for request multiplexing.
290///
291/// The `eth/67` is based on `eth/66` but only removes two messages, [`GetNodeData`] and
292/// [`NodeData`].
293///
294/// The `eth/68` changes only `NewPooledTransactionHashes` to include `types` and `sized`. For
295/// it, `NewPooledTransactionHashes` is renamed as [`NewPooledTransactionHashes66`] and
296/// [`NewPooledTransactionHashes68`] is defined.
297///
298/// The `eth/69` announces the historical block range served by the node. Removes total difficulty
299/// information. And removes the Bloom field from receipts transferred over the protocol.
300///
301/// The `eth/70` (EIP-7975) keeps the eth/69 status format and introduces partial receipts.
302/// requests/responses.
303///
304/// The `eth/71` draft extends eth/70 with block access list request/response messages.
305#[derive(Clone, Debug, PartialEq, Eq)]
306#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
307pub enum EthMessage<N: NetworkPrimitives = EthNetworkPrimitives> {
308    /// Represents a Status message required for the protocol handshake.
309    Status(StatusMessage),
310    /// Represents a `NewBlockHashes` message broadcast to the network.
311    NewBlockHashes(NewBlockHashes),
312    /// Represents a `NewBlock` message broadcast to the network.
313    #[cfg_attr(
314        feature = "serde",
315        serde(bound = "N::NewBlockPayload: serde::Serialize + serde::de::DeserializeOwned")
316    )]
317    NewBlock(Box<N::NewBlockPayload>),
318    /// Represents a Transactions message broadcast to the network.
319    #[cfg_attr(
320        feature = "serde",
321        serde(bound = "N::BroadcastedTransaction: serde::Serialize + serde::de::DeserializeOwned")
322    )]
323    Transactions(Transactions<N::BroadcastedTransaction>),
324    /// Represents a `NewPooledTransactionHashes` message for eth/66 version.
325    NewPooledTransactionHashes66(NewPooledTransactionHashes66),
326    /// Represents a `NewPooledTransactionHashes` message for eth/68 version.
327    NewPooledTransactionHashes68(NewPooledTransactionHashes68),
328    // The following messages are request-response message pairs
329    /// Represents a `GetBlockHeaders` request-response pair.
330    GetBlockHeaders(RequestPair<GetBlockHeaders>),
331    /// Represents a `BlockHeaders` request-response pair.
332    #[cfg_attr(
333        feature = "serde",
334        serde(bound = "N::BlockHeader: serde::Serialize + serde::de::DeserializeOwned")
335    )]
336    BlockHeaders(RequestPair<BlockHeaders<N::BlockHeader>>),
337    /// Represents a `GetBlockBodies` request-response pair.
338    GetBlockBodies(RequestPair<GetBlockBodies>),
339    /// Represents a `BlockBodies` request-response pair.
340    #[cfg_attr(
341        feature = "serde",
342        serde(bound = "N::BlockBody: serde::Serialize + serde::de::DeserializeOwned")
343    )]
344    BlockBodies(RequestPair<BlockBodies<N::BlockBody>>),
345    /// Represents a `GetPooledTransactions` request-response pair.
346    GetPooledTransactions(RequestPair<GetPooledTransactions>),
347    /// Represents a `PooledTransactions` request-response pair.
348    #[cfg_attr(
349        feature = "serde",
350        serde(bound = "N::PooledTransaction: serde::Serialize + serde::de::DeserializeOwned")
351    )]
352    PooledTransactions(RequestPair<PooledTransactions<N::PooledTransaction>>),
353    /// Represents a `GetNodeData` request-response pair.
354    GetNodeData(RequestPair<GetNodeData>),
355    /// Represents a `NodeData` request-response pair.
356    NodeData(RequestPair<NodeData>),
357    /// Represents a `GetReceipts` request-response pair.
358    GetReceipts(RequestPair<GetReceipts>),
359    /// Represents a `GetReceipts` request for eth/70.
360    ///
361    /// Note: Unlike earlier protocol versions, the eth/70 encoding for
362    /// `GetReceipts` in EIP-7975 inlines the request id. The type still wraps
363    /// a [`RequestPair`], but with a custom inline encoding.
364    GetReceipts70(RequestPair<GetReceipts70>),
365    /// Represents a `GetBlockAccessLists` request-response pair for eth/71.
366    GetBlockAccessLists(RequestPair<GetBlockAccessLists>),
367    /// Represents a Receipts request-response pair.
368    #[cfg_attr(
369        feature = "serde",
370        serde(bound = "N::Receipt: serde::Serialize + serde::de::DeserializeOwned")
371    )]
372    Receipts(RequestPair<Receipts<N::Receipt>>),
373    /// Represents a Receipts request-response pair for eth/69.
374    #[cfg_attr(
375        feature = "serde",
376        serde(bound = "N::Receipt: serde::Serialize + serde::de::DeserializeOwned")
377    )]
378    Receipts69(RequestPair<Receipts69<N::Receipt>>),
379    /// Represents a Receipts request-response pair for eth/70.
380    #[cfg_attr(
381        feature = "serde",
382        serde(bound = "N::Receipt: serde::Serialize + serde::de::DeserializeOwned")
383    )]
384    ///
385    /// Note: The eth/70 encoding for `Receipts` in EIP-7975 inlines the
386    /// request id. The type still wraps a [`RequestPair`], but with a custom
387    /// inline encoding.
388    Receipts70(RequestPair<Receipts70<N::Receipt>>),
389    /// Represents a `BlockAccessLists` request-response pair for eth/71.
390    BlockAccessLists(RequestPair<BlockAccessLists>),
391    /// Represents a `Cells` request-response pair for eth/72.
392    Cells(RequestPair<Cells>),
393    /// Represents a `GetCells` request-response pair for eth/72.
394    GetCells(RequestPair<GetCells>),
395    /// Represents a `BlockRangeUpdate` message broadcast to the network.
396    #[cfg_attr(
397        feature = "serde",
398        serde(bound = "N::BroadcastedTransaction: serde::Serialize + serde::de::DeserializeOwned")
399    )]
400    BlockRangeUpdate(BlockRangeUpdate),
401    /// Represents an encoded message that doesn't match any other variant
402    Other(RawCapabilityMessage),
403}
404
405impl<N: NetworkPrimitives> EthMessage<N> {
406    /// Returns the message's ID.
407    pub const fn message_id(&self) -> EthMessageID {
408        match self {
409            Self::Status(_) => EthMessageID::Status,
410            Self::NewBlockHashes(_) => EthMessageID::NewBlockHashes,
411            Self::NewBlock(_) => EthMessageID::NewBlock,
412            Self::Transactions(_) => EthMessageID::Transactions,
413            Self::NewPooledTransactionHashes66(_) | Self::NewPooledTransactionHashes68(_) => {
414                EthMessageID::NewPooledTransactionHashes
415            }
416            Self::GetBlockHeaders(_) => EthMessageID::GetBlockHeaders,
417            Self::BlockHeaders(_) => EthMessageID::BlockHeaders,
418            Self::GetBlockBodies(_) => EthMessageID::GetBlockBodies,
419            Self::BlockBodies(_) => EthMessageID::BlockBodies,
420            Self::GetPooledTransactions(_) => EthMessageID::GetPooledTransactions,
421            Self::PooledTransactions(_) => EthMessageID::PooledTransactions,
422            Self::GetNodeData(_) => EthMessageID::GetNodeData,
423            Self::NodeData(_) => EthMessageID::NodeData,
424            Self::GetReceipts(_) | Self::GetReceipts70(_) => EthMessageID::GetReceipts,
425            Self::Receipts(_) | Self::Receipts69(_) | Self::Receipts70(_) => EthMessageID::Receipts,
426            Self::BlockRangeUpdate(_) => EthMessageID::BlockRangeUpdate,
427            Self::GetBlockAccessLists(_) => EthMessageID::GetBlockAccessLists,
428            Self::BlockAccessLists(_) => EthMessageID::BlockAccessLists,
429            Self::Cells(_) => EthMessageID::Cells,
430            Self::GetCells(_) => EthMessageID::GetCells,
431            Self::Other(msg) => EthMessageID::Other(msg.id as u8),
432        }
433    }
434
435    /// Returns true if the message variant is a request.
436    pub const fn is_request(&self) -> bool {
437        matches!(
438            self,
439            Self::GetBlockBodies(_) |
440                Self::GetBlockHeaders(_) |
441                Self::GetReceipts(_) |
442                Self::GetReceipts70(_) |
443                Self::GetBlockAccessLists(_) |
444                Self::GetCells(_) |
445                Self::GetPooledTransactions(_) |
446                Self::GetNodeData(_)
447        )
448    }
449
450    /// Returns true if the message variant is a response to a request.
451    pub const fn is_response(&self) -> bool {
452        matches!(
453            self,
454            Self::PooledTransactions(_) |
455                Self::Receipts(_) |
456                Self::Receipts69(_) |
457                Self::Receipts70(_) |
458                Self::BlockAccessLists(_) |
459                Self::BlockHeaders(_) |
460                Self::BlockBodies(_) |
461                Self::NodeData(_) |
462                Self::Cells(_)
463        )
464    }
465
466    /// Converts the message types where applicable.
467    ///
468    /// This handles up/downcasting where appropriate, for example for different receipt request
469    /// types.
470    pub fn map_versioned(self, version: EthVersion) -> Self {
471        // For eth/70 peers we send `GetReceipts` using the new eth/70
472        // encoding with `firstBlockReceiptIndex = 0`, while keeping the
473        // user-facing `PeerRequest` API unchanged.
474        if version >= EthVersion::Eth70 {
475            return match self {
476                Self::GetReceipts(pair) => {
477                    let RequestPair { request_id, message } = pair;
478                    let req = RequestPair {
479                        request_id,
480                        message: GetReceipts70 {
481                            first_block_receipt_index: 0,
482                            block_hashes: message.0,
483                        },
484                    };
485                    Self::GetReceipts70(req)
486                }
487                other => other,
488            }
489        }
490
491        self
492    }
493}
494
495impl<N: NetworkPrimitives> Encodable for EthMessage<N> {
496    fn encode(&self, out: &mut dyn BufMut) {
497        match self {
498            Self::Status(status) => status.encode(out),
499            Self::NewBlockHashes(new_block_hashes) => new_block_hashes.encode(out),
500            Self::NewBlock(new_block) => new_block.encode(out),
501            Self::Transactions(transactions) => transactions.encode(out),
502            Self::NewPooledTransactionHashes66(hashes) => hashes.encode(out),
503            Self::NewPooledTransactionHashes68(hashes) => hashes.encode(out),
504            Self::GetBlockHeaders(request) => request.encode(out),
505            Self::BlockHeaders(headers) => headers.encode(out),
506            Self::GetBlockBodies(request) => request.encode(out),
507            Self::BlockBodies(bodies) => bodies.encode(out),
508            Self::GetPooledTransactions(request) => request.encode(out),
509            Self::PooledTransactions(transactions) => transactions.encode(out),
510            Self::GetNodeData(request) => request.encode(out),
511            Self::NodeData(data) => data.encode(out),
512            Self::GetReceipts(request) => request.encode(out),
513            Self::GetReceipts70(request) => request.encode(out),
514            Self::GetBlockAccessLists(request) => request.encode(out),
515            Self::GetCells(request) => request.encode(out),
516            Self::Receipts(receipts) => receipts.encode(out),
517            Self::Receipts69(receipt69) => receipt69.encode(out),
518            Self::Receipts70(receipt70) => receipt70.encode(out),
519            Self::BlockAccessLists(block_access_lists) => block_access_lists.encode(out),
520            Self::BlockRangeUpdate(block_range_update) => block_range_update.encode(out),
521            Self::Cells(cells) => cells.encode(out),
522            Self::Other(unknown) => out.put_slice(&unknown.payload),
523        }
524    }
525    fn length(&self) -> usize {
526        match self {
527            Self::Status(status) => status.length(),
528            Self::NewBlockHashes(new_block_hashes) => new_block_hashes.length(),
529            Self::NewBlock(new_block) => new_block.length(),
530            Self::Transactions(transactions) => transactions.length(),
531            Self::NewPooledTransactionHashes66(hashes) => hashes.length(),
532            Self::NewPooledTransactionHashes68(hashes) => hashes.length(),
533            Self::GetBlockHeaders(request) => request.length(),
534            Self::BlockHeaders(headers) => headers.length(),
535            Self::GetBlockBodies(request) => request.length(),
536            Self::BlockBodies(bodies) => bodies.length(),
537            Self::GetPooledTransactions(request) => request.length(),
538            Self::PooledTransactions(transactions) => transactions.length(),
539            Self::GetNodeData(request) => request.length(),
540            Self::NodeData(data) => data.length(),
541            Self::GetReceipts(request) => request.length(),
542            Self::GetReceipts70(request) => request.length(),
543            Self::GetBlockAccessLists(request) => request.length(),
544            Self::GetCells(request) => request.length(),
545            Self::Receipts(receipts) => receipts.length(),
546            Self::Receipts69(receipt69) => receipt69.length(),
547            Self::Receipts70(receipt70) => receipt70.length(),
548            Self::BlockAccessLists(block_access_lists) => block_access_lists.length(),
549            Self::BlockRangeUpdate(block_range_update) => block_range_update.length(),
550            Self::Cells(cells) => cells.length(),
551            Self::Other(unknown) => unknown.length(),
552        }
553    }
554}
555
556/// Represents broadcast messages of [`EthMessage`] with the same object that can be sent to
557/// multiple peers.
558///
559/// Messages that contain a list of hashes depend on the peer the message is sent to. A peer should
560/// never receive a hash of an object (block, transaction) it has already seen.
561///
562/// Note: This is only useful for outgoing messages.
563#[derive(Clone, Debug, PartialEq, Eq)]
564pub enum EthBroadcastMessage<N: NetworkPrimitives = EthNetworkPrimitives> {
565    /// Represents a new block broadcast message.
566    NewBlock(Arc<N::NewBlockPayload>),
567    /// Represents a transactions broadcast message.
568    Transactions(SharedTransactions<N::BroadcastedTransaction>),
569}
570
571// === impl EthBroadcastMessage ===
572
573impl<N: NetworkPrimitives> EthBroadcastMessage<N> {
574    /// Returns the message's ID.
575    pub const fn message_id(&self) -> EthMessageID {
576        match self {
577            Self::NewBlock(_) => EthMessageID::NewBlock,
578            Self::Transactions(_) => EthMessageID::Transactions,
579        }
580    }
581}
582
583impl<N: NetworkPrimitives> Encodable for EthBroadcastMessage<N> {
584    fn encode(&self, out: &mut dyn BufMut) {
585        match self {
586            Self::NewBlock(new_block) => new_block.encode(out),
587            Self::Transactions(transactions) => transactions.encode(out),
588        }
589    }
590
591    fn length(&self) -> usize {
592        match self {
593            Self::NewBlock(new_block) => new_block.length(),
594            Self::Transactions(transactions) => transactions.length(),
595        }
596    }
597}
598
/// Identifiers of the `eth` protocol messages, as they appear on the wire.
#[repr(u8)]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub enum EthMessageID {
    /// `Status` (handshake).
    Status = 0x00,
    /// `NewBlockHashes` announcement.
    NewBlockHashes = 0x01,
    /// `Transactions` broadcast.
    Transactions = 0x02,
    /// `GetBlockHeaders` request.
    GetBlockHeaders = 0x03,
    /// `BlockHeaders` response.
    BlockHeaders = 0x04,
    /// `GetBlockBodies` request.
    GetBlockBodies = 0x05,
    /// `BlockBodies` response.
    BlockBodies = 0x06,
    /// `NewBlock` broadcast.
    NewBlock = 0x07,
    /// `NewPooledTransactionHashes` announcement.
    NewPooledTransactionHashes = 0x08,
    /// `GetPooledTransactions` request.
    GetPooledTransactions = 0x09,
    /// `PooledTransactions` response.
    PooledTransactions = 0x0a,
    /// `GetNodeData` request.
    GetNodeData = 0x0d,
    /// `NodeData` response.
    NodeData = 0x0e,
    /// `GetReceipts` request.
    GetReceipts = 0x0f,
    /// `Receipts` response.
    Receipts = 0x10,
    /// `BlockRangeUpdate` message.
    ///
    /// Introduced in Eth69
    BlockRangeUpdate = 0x11,
    /// `GetBlockAccessLists` request.
    ///
    /// Introduced in Eth71
    GetBlockAccessLists = 0x12,
    /// `BlockAccessLists` response.
    ///
    /// Introduced in Eth71
    BlockAccessLists = 0x13,

    /// `GetCells` request.
    ///
    /// Introduced in Eth72
    GetCells = 0x14,
    /// `Cells` response.
    ///
    /// Introduced in Eth72
    Cells = 0x15,
    /// Any message ID that is not one of the known IDs above; carries the raw byte.
    Other(u8),
}
658
659impl EthMessageID {
660    /// Returns the corresponding `u8` value for an `EthMessageID`.
661    pub const fn to_u8(&self) -> u8 {
662        match self {
663            Self::Status => 0x00,
664            Self::NewBlockHashes => 0x01,
665            Self::Transactions => 0x02,
666            Self::GetBlockHeaders => 0x03,
667            Self::BlockHeaders => 0x04,
668            Self::GetBlockBodies => 0x05,
669            Self::BlockBodies => 0x06,
670            Self::NewBlock => 0x07,
671            Self::NewPooledTransactionHashes => 0x08,
672            Self::GetPooledTransactions => 0x09,
673            Self::PooledTransactions => 0x0a,
674            Self::GetNodeData => 0x0d,
675            Self::NodeData => 0x0e,
676            Self::GetReceipts => 0x0f,
677            Self::Receipts => 0x10,
678            Self::BlockRangeUpdate => 0x11,
679            Self::GetBlockAccessLists => 0x12,
680            Self::BlockAccessLists => 0x13,
681            Self::GetCells => 0x14,
682            Self::Cells => 0x15,
683            Self::Other(value) => *value, // Return the stored `u8`
684        }
685    }
686
687    /// Returns the max value for the given version.
688    pub const fn max(version: EthVersion) -> u8 {
689        if version.is_eth72() {
690            Self::Cells.to_u8()
691        } else if version.is_eth71() {
692            Self::BlockAccessLists.to_u8()
693        } else if version.is_eth69_or_newer() {
694            Self::BlockRangeUpdate.to_u8()
695        } else {
696            Self::Receipts.to_u8()
697        }
698    }
699
700    /// Returns the total number of message types for the given version.
701    ///
702    /// This is used for message ID multiplexing.
703    ///
704    /// <https://github.com/ethereum/go-ethereum/blob/85077be58edea572f29c3b1a6a055077f1a56a8b/eth/protocols/eth/protocol.go#L45-L47>
705    pub const fn message_count(version: EthVersion) -> u8 {
706        Self::max(version) + 1
707    }
708}
709
710impl Encodable for EthMessageID {
711    fn encode(&self, out: &mut dyn BufMut) {
712        out.put_u8(self.to_u8());
713    }
714    fn length(&self) -> usize {
715        1
716    }
717}
718
719impl Decodable for EthMessageID {
720    fn decode(buf: &mut &[u8]) -> alloy_rlp::Result<Self> {
721        let id = match buf.first().ok_or(alloy_rlp::Error::InputTooShort)? {
722            0x00 => Self::Status,
723            0x01 => Self::NewBlockHashes,
724            0x02 => Self::Transactions,
725            0x03 => Self::GetBlockHeaders,
726            0x04 => Self::BlockHeaders,
727            0x05 => Self::GetBlockBodies,
728            0x06 => Self::BlockBodies,
729            0x07 => Self::NewBlock,
730            0x08 => Self::NewPooledTransactionHashes,
731            0x09 => Self::GetPooledTransactions,
732            0x0a => Self::PooledTransactions,
733            0x0d => Self::GetNodeData,
734            0x0e => Self::NodeData,
735            0x0f => Self::GetReceipts,
736            0x10 => Self::Receipts,
737            0x11 => Self::BlockRangeUpdate,
738            0x12 => Self::GetBlockAccessLists,
739            0x13 => Self::BlockAccessLists,
740            0x14 => Self::GetCells,
741            0x15 => Self::Cells,
742            unknown => Self::Other(*unknown),
743        };
744        buf.advance(1);
745        Ok(id)
746    }
747}
748
749impl TryFrom<usize> for EthMessageID {
750    type Error = &'static str;
751
752    fn try_from(value: usize) -> Result<Self, Self::Error> {
753        match value {
754            0x00 => Ok(Self::Status),
755            0x01 => Ok(Self::NewBlockHashes),
756            0x02 => Ok(Self::Transactions),
757            0x03 => Ok(Self::GetBlockHeaders),
758            0x04 => Ok(Self::BlockHeaders),
759            0x05 => Ok(Self::GetBlockBodies),
760            0x06 => Ok(Self::BlockBodies),
761            0x07 => Ok(Self::NewBlock),
762            0x08 => Ok(Self::NewPooledTransactionHashes),
763            0x09 => Ok(Self::GetPooledTransactions),
764            0x0a => Ok(Self::PooledTransactions),
765            0x0d => Ok(Self::GetNodeData),
766            0x0e => Ok(Self::NodeData),
767            0x0f => Ok(Self::GetReceipts),
768            0x10 => Ok(Self::Receipts),
769            0x11 => Ok(Self::BlockRangeUpdate),
770            0x12 => Ok(Self::GetBlockAccessLists),
771            0x13 => Ok(Self::BlockAccessLists),
772            0x14 => Ok(Self::GetCells),
773            0x15 => Ok(Self::Cells),
774            _ => Err("Invalid message ID"),
775        }
776    }
777}
778
/// A message payload paired with a request id.
///
/// Every request-response style `eth` protocol message uses this shape. The same type serves
/// for requests and for responses, because both carry a request id next to their payload.
#[derive(Clone, Debug, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(any(test, feature = "arbitrary"), derive(arbitrary::Arbitrary))]
pub struct RequestPair<T> {
    /// Identifier of the request or response this pair belongs to.
    pub request_id: u64,

    /// Payload of the request or response.
    pub message: T,
}
792
793impl<T> RequestPair<T> {
794    /// Converts the message type with the given closure.
795    pub fn map<F, R>(self, f: F) -> RequestPair<R>
796    where
797        F: FnOnce(T) -> R,
798    {
799        let Self { request_id, message } = self;
800        RequestPair { request_id, message: f(message) }
801    }
802
803    /// Decodes the request id and then decodes the message payload using `decode_msg`.
804    pub fn decode_with<F>(buf: &mut &[u8], decode_msg: F) -> alloy_rlp::Result<Self>
805    where
806        F: FnOnce(&mut &[u8]) -> alloy_rlp::Result<T>,
807    {
808        let header = Header::decode(buf)?;
809
810        let initial_length = buf.len();
811        let request_id = u64::decode(buf)?;
812        let message = decode_msg(buf)?;
813
814        let consumed_len = initial_length - buf.len();
815        if consumed_len != header.payload_length {
816            return Err(alloy_rlp::Error::UnexpectedLength)
817        }
818
819        Ok(Self { request_id, message })
820    }
821}
822
823/// Allows messages with request ids to be serialized into RLP bytes.
824impl<T> Encodable for RequestPair<T>
825where
826    T: Encodable,
827{
828    fn encode(&self, out: &mut dyn alloy_rlp::BufMut) {
829        let header =
830            Header { list: true, payload_length: self.request_id.length() + self.message.length() };
831
832        header.encode(out);
833        self.request_id.encode(out);
834        self.message.encode(out);
835    }
836
837    fn length(&self) -> usize {
838        let mut length = 0;
839        length += self.request_id.length();
840        length += self.message.length();
841        length += length_of_length(length);
842        length
843    }
844}
845
846/// Allows messages with request ids to be deserialized into RLP bytes.
847impl<T> Decodable for RequestPair<T>
848where
849    T: Decodable,
850{
851    fn decode(buf: &mut &[u8]) -> alloy_rlp::Result<Self> {
852        let header = Header::decode(buf)?;
853
854        let initial_length = buf.len();
855        let request_id = u64::decode(buf)?;
856        let message = T::decode(buf)?;
857
858        // Check that the buffer consumed exactly payload_length bytes after decoding the
859        // RequestPair
860        let consumed_len = initial_length - buf.len();
861        if consumed_len != header.payload_length {
862            return Err(alloy_rlp::Error::UnexpectedLength)
863        }
864
865        Ok(Self { request_id, message })
866    }
867}
868
#[cfg(test)]
mod tests {
    use super::MessageError;
    use crate::{
        message::RequestPair, BlockAccessLists, EthMessage, EthMessageID, EthNetworkPrimitives,
        EthVersion, GetBlockAccessLists, GetNodeData, NodeData, ProtocolMessage,
        RawCapabilityMessage,
    };
    use alloy_primitives::hex;
    use alloy_rlp::{Decodable, Encodable, Error};
    use reth_ethereum_primitives::BlockBody;

    /// Encodes `value` into a freshly allocated byte vector.
    fn encode<T: Encodable>(value: T) -> Vec<u8> {
        let mut out = Vec::new();
        value.encode(&mut out);
        out
    }

    /// `GetNodeData` and `NodeData` must be rejected when decoding as eth/67.
    #[test]
    fn test_removed_message_at_eth67() {
        let request_wire = encode(ProtocolMessage {
            message_type: EthMessageID::GetNodeData,
            message: EthMessage::<EthNetworkPrimitives>::GetNodeData(RequestPair {
                request_id: 1337,
                message: GetNodeData(vec![]),
            }),
        });
        let request_outcome = ProtocolMessage::<EthNetworkPrimitives>::decode_message(
            EthVersion::Eth67,
            &mut request_wire.as_slice(),
        );
        assert!(matches!(request_outcome, Err(MessageError::Invalid(..))));

        let response_wire = encode(ProtocolMessage {
            message_type: EthMessageID::NodeData,
            message: EthMessage::<EthNetworkPrimitives>::NodeData(RequestPair {
                request_id: 1337,
                message: NodeData(vec![]),
            }),
        });
        let response_outcome = ProtocolMessage::<EthNetworkPrimitives>::decode_message(
            EthVersion::Eth67,
            &mut response_wire.as_slice(),
        );
        assert!(matches!(response_outcome, Err(MessageError::Invalid(..))));
    }

    /// Block access list messages must be rejected when decoding as eth/70.
    #[test]
    fn test_bal_message_version_gating() {
        let request_wire = encode(ProtocolMessage {
            message_type: EthMessageID::GetBlockAccessLists,
            message: EthMessage::<EthNetworkPrimitives>::GetBlockAccessLists(RequestPair {
                request_id: 1337,
                message: GetBlockAccessLists(vec![]),
            }),
        });
        let request_outcome = ProtocolMessage::<EthNetworkPrimitives>::decode_message(
            EthVersion::Eth70,
            &mut request_wire.as_slice(),
        );
        assert!(matches!(
            request_outcome,
            Err(MessageError::Invalid(EthVersion::Eth70, EthMessageID::GetBlockAccessLists))
        ));

        let response_wire = encode(ProtocolMessage {
            message_type: EthMessageID::BlockAccessLists,
            message: EthMessage::<EthNetworkPrimitives>::BlockAccessLists(RequestPair {
                request_id: 1337,
                message: BlockAccessLists(vec![]),
            }),
        });
        let response_outcome = ProtocolMessage::<EthNetworkPrimitives>::decode_message(
            EthVersion::Eth70,
            &mut response_wire.as_slice(),
        );
        assert!(matches!(
            response_outcome,
            Err(MessageError::Invalid(EthVersion::Eth70, EthMessageID::BlockAccessLists))
        ));
    }

    /// A `GetBlockAccessLists` request survives an encode/decode cycle as eth/71.
    #[test]
    fn test_bal_message_eth71_roundtrip() {
        let request = RequestPair { request_id: 42, message: GetBlockAccessLists(vec![]) };
        let original =
            ProtocolMessage::from(EthMessage::<EthNetworkPrimitives>::GetBlockAccessLists(request));

        let wire = encode(original.clone());
        let roundtripped = ProtocolMessage::<EthNetworkPrimitives>::decode_message(
            EthVersion::Eth71,
            &mut wire.as_slice(),
        )
        .unwrap();

        assert_eq!(roundtripped, original);
    }

    #[test]
    fn request_pair_encode() {
        let request_pair = RequestPair { request_id: 1337, message: vec![5u8] };

        // Byte layout of the expected encoding:
        //   c5       outer list header: 0xc0 + 5 payload bytes
        //   82 0539  request_id 1337, a 2-byte string (0x80 + 2)
        //   c1 05    message: 1-byte list (0xc0 + 1) holding the byte 0x05
        let expected = hex!("c5820539c105");
        let got = encode(request_pair);
        assert_eq!(expected[..], got, "expected: {expected:X?}, got: {got:X?}",);
    }

    #[test]
    fn request_pair_decode() {
        let wire = &hex!("c5820539c105")[..];
        let wanted = RequestPair { request_id: 1337, message: vec![5u8] };

        let parsed = RequestPair::<Vec<u8>>::decode(&mut &*wire).unwrap();

        assert_eq!(wanted.length(), wire.len());
        assert_eq!(wanted, parsed);
    }

    #[test]
    fn malicious_request_pair_decode() {
        // The outer header lies about its size: it announces 5 payload bytes, yet decoding the
        // two fields consumes 6.
        //
        //   c5        outer list header: 0xc0 + 5 payload bytes
        //   82 0539   request_id 1337, a 2-byte string (0x80 + 2)   -> 3 bytes
        //   c2 05 05  message: 2-byte list (0xc0 + 2) holding 5, 5  -> 3 bytes
        let wire = &hex!("c5820539c20505")[..];

        let outcome = RequestPair::<Vec<u8>>::decode(&mut &*wire);

        assert!(matches!(outcome, Err(Error::UnexpectedLength)));
    }

    /// A `BlockBodies` response with no bodies survives an encode/decode cycle.
    #[test]
    fn empty_block_bodies_protocol() {
        let response = RequestPair { request_id: 0, message: Default::default() };
        let empty_block_bodies =
            ProtocolMessage::from(EthMessage::<EthNetworkPrimitives>::BlockBodies(response));

        let mut wire = vec![];
        empty_block_bodies.encode(&mut wire);

        let roundtripped =
            ProtocolMessage::decode_message(EthVersion::Eth68, &mut &wire[..]).unwrap();
        assert_eq!(empty_block_bodies, roundtripped);
    }

    /// A `BlockBodies` response holding a single empty body survives an encode/decode cycle.
    #[test]
    fn empty_block_body_protocol() {
        let empty_body = BlockBody {
            transactions: vec![],
            ommers: vec![],
            withdrawals: Some(Default::default()),
        };
        let response = RequestPair { request_id: 0, message: vec![empty_body].into() };
        let empty_block_bodies =
            ProtocolMessage::from(EthMessage::<EthNetworkPrimitives>::BlockBodies(response));

        let mut wire = vec![];
        empty_block_bodies.encode(&mut wire);

        let roundtripped =
            ProtocolMessage::decode_message(EthVersion::Eth68, &mut &wire[..]).unwrap();
        assert_eq!(empty_block_bodies, roundtripped);
    }

    /// Decoding this particular `BlockBodies` frame must fail with `InputTooShort`.
    #[test]
    fn decode_block_bodies_message() {
        // 06 is the `BlockBodies` message ID; the rest is the RLP payload.
        let wire = hex!("06c48199c1c0");

        let failure = ProtocolMessage::<EthNetworkPrimitives>::decode_message(
            EthVersion::Eth68,
            &mut wire.as_slice(),
        )
        .unwrap_err();

        assert!(matches!(failure, MessageError::RlpError(Error::InputTooShort)));
    }

    /// A raw capability message with an unknown ID and a payload survives a roundtrip.
    #[test]
    fn custom_message_roundtrip() {
        let payload = vec![1, 2, 3, 4, 5];
        let protocol_message = ProtocolMessage::<EthNetworkPrimitives> {
            message_type: EthMessageID::Other(0x20),
            message: EthMessage::Other(RawCapabilityMessage::new(0x20, payload.into())),
        };

        let wire = encode(protocol_message.clone());
        let roundtripped = ProtocolMessage::<EthNetworkPrimitives>::decode_message(
            EthVersion::Eth68,
            &mut wire.as_slice(),
        )
        .unwrap();

        assert_eq!(protocol_message, roundtripped);
    }

    /// A raw capability message with an unknown ID and no payload survives a roundtrip.
    #[test]
    fn custom_message_empty_payload_roundtrip() {
        let protocol_message = ProtocolMessage::<EthNetworkPrimitives> {
            message_type: EthMessageID::Other(0x30),
            message: EthMessage::Other(RawCapabilityMessage::new(0x30, vec![].into())),
        };

        let wire = encode(protocol_message.clone());
        let roundtripped = ProtocolMessage::<EthNetworkPrimitives>::decode_message(
            EthVersion::Eth68,
            &mut wire.as_slice(),
        )
        .unwrap();

        assert_eq!(protocol_message, roundtripped);
    }

    /// `decode_status` returns the legacy status that was encoded.
    #[test]
    fn decode_status_success() {
        use crate::{Status, StatusMessage};
        use alloy_hardforks::{ForkHash, ForkId};
        use alloy_primitives::{B256, U256};

        let forkid = ForkId { hash: ForkHash([0xb7, 0x15, 0x07, 0x7d]), next: 0 };
        let status = Status {
            version: EthVersion::Eth68,
            chain: alloy_chains::Chain::mainnet(),
            total_difficulty: U256::from(100u64),
            blockhash: B256::random(),
            genesis: B256::random(),
            forkid,
        };

        let wire = encode(ProtocolMessage::<EthNetworkPrimitives>::from(EthMessage::Status(
            StatusMessage::Legacy(status),
        )));

        let decoded = ProtocolMessage::<EthNetworkPrimitives>::decode_status(
            EthVersion::Eth68,
            &mut wire.as_slice(),
        )
        .unwrap();

        assert!(matches!(decoded, StatusMessage::Legacy(s) if s == status));
    }

    /// `max` reports `BlockRangeUpdate` for eth/69 and eth/70, and `Receipts` for eth/68.
    #[test]
    fn eth_message_id_max_includes_block_range_update() {
        let cases = [
            (EthVersion::Eth69, EthMessageID::BlockRangeUpdate),
            (EthVersion::Eth70, EthMessageID::BlockRangeUpdate),
            (EthVersion::Eth68, EthMessageID::Receipts),
        ];

        for (version, highest) in cases {
            assert_eq!(EthMessageID::max(version), highest.to_u8());
        }
    }

    /// `decode_status` refuses a frame whose message ID is not `Status`.
    #[test]
    fn decode_status_rejects_non_status() {
        let wire = encode(ProtocolMessage {
            message_type: EthMessageID::GetBlockBodies,
            message: EthMessage::<EthNetworkPrimitives>::GetBlockBodies(RequestPair {
                request_id: 1,
                message: crate::GetBlockBodies::default(),
            }),
        });

        let outcome = ProtocolMessage::<EthNetworkPrimitives>::decode_status(
            EthVersion::Eth68,
            &mut wire.as_slice(),
        );

        assert!(matches!(
            outcome,
            Err(MessageError::ExpectedStatusMessage(EthMessageID::GetBlockBodies))
        ));
    }
}