reth_eth_wire_types/
message.rs

1//! Implements Ethereum wire protocol for versions 66 through 70.
2//! Defines structs/enums for messages, request-response pairs, and broadcasts.
3//! Handles compatibility with [`EthVersion`].
4//!
5//! Examples include creating, encoding, and decoding protocol messages.
6//!
7//! Reference: [Ethereum Wire Protocol](https://github.com/ethereum/devp2p/blob/master/caps/eth.md).
8
9use super::{
10    broadcast::NewBlockHashes, BlockBodies, BlockHeaders, GetBlockBodies, GetBlockHeaders,
11    GetNodeData, GetPooledTransactions, GetReceipts, GetReceipts70, NewPooledTransactionHashes66,
12    NewPooledTransactionHashes68, NodeData, PooledTransactions, Receipts, Status, StatusEth69,
13    Transactions,
14};
15use crate::{
16    status::StatusMessage, BlockRangeUpdate, EthNetworkPrimitives, EthVersion, NetworkPrimitives,
17    RawCapabilityMessage, Receipts69, Receipts70, SharedTransactions,
18};
19use alloc::{boxed::Box, string::String, sync::Arc};
20use alloy_primitives::{
21    bytes::{Buf, BufMut},
22    Bytes,
23};
24use alloy_rlp::{length_of_length, Decodable, Encodable, Header};
25use core::fmt::Debug;
26
27/// [`MAX_MESSAGE_SIZE`] is the maximum cap on the size of a protocol message.
28// https://github.com/ethereum/go-ethereum/blob/30602163d5d8321fbc68afdcbbaf2362b2641bde/eth/protocols/eth/protocol.go#L50
29pub const MAX_MESSAGE_SIZE: usize = 10 * 1024 * 1024;
30
31/// Error when sending/receiving a message
32#[derive(thiserror::Error, Debug)]
33pub enum MessageError {
34    /// Flags an unrecognized message ID for a given protocol version.
35    #[error("message id {1:?} is invalid for version {0:?}")]
36    Invalid(EthVersion, EthMessageID),
37    /// Thrown when rlp decoding a message failed.
38    #[error("RLP error: {0}")]
39    RlpError(#[from] alloy_rlp::Error),
40    /// Other message error with custom message
41    #[error("{0}")]
42    Other(String),
43}
44
45/// An `eth` protocol message, containing a message ID and payload.
46#[derive(Clone, Debug, PartialEq, Eq)]
47#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
48pub struct ProtocolMessage<N: NetworkPrimitives = EthNetworkPrimitives> {
49    /// The unique identifier representing the type of the Ethereum message.
50    pub message_type: EthMessageID,
51    /// The content of the message, including specific data based on the message type.
52    #[cfg_attr(
53        feature = "serde",
54        serde(bound = "EthMessage<N>: serde::Serialize + serde::de::DeserializeOwned")
55    )]
56    pub message: EthMessage<N>,
57}
58
59impl<N: NetworkPrimitives> ProtocolMessage<N> {
60    /// Create a new `ProtocolMessage` from a message type and message rlp bytes.
61    ///
62    /// This will enforce decoding according to the given [`EthVersion`] of the connection.
63    pub fn decode_message(version: EthVersion, buf: &mut &[u8]) -> Result<Self, MessageError> {
64        let message_type = EthMessageID::decode(buf)?;
65
66        // For EIP-7642 (https://github.com/ethereum/EIPs/blob/master/EIPS/eip-7642.md):
67        // pre-merge (legacy) status messages include total difficulty, whereas eth/69 omits it.
68        let message = match message_type {
69            EthMessageID::Status => EthMessage::Status(if version < EthVersion::Eth69 {
70                StatusMessage::Legacy(Status::decode(buf)?)
71            } else {
72                StatusMessage::Eth69(StatusEth69::decode(buf)?)
73            }),
74            EthMessageID::NewBlockHashes => {
75                EthMessage::NewBlockHashes(NewBlockHashes::decode(buf)?)
76            }
77            EthMessageID::NewBlock => {
78                EthMessage::NewBlock(Box::new(N::NewBlockPayload::decode(buf)?))
79            }
80            EthMessageID::Transactions => EthMessage::Transactions(Transactions::decode(buf)?),
81            EthMessageID::NewPooledTransactionHashes => {
82                if version >= EthVersion::Eth68 {
83                    EthMessage::NewPooledTransactionHashes68(NewPooledTransactionHashes68::decode(
84                        buf,
85                    )?)
86                } else {
87                    EthMessage::NewPooledTransactionHashes66(NewPooledTransactionHashes66::decode(
88                        buf,
89                    )?)
90                }
91            }
92            EthMessageID::GetBlockHeaders => EthMessage::GetBlockHeaders(RequestPair::decode(buf)?),
93            EthMessageID::BlockHeaders => EthMessage::BlockHeaders(RequestPair::decode(buf)?),
94            EthMessageID::GetBlockBodies => EthMessage::GetBlockBodies(RequestPair::decode(buf)?),
95            EthMessageID::BlockBodies => EthMessage::BlockBodies(RequestPair::decode(buf)?),
96            EthMessageID::GetPooledTransactions => {
97                EthMessage::GetPooledTransactions(RequestPair::decode(buf)?)
98            }
99            EthMessageID::PooledTransactions => {
100                EthMessage::PooledTransactions(RequestPair::decode(buf)?)
101            }
102            EthMessageID::GetNodeData => {
103                if version >= EthVersion::Eth67 {
104                    return Err(MessageError::Invalid(version, EthMessageID::GetNodeData))
105                }
106                EthMessage::GetNodeData(RequestPair::decode(buf)?)
107            }
108            EthMessageID::NodeData => {
109                if version >= EthVersion::Eth67 {
110                    return Err(MessageError::Invalid(version, EthMessageID::GetNodeData))
111                }
112                EthMessage::NodeData(RequestPair::decode(buf)?)
113            }
114            EthMessageID::GetReceipts => {
115                if version >= EthVersion::Eth70 {
116                    EthMessage::GetReceipts70(RequestPair::decode(buf)?)
117                } else {
118                    EthMessage::GetReceipts(RequestPair::decode(buf)?)
119                }
120            }
121            EthMessageID::Receipts => {
122                match version {
123                    v if v >= EthVersion::Eth70 => {
124                        // eth/70 continues to omit bloom filters and adds the
125                        // `lastBlockIncomplete` flag, encoded as
126                        // `[request-id, lastBlockIncomplete, [[receipt₁, receipt₂], ...]]`.
127                        EthMessage::Receipts70(RequestPair::decode(buf)?)
128                    }
129                    EthVersion::Eth69 => {
130                        // with eth69, receipts no longer include the bloom
131                        EthMessage::Receipts69(RequestPair::decode(buf)?)
132                    }
133                    _ => {
134                        // before eth69 we need to decode the bloom  as well
135                        EthMessage::Receipts(RequestPair::decode(buf)?)
136                    }
137                }
138            }
139            EthMessageID::BlockRangeUpdate => {
140                if version < EthVersion::Eth69 {
141                    return Err(MessageError::Invalid(version, EthMessageID::BlockRangeUpdate))
142                }
143                EthMessage::BlockRangeUpdate(BlockRangeUpdate::decode(buf)?)
144            }
145            EthMessageID::Other(_) => {
146                let raw_payload = Bytes::copy_from_slice(buf);
147                buf.advance(raw_payload.len());
148                EthMessage::Other(RawCapabilityMessage::new(
149                    message_type.to_u8() as usize,
150                    raw_payload.into(),
151                ))
152            }
153        };
154        Ok(Self { message_type, message })
155    }
156}
157
158impl<N: NetworkPrimitives> Encodable for ProtocolMessage<N> {
159    /// Encodes the protocol message into bytes. The message type is encoded as a single byte and
160    /// prepended to the message.
161    fn encode(&self, out: &mut dyn BufMut) {
162        self.message_type.encode(out);
163        self.message.encode(out);
164    }
165    fn length(&self) -> usize {
166        self.message_type.length() + self.message.length()
167    }
168}
169
170impl<N: NetworkPrimitives> From<EthMessage<N>> for ProtocolMessage<N> {
171    fn from(message: EthMessage<N>) -> Self {
172        Self { message_type: message.message_id(), message }
173    }
174}
175
176/// Represents messages that can be sent to multiple peers.
177#[derive(Clone, Debug)]
178pub struct ProtocolBroadcastMessage<N: NetworkPrimitives = EthNetworkPrimitives> {
179    /// The unique identifier representing the type of the Ethereum message.
180    pub message_type: EthMessageID,
181    /// The content of the message to be broadcasted, including specific data based on the message
182    /// type.
183    pub message: EthBroadcastMessage<N>,
184}
185
186impl<N: NetworkPrimitives> Encodable for ProtocolBroadcastMessage<N> {
187    /// Encodes the protocol message into bytes. The message type is encoded as a single byte and
188    /// prepended to the message.
189    fn encode(&self, out: &mut dyn BufMut) {
190        self.message_type.encode(out);
191        self.message.encode(out);
192    }
193    fn length(&self) -> usize {
194        self.message_type.length() + self.message.length()
195    }
196}
197
198impl<N: NetworkPrimitives> From<EthBroadcastMessage<N>> for ProtocolBroadcastMessage<N> {
199    fn from(message: EthBroadcastMessage<N>) -> Self {
200        Self { message_type: message.message_id(), message }
201    }
202}
203
204/// Represents a message in the eth wire protocol, versions 66, 67, 68 and 69.
205///
206/// The ethereum wire protocol is a set of messages that are broadcast to the network in two
207/// styles:
208///  * A request message sent by a peer (such as [`GetPooledTransactions`]), and an associated
209///    response message (such as [`PooledTransactions`]).
210///  * A message that is broadcast to the network, without a corresponding request.
211///
212/// The newer `eth/66` is an efficiency upgrade on top of `eth/65`, introducing a request id to
213/// correlate request-response message pairs. This allows for request multiplexing.
214///
215/// The `eth/67` is based on `eth/66` but only removes two messages, [`GetNodeData`] and
216/// [`NodeData`].
217///
218/// The `eth/68` changes only `NewPooledTransactionHashes` to include `types` and `sized`. For
219/// it, `NewPooledTransactionHashes` is renamed as [`NewPooledTransactionHashes66`] and
220/// [`NewPooledTransactionHashes68`] is defined.
221///
222/// The `eth/69` announces the historical block range served by the node. Removes total difficulty
223/// information. And removes the Bloom field from receipts transferred over the protocol.
224///
225/// The `eth/70` (EIP-7975) keeps the eth/69 status format and introduces partial receipts.
226/// requests/responses.
227#[derive(Clone, Debug, PartialEq, Eq)]
228#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
229pub enum EthMessage<N: NetworkPrimitives = EthNetworkPrimitives> {
230    /// Represents a Status message required for the protocol handshake.
231    Status(StatusMessage),
232    /// Represents a `NewBlockHashes` message broadcast to the network.
233    NewBlockHashes(NewBlockHashes),
234    /// Represents a `NewBlock` message broadcast to the network.
235    #[cfg_attr(
236        feature = "serde",
237        serde(bound = "N::NewBlockPayload: serde::Serialize + serde::de::DeserializeOwned")
238    )]
239    NewBlock(Box<N::NewBlockPayload>),
240    /// Represents a Transactions message broadcast to the network.
241    #[cfg_attr(
242        feature = "serde",
243        serde(bound = "N::BroadcastedTransaction: serde::Serialize + serde::de::DeserializeOwned")
244    )]
245    Transactions(Transactions<N::BroadcastedTransaction>),
246    /// Represents a `NewPooledTransactionHashes` message for eth/66 version.
247    NewPooledTransactionHashes66(NewPooledTransactionHashes66),
248    /// Represents a `NewPooledTransactionHashes` message for eth/68 version.
249    NewPooledTransactionHashes68(NewPooledTransactionHashes68),
250    // The following messages are request-response message pairs
251    /// Represents a `GetBlockHeaders` request-response pair.
252    GetBlockHeaders(RequestPair<GetBlockHeaders>),
253    /// Represents a `BlockHeaders` request-response pair.
254    #[cfg_attr(
255        feature = "serde",
256        serde(bound = "N::BlockHeader: serde::Serialize + serde::de::DeserializeOwned")
257    )]
258    BlockHeaders(RequestPair<BlockHeaders<N::BlockHeader>>),
259    /// Represents a `GetBlockBodies` request-response pair.
260    GetBlockBodies(RequestPair<GetBlockBodies>),
261    /// Represents a `BlockBodies` request-response pair.
262    #[cfg_attr(
263        feature = "serde",
264        serde(bound = "N::BlockBody: serde::Serialize + serde::de::DeserializeOwned")
265    )]
266    BlockBodies(RequestPair<BlockBodies<N::BlockBody>>),
267    /// Represents a `GetPooledTransactions` request-response pair.
268    GetPooledTransactions(RequestPair<GetPooledTransactions>),
269    /// Represents a `PooledTransactions` request-response pair.
270    #[cfg_attr(
271        feature = "serde",
272        serde(bound = "N::PooledTransaction: serde::Serialize + serde::de::DeserializeOwned")
273    )]
274    PooledTransactions(RequestPair<PooledTransactions<N::PooledTransaction>>),
275    /// Represents a `GetNodeData` request-response pair.
276    GetNodeData(RequestPair<GetNodeData>),
277    /// Represents a `NodeData` request-response pair.
278    NodeData(RequestPair<NodeData>),
279    /// Represents a `GetReceipts` request-response pair.
280    GetReceipts(RequestPair<GetReceipts>),
281    /// Represents a `GetReceipts` request for eth/70.
282    ///
283    /// Note: Unlike earlier protocol versions, the eth/70 encoding for
284    /// `GetReceipts` in EIP-7975 inlines the request id. The type still wraps
285    /// a [`RequestPair`], but with a custom inline encoding.
286    GetReceipts70(RequestPair<GetReceipts70>),
287    /// Represents a Receipts request-response pair.
288    #[cfg_attr(
289        feature = "serde",
290        serde(bound = "N::Receipt: serde::Serialize + serde::de::DeserializeOwned")
291    )]
292    Receipts(RequestPair<Receipts<N::Receipt>>),
293    /// Represents a Receipts request-response pair for eth/69.
294    #[cfg_attr(
295        feature = "serde",
296        serde(bound = "N::Receipt: serde::Serialize + serde::de::DeserializeOwned")
297    )]
298    Receipts69(RequestPair<Receipts69<N::Receipt>>),
299    /// Represents a Receipts request-response pair for eth/70.
300    #[cfg_attr(
301        feature = "serde",
302        serde(bound = "N::Receipt: serde::Serialize + serde::de::DeserializeOwned")
303    )]
304    ///
305    /// Note: The eth/70 encoding for `Receipts` in EIP-7975 inlines the
306    /// request id. The type still wraps a [`RequestPair`], but with a custom
307    /// inline encoding.
308    Receipts70(RequestPair<Receipts70<N::Receipt>>),
309    /// Represents a `BlockRangeUpdate` message broadcast to the network.
310    #[cfg_attr(
311        feature = "serde",
312        serde(bound = "N::BroadcastedTransaction: serde::Serialize + serde::de::DeserializeOwned")
313    )]
314    BlockRangeUpdate(BlockRangeUpdate),
315    /// Represents an encoded message that doesn't match any other variant
316    Other(RawCapabilityMessage),
317}
318
319impl<N: NetworkPrimitives> EthMessage<N> {
320    /// Returns the message's ID.
321    pub const fn message_id(&self) -> EthMessageID {
322        match self {
323            Self::Status(_) => EthMessageID::Status,
324            Self::NewBlockHashes(_) => EthMessageID::NewBlockHashes,
325            Self::NewBlock(_) => EthMessageID::NewBlock,
326            Self::Transactions(_) => EthMessageID::Transactions,
327            Self::NewPooledTransactionHashes66(_) | Self::NewPooledTransactionHashes68(_) => {
328                EthMessageID::NewPooledTransactionHashes
329            }
330            Self::GetBlockHeaders(_) => EthMessageID::GetBlockHeaders,
331            Self::BlockHeaders(_) => EthMessageID::BlockHeaders,
332            Self::GetBlockBodies(_) => EthMessageID::GetBlockBodies,
333            Self::BlockBodies(_) => EthMessageID::BlockBodies,
334            Self::GetPooledTransactions(_) => EthMessageID::GetPooledTransactions,
335            Self::PooledTransactions(_) => EthMessageID::PooledTransactions,
336            Self::GetNodeData(_) => EthMessageID::GetNodeData,
337            Self::NodeData(_) => EthMessageID::NodeData,
338            Self::GetReceipts(_) | Self::GetReceipts70(_) => EthMessageID::GetReceipts,
339            Self::Receipts(_) | Self::Receipts69(_) | Self::Receipts70(_) => EthMessageID::Receipts,
340            Self::BlockRangeUpdate(_) => EthMessageID::BlockRangeUpdate,
341            Self::Other(msg) => EthMessageID::Other(msg.id as u8),
342        }
343    }
344
345    /// Returns true if the message variant is a request.
346    pub const fn is_request(&self) -> bool {
347        matches!(
348            self,
349            Self::GetBlockBodies(_) |
350                Self::GetBlockHeaders(_) |
351                Self::GetReceipts(_) |
352                Self::GetReceipts70(_) |
353                Self::GetPooledTransactions(_) |
354                Self::GetNodeData(_)
355        )
356    }
357
358    /// Returns true if the message variant is a response to a request.
359    pub const fn is_response(&self) -> bool {
360        matches!(
361            self,
362            Self::PooledTransactions(_) |
363                Self::Receipts(_) |
364                Self::Receipts69(_) |
365                Self::Receipts70(_) |
366                Self::BlockHeaders(_) |
367                Self::BlockBodies(_) |
368                Self::NodeData(_)
369        )
370    }
371
372    /// Converts the message types where applicable.
373    ///
374    /// This handles up/downcasting where appropriate, for example for different receipt request
375    /// types.
376    pub fn map_versioned(self, version: EthVersion) -> Self {
377        // For eth/70 peers we send `GetReceipts` using the new eth/70
378        // encoding with `firstBlockReceiptIndex = 0`, while keeping the
379        // user-facing `PeerRequest` API unchanged.
380        if version >= EthVersion::Eth70 {
381            return match self {
382                Self::GetReceipts(pair) => {
383                    let RequestPair { request_id, message } = pair;
384                    let req = RequestPair {
385                        request_id,
386                        message: GetReceipts70 {
387                            first_block_receipt_index: 0,
388                            block_hashes: message.0,
389                        },
390                    };
391                    Self::GetReceipts70(req)
392                }
393                other => other,
394            }
395        }
396
397        self
398    }
399}
400
401impl<N: NetworkPrimitives> Encodable for EthMessage<N> {
402    fn encode(&self, out: &mut dyn BufMut) {
403        match self {
404            Self::Status(status) => status.encode(out),
405            Self::NewBlockHashes(new_block_hashes) => new_block_hashes.encode(out),
406            Self::NewBlock(new_block) => new_block.encode(out),
407            Self::Transactions(transactions) => transactions.encode(out),
408            Self::NewPooledTransactionHashes66(hashes) => hashes.encode(out),
409            Self::NewPooledTransactionHashes68(hashes) => hashes.encode(out),
410            Self::GetBlockHeaders(request) => request.encode(out),
411            Self::BlockHeaders(headers) => headers.encode(out),
412            Self::GetBlockBodies(request) => request.encode(out),
413            Self::BlockBodies(bodies) => bodies.encode(out),
414            Self::GetPooledTransactions(request) => request.encode(out),
415            Self::PooledTransactions(transactions) => transactions.encode(out),
416            Self::GetNodeData(request) => request.encode(out),
417            Self::NodeData(data) => data.encode(out),
418            Self::GetReceipts(request) => request.encode(out),
419            Self::GetReceipts70(request) => request.encode(out),
420            Self::Receipts(receipts) => receipts.encode(out),
421            Self::Receipts69(receipt69) => receipt69.encode(out),
422            Self::Receipts70(receipt70) => receipt70.encode(out),
423            Self::BlockRangeUpdate(block_range_update) => block_range_update.encode(out),
424            Self::Other(unknown) => out.put_slice(&unknown.payload),
425        }
426    }
427    fn length(&self) -> usize {
428        match self {
429            Self::Status(status) => status.length(),
430            Self::NewBlockHashes(new_block_hashes) => new_block_hashes.length(),
431            Self::NewBlock(new_block) => new_block.length(),
432            Self::Transactions(transactions) => transactions.length(),
433            Self::NewPooledTransactionHashes66(hashes) => hashes.length(),
434            Self::NewPooledTransactionHashes68(hashes) => hashes.length(),
435            Self::GetBlockHeaders(request) => request.length(),
436            Self::BlockHeaders(headers) => headers.length(),
437            Self::GetBlockBodies(request) => request.length(),
438            Self::BlockBodies(bodies) => bodies.length(),
439            Self::GetPooledTransactions(request) => request.length(),
440            Self::PooledTransactions(transactions) => transactions.length(),
441            Self::GetNodeData(request) => request.length(),
442            Self::NodeData(data) => data.length(),
443            Self::GetReceipts(request) => request.length(),
444            Self::GetReceipts70(request) => request.length(),
445            Self::Receipts(receipts) => receipts.length(),
446            Self::Receipts69(receipt69) => receipt69.length(),
447            Self::Receipts70(receipt70) => receipt70.length(),
448            Self::BlockRangeUpdate(block_range_update) => block_range_update.length(),
449            Self::Other(unknown) => unknown.length(),
450        }
451    }
452}
453
454/// Represents broadcast messages of [`EthMessage`] with the same object that can be sent to
455/// multiple peers.
456///
457/// Messages that contain a list of hashes depend on the peer the message is sent to. A peer should
458/// never receive a hash of an object (block, transaction) it has already seen.
459///
460/// Note: This is only useful for outgoing messages.
461#[derive(Clone, Debug, PartialEq, Eq)]
462pub enum EthBroadcastMessage<N: NetworkPrimitives = EthNetworkPrimitives> {
463    /// Represents a new block broadcast message.
464    NewBlock(Arc<N::NewBlockPayload>),
465    /// Represents a transactions broadcast message.
466    Transactions(SharedTransactions<N::BroadcastedTransaction>),
467}
468
469// === impl EthBroadcastMessage ===
470
471impl<N: NetworkPrimitives> EthBroadcastMessage<N> {
472    /// Returns the message's ID.
473    pub const fn message_id(&self) -> EthMessageID {
474        match self {
475            Self::NewBlock(_) => EthMessageID::NewBlock,
476            Self::Transactions(_) => EthMessageID::Transactions,
477        }
478    }
479}
480
481impl<N: NetworkPrimitives> Encodable for EthBroadcastMessage<N> {
482    fn encode(&self, out: &mut dyn BufMut) {
483        match self {
484            Self::NewBlock(new_block) => new_block.encode(out),
485            Self::Transactions(transactions) => transactions.encode(out),
486        }
487    }
488
489    fn length(&self) -> usize {
490        match self {
491            Self::NewBlock(new_block) => new_block.length(),
492            Self::Transactions(transactions) => transactions.length(),
493        }
494    }
495}
496
497/// Represents message IDs for eth protocol messages.
498#[repr(u8)]
499#[derive(Clone, Copy, Debug, PartialEq, Eq)]
500#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
501pub enum EthMessageID {
502    /// Status message.
503    Status = 0x00,
504    /// New block hashes message.
505    NewBlockHashes = 0x01,
506    /// Transactions message.
507    Transactions = 0x02,
508    /// Get block headers message.
509    GetBlockHeaders = 0x03,
510    /// Block headers message.
511    BlockHeaders = 0x04,
512    /// Get block bodies message.
513    GetBlockBodies = 0x05,
514    /// Block bodies message.
515    BlockBodies = 0x06,
516    /// New block message.
517    NewBlock = 0x07,
518    /// New pooled transaction hashes message.
519    NewPooledTransactionHashes = 0x08,
520    /// Requests pooled transactions.
521    GetPooledTransactions = 0x09,
522    /// Represents pooled transactions.
523    PooledTransactions = 0x0a,
524    /// Requests node data.
525    GetNodeData = 0x0d,
526    /// Represents node data.
527    NodeData = 0x0e,
528    /// Requests receipts.
529    GetReceipts = 0x0f,
530    /// Represents receipts.
531    Receipts = 0x10,
532    /// Block range update.
533    ///
534    /// Introduced in Eth69
535    BlockRangeUpdate = 0x11,
536    /// Represents unknown message types.
537    Other(u8),
538}
539
540impl EthMessageID {
541    /// Returns the corresponding `u8` value for an `EthMessageID`.
542    pub const fn to_u8(&self) -> u8 {
543        match self {
544            Self::Status => 0x00,
545            Self::NewBlockHashes => 0x01,
546            Self::Transactions => 0x02,
547            Self::GetBlockHeaders => 0x03,
548            Self::BlockHeaders => 0x04,
549            Self::GetBlockBodies => 0x05,
550            Self::BlockBodies => 0x06,
551            Self::NewBlock => 0x07,
552            Self::NewPooledTransactionHashes => 0x08,
553            Self::GetPooledTransactions => 0x09,
554            Self::PooledTransactions => 0x0a,
555            Self::GetNodeData => 0x0d,
556            Self::NodeData => 0x0e,
557            Self::GetReceipts => 0x0f,
558            Self::Receipts => 0x10,
559            Self::BlockRangeUpdate => 0x11,
560            Self::Other(value) => *value, // Return the stored `u8`
561        }
562    }
563
564    /// Returns the max value for the given version.
565    pub const fn max(version: EthVersion) -> u8 {
566        if version.is_eth69() {
567            Self::BlockRangeUpdate.to_u8()
568        } else {
569            Self::Receipts.to_u8()
570        }
571    }
572
573    /// Returns the total number of message types for the given version.
574    ///
575    /// This is used for message ID multiplexing.
576    ///
577    /// <https://github.com/ethereum/go-ethereum/blob/85077be58edea572f29c3b1a6a055077f1a56a8b/eth/protocols/eth/protocol.go#L45-L47>
578    pub const fn message_count(version: EthVersion) -> u8 {
579        Self::max(version) + 1
580    }
581}
582
583impl Encodable for EthMessageID {
584    fn encode(&self, out: &mut dyn BufMut) {
585        out.put_u8(self.to_u8());
586    }
587    fn length(&self) -> usize {
588        1
589    }
590}
591
592impl Decodable for EthMessageID {
593    fn decode(buf: &mut &[u8]) -> alloy_rlp::Result<Self> {
594        let id = match buf.first().ok_or(alloy_rlp::Error::InputTooShort)? {
595            0x00 => Self::Status,
596            0x01 => Self::NewBlockHashes,
597            0x02 => Self::Transactions,
598            0x03 => Self::GetBlockHeaders,
599            0x04 => Self::BlockHeaders,
600            0x05 => Self::GetBlockBodies,
601            0x06 => Self::BlockBodies,
602            0x07 => Self::NewBlock,
603            0x08 => Self::NewPooledTransactionHashes,
604            0x09 => Self::GetPooledTransactions,
605            0x0a => Self::PooledTransactions,
606            0x0d => Self::GetNodeData,
607            0x0e => Self::NodeData,
608            0x0f => Self::GetReceipts,
609            0x10 => Self::Receipts,
610            0x11 => Self::BlockRangeUpdate,
611            unknown => Self::Other(*unknown),
612        };
613        buf.advance(1);
614        Ok(id)
615    }
616}
617
618impl TryFrom<usize> for EthMessageID {
619    type Error = &'static str;
620
621    fn try_from(value: usize) -> Result<Self, Self::Error> {
622        match value {
623            0x00 => Ok(Self::Status),
624            0x01 => Ok(Self::NewBlockHashes),
625            0x02 => Ok(Self::Transactions),
626            0x03 => Ok(Self::GetBlockHeaders),
627            0x04 => Ok(Self::BlockHeaders),
628            0x05 => Ok(Self::GetBlockBodies),
629            0x06 => Ok(Self::BlockBodies),
630            0x07 => Ok(Self::NewBlock),
631            0x08 => Ok(Self::NewPooledTransactionHashes),
632            0x09 => Ok(Self::GetPooledTransactions),
633            0x0a => Ok(Self::PooledTransactions),
634            0x0d => Ok(Self::GetNodeData),
635            0x0e => Ok(Self::NodeData),
636            0x0f => Ok(Self::GetReceipts),
637            0x10 => Ok(Self::Receipts),
638            0x11 => Ok(Self::BlockRangeUpdate),
639            _ => Err("Invalid message ID"),
640        }
641    }
642}
643
644/// This is used for all request-response style `eth` protocol messages.
645/// This can represent either a request or a response, since both include a message payload and
646/// request id.
647#[derive(Clone, Debug, PartialEq, Eq)]
648#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
649#[cfg_attr(any(test, feature = "arbitrary"), derive(arbitrary::Arbitrary))]
650pub struct RequestPair<T> {
651    /// id for the contained request or response message
652    pub request_id: u64,
653
654    /// the request or response message payload
655    pub message: T,
656}
657
658impl<T> RequestPair<T> {
659    /// Converts the message type with the given closure.
660    pub fn map<F, R>(self, f: F) -> RequestPair<R>
661    where
662        F: FnOnce(T) -> R,
663    {
664        let Self { request_id, message } = self;
665        RequestPair { request_id, message: f(message) }
666    }
667}
668
669/// Allows messages with request ids to be serialized into RLP bytes.
670impl<T> Encodable for RequestPair<T>
671where
672    T: Encodable,
673{
674    fn encode(&self, out: &mut dyn alloy_rlp::BufMut) {
675        let header =
676            Header { list: true, payload_length: self.request_id.length() + self.message.length() };
677
678        header.encode(out);
679        self.request_id.encode(out);
680        self.message.encode(out);
681    }
682
683    fn length(&self) -> usize {
684        let mut length = 0;
685        length += self.request_id.length();
686        length += self.message.length();
687        length += length_of_length(length);
688        length
689    }
690}
691
692/// Allows messages with request ids to be deserialized into RLP bytes.
693impl<T> Decodable for RequestPair<T>
694where
695    T: Decodable,
696{
697    fn decode(buf: &mut &[u8]) -> alloy_rlp::Result<Self> {
698        let header = Header::decode(buf)?;
699
700        let initial_length = buf.len();
701        let request_id = u64::decode(buf)?;
702        let message = T::decode(buf)?;
703
704        // Check that the buffer consumed exactly payload_length bytes after decoding the
705        // RequestPair
706        let consumed_len = initial_length - buf.len();
707        if consumed_len != header.payload_length {
708            return Err(alloy_rlp::Error::UnexpectedLength)
709        }
710
711        Ok(Self { request_id, message })
712    }
713}
714
715#[cfg(test)]
716mod tests {
717    use super::MessageError;
718    use crate::{
719        message::RequestPair, EthMessage, EthMessageID, EthNetworkPrimitives, EthVersion,
720        GetNodeData, NodeData, ProtocolMessage, RawCapabilityMessage,
721    };
722    use alloy_primitives::hex;
723    use alloy_rlp::{Decodable, Encodable, Error};
724    use reth_ethereum_primitives::BlockBody;
725
726    fn encode<T: Encodable>(value: T) -> Vec<u8> {
727        let mut buf = vec![];
728        value.encode(&mut buf);
729        buf
730    }
731
732    #[test]
733    fn test_removed_message_at_eth67() {
734        let get_node_data = EthMessage::<EthNetworkPrimitives>::GetNodeData(RequestPair {
735            request_id: 1337,
736            message: GetNodeData(vec![]),
737        });
738        let buf = encode(ProtocolMessage {
739            message_type: EthMessageID::GetNodeData,
740            message: get_node_data,
741        });
742        let msg = ProtocolMessage::<EthNetworkPrimitives>::decode_message(
743            crate::EthVersion::Eth67,
744            &mut &buf[..],
745        );
746        assert!(matches!(msg, Err(MessageError::Invalid(..))));
747
748        let node_data = EthMessage::<EthNetworkPrimitives>::NodeData(RequestPair {
749            request_id: 1337,
750            message: NodeData(vec![]),
751        });
752        let buf =
753            encode(ProtocolMessage { message_type: EthMessageID::NodeData, message: node_data });
754        let msg = ProtocolMessage::<EthNetworkPrimitives>::decode_message(
755            crate::EthVersion::Eth67,
756            &mut &buf[..],
757        );
758        assert!(matches!(msg, Err(MessageError::Invalid(..))));
759    }
760
761    #[test]
762    fn request_pair_encode() {
763        let request_pair = RequestPair { request_id: 1337, message: vec![5u8] };
764
765        // c5: start of list (c0) + len(full_list) (length is <55 bytes)
766        // 82: 0x80 + len(1337)
767        // 05 39: 1337 (request_id)
768        // === full_list ===
769        // c1: start of list (c0) + len(list) (length is <55 bytes)
770        // 05: 5 (message)
771        let expected = hex!("c5820539c105");
772        let got = encode(request_pair);
773        assert_eq!(expected[..], got, "expected: {expected:X?}, got: {got:X?}",);
774    }
775
776    #[test]
777    fn request_pair_decode() {
778        let raw_pair = &hex!("c5820539c105")[..];
779
780        let expected = RequestPair { request_id: 1337, message: vec![5u8] };
781
782        let got = RequestPair::<Vec<u8>>::decode(&mut &*raw_pair).unwrap();
783        assert_eq!(expected.length(), raw_pair.len());
784        assert_eq!(expected, got);
785    }
786
787    #[test]
788    fn malicious_request_pair_decode() {
789        // A maliciously encoded request pair, where the len(full_list) is 5, but it
790        // actually consumes 6 bytes when decoding
791        //
792        // c5: start of list (c0) + len(full_list) (length is <55 bytes)
793        // 82: 0x80 + len(1337)
794        // 05 39: 1337 (request_id)
795        // === full_list ===
796        // c2: start of list (c0) + len(list) (length is <55 bytes)
797        // 05 05: 5 5(message)
798        let raw_pair = &hex!("c5820539c20505")[..];
799
800        let result = RequestPair::<Vec<u8>>::decode(&mut &*raw_pair);
801        assert!(matches!(result, Err(Error::UnexpectedLength)));
802    }
803
804    #[test]
805    fn empty_block_bodies_protocol() {
806        let empty_block_bodies =
807            ProtocolMessage::from(EthMessage::<EthNetworkPrimitives>::BlockBodies(RequestPair {
808                request_id: 0,
809                message: Default::default(),
810            }));
811        let mut buf = Vec::new();
812        empty_block_bodies.encode(&mut buf);
813        let decoded =
814            ProtocolMessage::decode_message(EthVersion::Eth68, &mut buf.as_slice()).unwrap();
815        assert_eq!(empty_block_bodies, decoded);
816    }
817
818    #[test]
819    fn empty_block_body_protocol() {
820        let empty_block_bodies =
821            ProtocolMessage::from(EthMessage::<EthNetworkPrimitives>::BlockBodies(RequestPair {
822                request_id: 0,
823                message: vec![BlockBody {
824                    transactions: vec![],
825                    ommers: vec![],
826                    withdrawals: Some(Default::default()),
827                }]
828                .into(),
829            }));
830        let mut buf = Vec::new();
831        empty_block_bodies.encode(&mut buf);
832        let decoded =
833            ProtocolMessage::decode_message(EthVersion::Eth68, &mut buf.as_slice()).unwrap();
834        assert_eq!(empty_block_bodies, decoded);
835    }
836
837    #[test]
838    fn decode_block_bodies_message() {
839        let buf = hex!("06c48199c1c0");
840        let msg = ProtocolMessage::<EthNetworkPrimitives>::decode_message(
841            EthVersion::Eth68,
842            &mut &buf[..],
843        )
844        .unwrap_err();
845        assert!(matches!(msg, MessageError::RlpError(alloy_rlp::Error::InputTooShort)));
846    }
847
848    #[test]
849    fn custom_message_roundtrip() {
850        let custom_payload = vec![1, 2, 3, 4, 5];
851        let custom_message = RawCapabilityMessage::new(0x20, custom_payload.into());
852        let protocol_message = ProtocolMessage::<EthNetworkPrimitives> {
853            message_type: EthMessageID::Other(0x20),
854            message: EthMessage::Other(custom_message),
855        };
856
857        let encoded = encode(protocol_message.clone());
858        let decoded = ProtocolMessage::<EthNetworkPrimitives>::decode_message(
859            EthVersion::Eth68,
860            &mut &encoded[..],
861        )
862        .unwrap();
863
864        assert_eq!(protocol_message, decoded);
865    }
866
867    #[test]
868    fn custom_message_empty_payload_roundtrip() {
869        let custom_message = RawCapabilityMessage::new(0x30, vec![].into());
870        let protocol_message = ProtocolMessage::<EthNetworkPrimitives> {
871            message_type: EthMessageID::Other(0x30),
872            message: EthMessage::Other(custom_message),
873        };
874
875        let encoded = encode(protocol_message.clone());
876        let decoded = ProtocolMessage::<EthNetworkPrimitives>::decode_message(
877            EthVersion::Eth68,
878            &mut &encoded[..],
879        )
880        .unwrap();
881
882        assert_eq!(protocol_message, decoded);
883    }
884}