reth_eth_wire_types/
message.rs

//! Implements Ethereum wire protocol for versions 66, 67, 68, and 69.
2//! Defines structs/enums for messages, request-response pairs, and broadcasts.
3//! Handles compatibility with [`EthVersion`].
4//!
5//! Examples include creating, encoding, and decoding protocol messages.
6//!
7//! Reference: [Ethereum Wire Protocol](https://github.com/ethereum/devp2p/blob/master/caps/eth.md).
8
9use super::{
10    broadcast::NewBlockHashes, BlockBodies, BlockHeaders, GetBlockBodies, GetBlockHeaders,
11    GetNodeData, GetPooledTransactions, GetReceipts, NewPooledTransactionHashes66,
12    NewPooledTransactionHashes68, NodeData, PooledTransactions, Receipts, Status, StatusEth69,
13    Transactions,
14};
15use crate::{
16    status::StatusMessage, BlockRangeUpdate, EthNetworkPrimitives, EthVersion, NetworkPrimitives,
17    RawCapabilityMessage, Receipts69, SharedTransactions,
18};
19use alloc::{boxed::Box, string::String, sync::Arc};
20use alloy_primitives::{
21    bytes::{Buf, BufMut},
22    Bytes,
23};
24use alloy_rlp::{length_of_length, Decodable, Encodable, Header};
25use core::fmt::Debug;
26
/// [`MAX_MESSAGE_SIZE`] is the maximum cap on the size of a protocol message.
///
/// The value is `10 * 1024 * 1024` bytes (10 MiB).
// https://github.com/ethereum/go-ethereum/blob/30602163d5d8321fbc68afdcbbaf2362b2641bde/eth/protocols/eth/protocol.go#L50
pub const MAX_MESSAGE_SIZE: usize = 10 * 1024 * 1024;
30
/// Error when sending/receiving a message
#[derive(thiserror::Error, Debug)]
pub enum MessageError {
    /// Flags a message ID that is not valid for a given protocol version.
    ///
    /// The fields are, in order, the protocol version and the offending message ID.
    #[error("message id {1:?} is invalid for version {0:?}")]
    Invalid(EthVersion, EthMessageID),
    /// Thrown when rlp decoding a message failed.
    ///
    /// An [`alloy_rlp::Error`] converts into this variant automatically (e.g. through `?`).
    #[error("RLP error: {0}")]
    RlpError(#[from] alloy_rlp::Error),
    /// Other message error with custom message
    #[error("{0}")]
    Other(String),
}
44
/// An `eth` protocol message, containing a message ID and payload.
///
/// The type is generic over the [`NetworkPrimitives`] used by the payload and defaults to
/// [`EthNetworkPrimitives`].
#[derive(Clone, Debug, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct ProtocolMessage<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// The unique identifier representing the type of the Ethereum message.
    pub message_type: EthMessageID,
    /// The content of the message, including specific data based on the message type.
    // The explicit serde bound is placed on the whole `EthMessage<N>` rather than on `N`.
    #[cfg_attr(
        feature = "serde",
        serde(bound = "EthMessage<N>: serde::Serialize + serde::de::DeserializeOwned")
    )]
    pub message: EthMessage<N>,
}
58
59impl<N: NetworkPrimitives> ProtocolMessage<N> {
60    /// Create a new `ProtocolMessage` from a message type and message rlp bytes.
61    ///
62    /// This will enforce decoding according to the given [`EthVersion`] of the connection.
63    pub fn decode_message(version: EthVersion, buf: &mut &[u8]) -> Result<Self, MessageError> {
64        let message_type = EthMessageID::decode(buf)?;
65
66        // For EIP-7642 (https://github.com/ethereum/EIPs/blob/master/EIPS/eip-7642.md):
67        // pre-merge (legacy) status messages include total difficulty, whereas eth/69 omits it.
68        let message = match message_type {
69            EthMessageID::Status => EthMessage::Status(if version < EthVersion::Eth69 {
70                StatusMessage::Legacy(Status::decode(buf)?)
71            } else {
72                StatusMessage::Eth69(StatusEth69::decode(buf)?)
73            }),
74            EthMessageID::NewBlockHashes => {
75                EthMessage::NewBlockHashes(NewBlockHashes::decode(buf)?)
76            }
77            EthMessageID::NewBlock => {
78                EthMessage::NewBlock(Box::new(N::NewBlockPayload::decode(buf)?))
79            }
80            EthMessageID::Transactions => EthMessage::Transactions(Transactions::decode(buf)?),
81            EthMessageID::NewPooledTransactionHashes => {
82                if version >= EthVersion::Eth68 {
83                    EthMessage::NewPooledTransactionHashes68(NewPooledTransactionHashes68::decode(
84                        buf,
85                    )?)
86                } else {
87                    EthMessage::NewPooledTransactionHashes66(NewPooledTransactionHashes66::decode(
88                        buf,
89                    )?)
90                }
91            }
92            EthMessageID::GetBlockHeaders => EthMessage::GetBlockHeaders(RequestPair::decode(buf)?),
93            EthMessageID::BlockHeaders => EthMessage::BlockHeaders(RequestPair::decode(buf)?),
94            EthMessageID::GetBlockBodies => EthMessage::GetBlockBodies(RequestPair::decode(buf)?),
95            EthMessageID::BlockBodies => EthMessage::BlockBodies(RequestPair::decode(buf)?),
96            EthMessageID::GetPooledTransactions => {
97                EthMessage::GetPooledTransactions(RequestPair::decode(buf)?)
98            }
99            EthMessageID::PooledTransactions => {
100                EthMessage::PooledTransactions(RequestPair::decode(buf)?)
101            }
102            EthMessageID::GetNodeData => {
103                if version >= EthVersion::Eth67 {
104                    return Err(MessageError::Invalid(version, EthMessageID::GetNodeData))
105                }
106                EthMessage::GetNodeData(RequestPair::decode(buf)?)
107            }
108            EthMessageID::NodeData => {
109                if version >= EthVersion::Eth67 {
110                    return Err(MessageError::Invalid(version, EthMessageID::GetNodeData))
111                }
112                EthMessage::NodeData(RequestPair::decode(buf)?)
113            }
114            EthMessageID::GetReceipts => EthMessage::GetReceipts(RequestPair::decode(buf)?),
115            EthMessageID::Receipts => {
116                if version < EthVersion::Eth69 {
117                    EthMessage::Receipts(RequestPair::decode(buf)?)
118                } else {
119                    // with eth69, receipts no longer include the bloom
120                    EthMessage::Receipts69(RequestPair::decode(buf)?)
121                }
122            }
123            EthMessageID::BlockRangeUpdate => {
124                if version < EthVersion::Eth69 {
125                    return Err(MessageError::Invalid(version, EthMessageID::BlockRangeUpdate))
126                }
127                EthMessage::BlockRangeUpdate(BlockRangeUpdate::decode(buf)?)
128            }
129            EthMessageID::Other(_) => {
130                let raw_payload = Bytes::copy_from_slice(buf);
131                buf.advance(raw_payload.len());
132                EthMessage::Other(RawCapabilityMessage::new(
133                    message_type.to_u8() as usize,
134                    raw_payload.into(),
135                ))
136            }
137        };
138        Ok(Self { message_type, message })
139    }
140}
141
142impl<N: NetworkPrimitives> Encodable for ProtocolMessage<N> {
143    /// Encodes the protocol message into bytes. The message type is encoded as a single byte and
144    /// prepended to the message.
145    fn encode(&self, out: &mut dyn BufMut) {
146        self.message_type.encode(out);
147        self.message.encode(out);
148    }
149    fn length(&self) -> usize {
150        self.message_type.length() + self.message.length()
151    }
152}
153
154impl<N: NetworkPrimitives> From<EthMessage<N>> for ProtocolMessage<N> {
155    fn from(message: EthMessage<N>) -> Self {
156        Self { message_type: message.message_id(), message }
157    }
158}
159
/// Represents messages that can be sent to multiple peers.
///
/// Pairs an [`EthBroadcastMessage`] with its [`EthMessageID`].
#[derive(Clone, Debug)]
pub struct ProtocolBroadcastMessage<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// The unique identifier representing the type of the Ethereum message.
    pub message_type: EthMessageID,
    /// The content of the message to be broadcasted, including specific data based on the message
    /// type.
    pub message: EthBroadcastMessage<N>,
}
169
170impl<N: NetworkPrimitives> Encodable for ProtocolBroadcastMessage<N> {
171    /// Encodes the protocol message into bytes. The message type is encoded as a single byte and
172    /// prepended to the message.
173    fn encode(&self, out: &mut dyn BufMut) {
174        self.message_type.encode(out);
175        self.message.encode(out);
176    }
177    fn length(&self) -> usize {
178        self.message_type.length() + self.message.length()
179    }
180}
181
182impl<N: NetworkPrimitives> From<EthBroadcastMessage<N>> for ProtocolBroadcastMessage<N> {
183    fn from(message: EthBroadcastMessage<N>) -> Self {
184        Self { message_type: message.message_id(), message }
185    }
186}
187
/// Represents a message in the eth wire protocol, versions 66, 67, 68 and 69.
///
/// The ethereum wire protocol is a set of messages that are broadcast to the network in two
/// styles:
///  * A request message sent by a peer (such as [`GetPooledTransactions`]), and an associated
///    response message (such as [`PooledTransactions`]).
///  * A message that is broadcast to the network, without a corresponding request.
///
/// The newer `eth/66` is an efficiency upgrade on top of `eth/65`, introducing a request id to
/// correlate request-response message pairs. This allows for request multiplexing.
///
/// The `eth/67` is based on `eth/66` but only removes two messages, [`GetNodeData`] and
/// [`NodeData`].
///
/// The `eth/68` changes only `NewPooledTransactionHashes` to include `types` and `sizes`. For
/// it, `NewPooledTransactionHashes` is renamed as [`NewPooledTransactionHashes66`] and
/// [`NewPooledTransactionHashes68`] is defined.
///
/// The `eth/69` announces the historical block range served by the node. Removes total difficulty
/// information. And removes the Bloom field from receipts transferred over the protocol.
#[derive(Clone, Debug, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub enum EthMessage<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// Represents a Status message required for the protocol handshake.
    Status(StatusMessage),
    /// Represents a `NewBlockHashes` message broadcast to the network.
    NewBlockHashes(NewBlockHashes),
    /// Represents a `NewBlock` message broadcast to the network.
    #[cfg_attr(
        feature = "serde",
        serde(bound = "N::NewBlockPayload: serde::Serialize + serde::de::DeserializeOwned")
    )]
    NewBlock(Box<N::NewBlockPayload>),
    /// Represents a Transactions message broadcast to the network.
    #[cfg_attr(
        feature = "serde",
        serde(bound = "N::BroadcastedTransaction: serde::Serialize + serde::de::DeserializeOwned")
    )]
    Transactions(Transactions<N::BroadcastedTransaction>),
    /// Represents a `NewPooledTransactionHashes` message for eth/66 version.
    NewPooledTransactionHashes66(NewPooledTransactionHashes66),
    /// Represents a `NewPooledTransactionHashes` message for eth/68 version.
    NewPooledTransactionHashes68(NewPooledTransactionHashes68),
    // The following messages are request-response message pairs
    /// Represents a `GetBlockHeaders` request-response pair.
    GetBlockHeaders(RequestPair<GetBlockHeaders>),
    /// Represents a `BlockHeaders` request-response pair.
    #[cfg_attr(
        feature = "serde",
        serde(bound = "N::BlockHeader: serde::Serialize + serde::de::DeserializeOwned")
    )]
    BlockHeaders(RequestPair<BlockHeaders<N::BlockHeader>>),
    /// Represents a `GetBlockBodies` request-response pair.
    GetBlockBodies(RequestPair<GetBlockBodies>),
    /// Represents a `BlockBodies` request-response pair.
    #[cfg_attr(
        feature = "serde",
        serde(bound = "N::BlockBody: serde::Serialize + serde::de::DeserializeOwned")
    )]
    BlockBodies(RequestPair<BlockBodies<N::BlockBody>>),
    /// Represents a `GetPooledTransactions` request-response pair.
    GetPooledTransactions(RequestPair<GetPooledTransactions>),
    /// Represents a `PooledTransactions` request-response pair.
    #[cfg_attr(
        feature = "serde",
        serde(bound = "N::PooledTransaction: serde::Serialize + serde::de::DeserializeOwned")
    )]
    PooledTransactions(RequestPair<PooledTransactions<N::PooledTransaction>>),
    /// Represents a `GetNodeData` request-response pair.
    GetNodeData(RequestPair<GetNodeData>),
    /// Represents a `NodeData` request-response pair.
    NodeData(RequestPair<NodeData>),
    /// Represents a `GetReceipts` request-response pair.
    GetReceipts(RequestPair<GetReceipts>),
    /// Represents a Receipts request-response pair.
    #[cfg_attr(
        feature = "serde",
        serde(bound = "N::Receipt: serde::Serialize + serde::de::DeserializeOwned")
    )]
    Receipts(RequestPair<Receipts<N::Receipt>>),
    /// Represents a Receipts request-response pair for eth/69.
    #[cfg_attr(
        feature = "serde",
        serde(bound = "N::Receipt: serde::Serialize + serde::de::DeserializeOwned")
    )]
    Receipts69(RequestPair<Receipts69<N::Receipt>>),
    /// Represents a `BlockRangeUpdate` message broadcast to the network.
    // NOTE(review): this bound names `N::BroadcastedTransaction` although the variant's payload
    // is not generic over `N` — confirm whether the attribute is still needed.
    #[cfg_attr(
        feature = "serde",
        serde(bound = "N::BroadcastedTransaction: serde::Serialize + serde::de::DeserializeOwned")
    )]
    BlockRangeUpdate(BlockRangeUpdate),
    /// Represents an encoded message that doesn't match any other variant
    Other(RawCapabilityMessage),
}
283
284impl<N: NetworkPrimitives> EthMessage<N> {
285    /// Returns the message's ID.
286    pub const fn message_id(&self) -> EthMessageID {
287        match self {
288            Self::Status(_) => EthMessageID::Status,
289            Self::NewBlockHashes(_) => EthMessageID::NewBlockHashes,
290            Self::NewBlock(_) => EthMessageID::NewBlock,
291            Self::Transactions(_) => EthMessageID::Transactions,
292            Self::NewPooledTransactionHashes66(_) | Self::NewPooledTransactionHashes68(_) => {
293                EthMessageID::NewPooledTransactionHashes
294            }
295            Self::GetBlockHeaders(_) => EthMessageID::GetBlockHeaders,
296            Self::BlockHeaders(_) => EthMessageID::BlockHeaders,
297            Self::GetBlockBodies(_) => EthMessageID::GetBlockBodies,
298            Self::BlockBodies(_) => EthMessageID::BlockBodies,
299            Self::GetPooledTransactions(_) => EthMessageID::GetPooledTransactions,
300            Self::PooledTransactions(_) => EthMessageID::PooledTransactions,
301            Self::GetNodeData(_) => EthMessageID::GetNodeData,
302            Self::NodeData(_) => EthMessageID::NodeData,
303            Self::GetReceipts(_) => EthMessageID::GetReceipts,
304            Self::Receipts(_) | Self::Receipts69(_) => EthMessageID::Receipts,
305            Self::BlockRangeUpdate(_) => EthMessageID::BlockRangeUpdate,
306            Self::Other(msg) => EthMessageID::Other(msg.id as u8),
307        }
308    }
309
310    /// Returns true if the message variant is a request.
311    pub const fn is_request(&self) -> bool {
312        matches!(
313            self,
314            Self::GetBlockBodies(_) |
315                Self::GetBlockHeaders(_) |
316                Self::GetReceipts(_) |
317                Self::GetPooledTransactions(_) |
318                Self::GetNodeData(_)
319        )
320    }
321
322    /// Returns true if the message variant is a response to a request.
323    pub const fn is_response(&self) -> bool {
324        matches!(
325            self,
326            Self::PooledTransactions(_) |
327                Self::Receipts(_) |
328                Self::Receipts69(_) |
329                Self::BlockHeaders(_) |
330                Self::BlockBodies(_) |
331                Self::NodeData(_)
332        )
333    }
334}
335
336impl<N: NetworkPrimitives> Encodable for EthMessage<N> {
337    fn encode(&self, out: &mut dyn BufMut) {
338        match self {
339            Self::Status(status) => status.encode(out),
340            Self::NewBlockHashes(new_block_hashes) => new_block_hashes.encode(out),
341            Self::NewBlock(new_block) => new_block.encode(out),
342            Self::Transactions(transactions) => transactions.encode(out),
343            Self::NewPooledTransactionHashes66(hashes) => hashes.encode(out),
344            Self::NewPooledTransactionHashes68(hashes) => hashes.encode(out),
345            Self::GetBlockHeaders(request) => request.encode(out),
346            Self::BlockHeaders(headers) => headers.encode(out),
347            Self::GetBlockBodies(request) => request.encode(out),
348            Self::BlockBodies(bodies) => bodies.encode(out),
349            Self::GetPooledTransactions(request) => request.encode(out),
350            Self::PooledTransactions(transactions) => transactions.encode(out),
351            Self::GetNodeData(request) => request.encode(out),
352            Self::NodeData(data) => data.encode(out),
353            Self::GetReceipts(request) => request.encode(out),
354            Self::Receipts(receipts) => receipts.encode(out),
355            Self::Receipts69(receipt69) => receipt69.encode(out),
356            Self::BlockRangeUpdate(block_range_update) => block_range_update.encode(out),
357            Self::Other(unknown) => out.put_slice(&unknown.payload),
358        }
359    }
360    fn length(&self) -> usize {
361        match self {
362            Self::Status(status) => status.length(),
363            Self::NewBlockHashes(new_block_hashes) => new_block_hashes.length(),
364            Self::NewBlock(new_block) => new_block.length(),
365            Self::Transactions(transactions) => transactions.length(),
366            Self::NewPooledTransactionHashes66(hashes) => hashes.length(),
367            Self::NewPooledTransactionHashes68(hashes) => hashes.length(),
368            Self::GetBlockHeaders(request) => request.length(),
369            Self::BlockHeaders(headers) => headers.length(),
370            Self::GetBlockBodies(request) => request.length(),
371            Self::BlockBodies(bodies) => bodies.length(),
372            Self::GetPooledTransactions(request) => request.length(),
373            Self::PooledTransactions(transactions) => transactions.length(),
374            Self::GetNodeData(request) => request.length(),
375            Self::NodeData(data) => data.length(),
376            Self::GetReceipts(request) => request.length(),
377            Self::Receipts(receipts) => receipts.length(),
378            Self::Receipts69(receipt69) => receipt69.length(),
379            Self::BlockRangeUpdate(block_range_update) => block_range_update.length(),
380            Self::Other(unknown) => unknown.length(),
381        }
382    }
383}
384
/// Represents broadcast messages of [`EthMessage`] with the same object that can be sent to
/// multiple peers.
///
/// Messages that contain a list of hashes depend on the peer the message is sent to. A peer should
/// never receive a hash of an object (block, transaction) it has already seen.
///
/// Note: This is only useful for outgoing messages.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum EthBroadcastMessage<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// Represents a new block broadcast message.
    ///
    /// The payload is held behind an [`Arc`].
    NewBlock(Arc<N::NewBlockPayload>),
    /// Represents a transactions broadcast message.
    Transactions(SharedTransactions<N::BroadcastedTransaction>),
}
399
400// === impl EthBroadcastMessage ===
401
402impl<N: NetworkPrimitives> EthBroadcastMessage<N> {
403    /// Returns the message's ID.
404    pub const fn message_id(&self) -> EthMessageID {
405        match self {
406            Self::NewBlock(_) => EthMessageID::NewBlock,
407            Self::Transactions(_) => EthMessageID::Transactions,
408        }
409    }
410}
411
412impl<N: NetworkPrimitives> Encodable for EthBroadcastMessage<N> {
413    fn encode(&self, out: &mut dyn BufMut) {
414        match self {
415            Self::NewBlock(new_block) => new_block.encode(out),
416            Self::Transactions(transactions) => transactions.encode(out),
417        }
418    }
419
420    fn length(&self) -> usize {
421        match self {
422            Self::NewBlock(new_block) => new_block.length(),
423            Self::Transactions(transactions) => transactions.length(),
424        }
425    }
426}
427
/// Represents message IDs for eth protocol messages.
///
/// The known IDs cover `0x00..=0x11`, except that `0x0b` and `0x0c` are not assigned to any
/// variant here. Any other value is carried by [`EthMessageID::Other`].
#[repr(u8)]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub enum EthMessageID {
    /// Status message.
    Status = 0x00,
    /// New block hashes message.
    NewBlockHashes = 0x01,
    /// Transactions message.
    Transactions = 0x02,
    /// Get block headers message.
    GetBlockHeaders = 0x03,
    /// Block headers message.
    BlockHeaders = 0x04,
    /// Get block bodies message.
    GetBlockBodies = 0x05,
    /// Block bodies message.
    BlockBodies = 0x06,
    /// New block message.
    NewBlock = 0x07,
    /// New pooled transaction hashes message.
    NewPooledTransactionHashes = 0x08,
    /// Requests pooled transactions.
    GetPooledTransactions = 0x09,
    /// Represents pooled transactions.
    PooledTransactions = 0x0a,
    /// Requests node data.
    GetNodeData = 0x0d,
    /// Represents node data.
    NodeData = 0x0e,
    /// Requests receipts.
    GetReceipts = 0x0f,
    /// Represents receipts.
    Receipts = 0x10,
    /// Block range update.
    ///
    /// Introduced in Eth69
    BlockRangeUpdate = 0x11,
    /// Represents unknown message types.
    ///
    /// Holds the raw ID byte.
    Other(u8),
}
470
471impl EthMessageID {
472    /// Returns the corresponding `u8` value for an `EthMessageID`.
473    pub const fn to_u8(&self) -> u8 {
474        match self {
475            Self::Status => 0x00,
476            Self::NewBlockHashes => 0x01,
477            Self::Transactions => 0x02,
478            Self::GetBlockHeaders => 0x03,
479            Self::BlockHeaders => 0x04,
480            Self::GetBlockBodies => 0x05,
481            Self::BlockBodies => 0x06,
482            Self::NewBlock => 0x07,
483            Self::NewPooledTransactionHashes => 0x08,
484            Self::GetPooledTransactions => 0x09,
485            Self::PooledTransactions => 0x0a,
486            Self::GetNodeData => 0x0d,
487            Self::NodeData => 0x0e,
488            Self::GetReceipts => 0x0f,
489            Self::Receipts => 0x10,
490            Self::BlockRangeUpdate => 0x11,
491            Self::Other(value) => *value, // Return the stored `u8`
492        }
493    }
494
495    /// Returns the max value for the given version.
496    pub const fn max(version: EthVersion) -> u8 {
497        if version.is_eth69() {
498            Self::BlockRangeUpdate.to_u8()
499        } else {
500            Self::Receipts.to_u8()
501        }
502    }
503
504    /// Returns the total number of message types for the given version.
505    ///
506    /// This is used for message ID multiplexing.
507    ///
508    /// <https://github.com/ethereum/go-ethereum/blob/85077be58edea572f29c3b1a6a055077f1a56a8b/eth/protocols/eth/protocol.go#L45-L47>
509    pub const fn message_count(version: EthVersion) -> u8 {
510        Self::max(version) + 1
511    }
512}
513
514impl Encodable for EthMessageID {
515    fn encode(&self, out: &mut dyn BufMut) {
516        out.put_u8(self.to_u8());
517    }
518    fn length(&self) -> usize {
519        1
520    }
521}
522
523impl Decodable for EthMessageID {
524    fn decode(buf: &mut &[u8]) -> alloy_rlp::Result<Self> {
525        let id = match buf.first().ok_or(alloy_rlp::Error::InputTooShort)? {
526            0x00 => Self::Status,
527            0x01 => Self::NewBlockHashes,
528            0x02 => Self::Transactions,
529            0x03 => Self::GetBlockHeaders,
530            0x04 => Self::BlockHeaders,
531            0x05 => Self::GetBlockBodies,
532            0x06 => Self::BlockBodies,
533            0x07 => Self::NewBlock,
534            0x08 => Self::NewPooledTransactionHashes,
535            0x09 => Self::GetPooledTransactions,
536            0x0a => Self::PooledTransactions,
537            0x0d => Self::GetNodeData,
538            0x0e => Self::NodeData,
539            0x0f => Self::GetReceipts,
540            0x10 => Self::Receipts,
541            0x11 => Self::BlockRangeUpdate,
542            unknown => Self::Other(*unknown),
543        };
544        buf.advance(1);
545        Ok(id)
546    }
547}
548
549impl TryFrom<usize> for EthMessageID {
550    type Error = &'static str;
551
552    fn try_from(value: usize) -> Result<Self, Self::Error> {
553        match value {
554            0x00 => Ok(Self::Status),
555            0x01 => Ok(Self::NewBlockHashes),
556            0x02 => Ok(Self::Transactions),
557            0x03 => Ok(Self::GetBlockHeaders),
558            0x04 => Ok(Self::BlockHeaders),
559            0x05 => Ok(Self::GetBlockBodies),
560            0x06 => Ok(Self::BlockBodies),
561            0x07 => Ok(Self::NewBlock),
562            0x08 => Ok(Self::NewPooledTransactionHashes),
563            0x09 => Ok(Self::GetPooledTransactions),
564            0x0a => Ok(Self::PooledTransactions),
565            0x0d => Ok(Self::GetNodeData),
566            0x0e => Ok(Self::NodeData),
567            0x0f => Ok(Self::GetReceipts),
568            0x10 => Ok(Self::Receipts),
569            0x11 => Ok(Self::BlockRangeUpdate),
570            _ => Err("Invalid message ID"),
571        }
572    }
573}
574
/// This is used for all request-response style `eth` protocol messages.
/// This can represent either a request or a response, since both include a message payload and
/// request id.
///
/// On the wire it is an RLP list of the request id followed by the message payload.
#[derive(Clone, Debug, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(any(test, feature = "arbitrary"), derive(arbitrary::Arbitrary))]
pub struct RequestPair<T> {
    /// id for the contained request or response message
    pub request_id: u64,

    /// the request or response message payload
    pub message: T,
}
588
589impl<T> RequestPair<T> {
590    /// Converts the message type with the given closure.
591    pub fn map<F, R>(self, f: F) -> RequestPair<R>
592    where
593        F: FnOnce(T) -> R,
594    {
595        let Self { request_id, message } = self;
596        RequestPair { request_id, message: f(message) }
597    }
598}
599
600/// Allows messages with request ids to be serialized into RLP bytes.
601impl<T> Encodable for RequestPair<T>
602where
603    T: Encodable,
604{
605    fn encode(&self, out: &mut dyn alloy_rlp::BufMut) {
606        let header =
607            Header { list: true, payload_length: self.request_id.length() + self.message.length() };
608
609        header.encode(out);
610        self.request_id.encode(out);
611        self.message.encode(out);
612    }
613
614    fn length(&self) -> usize {
615        let mut length = 0;
616        length += self.request_id.length();
617        length += self.message.length();
618        length += length_of_length(length);
619        length
620    }
621}
622
623/// Allows messages with request ids to be deserialized into RLP bytes.
624impl<T> Decodable for RequestPair<T>
625where
626    T: Decodable,
627{
628    fn decode(buf: &mut &[u8]) -> alloy_rlp::Result<Self> {
629        let header = Header::decode(buf)?;
630
631        let initial_length = buf.len();
632        let request_id = u64::decode(buf)?;
633        let message = T::decode(buf)?;
634
635        // Check that the buffer consumed exactly payload_length bytes after decoding the
636        // RequestPair
637        let consumed_len = initial_length - buf.len();
638        if consumed_len != header.payload_length {
639            return Err(alloy_rlp::Error::UnexpectedLength)
640        }
641
642        Ok(Self { request_id, message })
643    }
644}
645
/// Unit tests for message encoding/decoding and version-specific message handling.
#[cfg(test)]
mod tests {
    use super::MessageError;
    use crate::{
        message::RequestPair, EthMessage, EthMessageID, EthNetworkPrimitives, EthVersion,
        GetNodeData, NodeData, ProtocolMessage, RawCapabilityMessage,
    };
    use alloy_primitives::hex;
    use alloy_rlp::{Decodable, Encodable, Error};
    use reth_ethereum_primitives::BlockBody;

    /// Encodes `value` into a fresh byte vector.
    fn encode<T: Encodable>(value: T) -> Vec<u8> {
        let mut buf = vec![];
        value.encode(&mut buf);
        buf
    }

    /// `GetNodeData` and `NodeData` must be rejected when decoding with eth/67.
    #[test]
    fn test_removed_message_at_eth67() {
        let get_node_data = EthMessage::<EthNetworkPrimitives>::GetNodeData(RequestPair {
            request_id: 1337,
            message: GetNodeData(vec![]),
        });
        let buf = encode(ProtocolMessage {
            message_type: EthMessageID::GetNodeData,
            message: get_node_data,
        });
        let msg = ProtocolMessage::<EthNetworkPrimitives>::decode_message(
            crate::EthVersion::Eth67,
            &mut &buf[..],
        );
        assert!(matches!(msg, Err(MessageError::Invalid(..))));

        let node_data = EthMessage::<EthNetworkPrimitives>::NodeData(RequestPair {
            request_id: 1337,
            message: NodeData(vec![]),
        });
        let buf =
            encode(ProtocolMessage { message_type: EthMessageID::NodeData, message: node_data });
        let msg = ProtocolMessage::<EthNetworkPrimitives>::decode_message(
            crate::EthVersion::Eth67,
            &mut &buf[..],
        );
        assert!(matches!(msg, Err(MessageError::Invalid(..))));
    }

    /// A request pair encodes to the expected RLP bytes.
    #[test]
    fn request_pair_encode() {
        let request_pair = RequestPair { request_id: 1337, message: vec![5u8] };

        // c5: start of list (c0) + len(full_list) (length is <55 bytes)
        // 82: 0x80 + len(1337)
        // 05 39: 1337 (request_id)
        // === full_list ===
        // c1: start of list (c0) + len(list) (length is <55 bytes)
        // 05: 5 (message)
        let expected = hex!("c5820539c105");
        let got = encode(request_pair);
        assert_eq!(expected[..], got, "expected: {expected:X?}, got: {got:X?}",);
    }

    /// The bytes from `request_pair_encode` decode back to the same request pair, and the
    /// reported length matches the input length.
    #[test]
    fn request_pair_decode() {
        let raw_pair = &hex!("c5820539c105")[..];

        let expected = RequestPair { request_id: 1337, message: vec![5u8] };

        let got = RequestPair::<Vec<u8>>::decode(&mut &*raw_pair).unwrap();
        assert_eq!(expected.length(), raw_pair.len());
        assert_eq!(expected, got);
    }

    /// Decoding fails when the outer list header announces fewer bytes than the contents
    /// actually consume.
    #[test]
    fn malicious_request_pair_decode() {
        // A maliciously encoded request pair, where the len(full_list) is 5, but it
        // actually consumes 6 bytes when decoding
        //
        // c5: start of list (c0) + len(full_list) (length is <55 bytes)
        // 82: 0x80 + len(1337)
        // 05 39: 1337 (request_id)
        // === full_list ===
        // c2: start of list (c0) + len(list) (length is <55 bytes)
        // 05 05: 5 5(message)
        let raw_pair = &hex!("c5820539c20505")[..];

        let result = RequestPair::<Vec<u8>>::decode(&mut &*raw_pair);
        assert!(matches!(result, Err(Error::UnexpectedLength)));
    }

    /// A `BlockBodies` message with a default (empty) body list round-trips through
    /// encode/decode.
    #[test]
    fn empty_block_bodies_protocol() {
        let empty_block_bodies =
            ProtocolMessage::from(EthMessage::<EthNetworkPrimitives>::BlockBodies(RequestPair {
                request_id: 0,
                message: Default::default(),
            }));
        let mut buf = Vec::new();
        empty_block_bodies.encode(&mut buf);
        let decoded =
            ProtocolMessage::decode_message(EthVersion::Eth68, &mut buf.as_slice()).unwrap();
        assert_eq!(empty_block_bodies, decoded);
    }

    /// A `BlockBodies` message holding one body without transactions or ommers and with default
    /// withdrawals round-trips through encode/decode.
    #[test]
    fn empty_block_body_protocol() {
        let empty_block_bodies =
            ProtocolMessage::from(EthMessage::<EthNetworkPrimitives>::BlockBodies(RequestPair {
                request_id: 0,
                message: vec![BlockBody {
                    transactions: vec![],
                    ommers: vec![],
                    withdrawals: Some(Default::default()),
                }]
                .into(),
            }));
        let mut buf = Vec::new();
        empty_block_bodies.encode(&mut buf);
        let decoded =
            ProtocolMessage::decode_message(EthVersion::Eth68, &mut buf.as_slice()).unwrap();
        assert_eq!(empty_block_bodies, decoded);
    }

    /// A `BlockBodies` message whose only body entry is an empty list is expected to fail
    /// decoding with `InputTooShort`.
    #[test]
    fn decode_block_bodies_message() {
        // 06: message id (BlockBodies)
        // c4: list header with a 4 byte payload
        // 81 99: request id
        // c1 c0: list holding a single empty list
        let buf = hex!("06c48199c1c0");
        let msg = ProtocolMessage::<EthNetworkPrimitives>::decode_message(
            EthVersion::Eth68,
            &mut &buf[..],
        )
        .unwrap_err();
        assert!(matches!(msg, MessageError::RlpError(alloy_rlp::Error::InputTooShort)));
    }

    /// A message with an unknown ID and a non-empty raw payload round-trips through
    /// encode/decode.
    #[test]
    fn custom_message_roundtrip() {
        let custom_payload = vec![1, 2, 3, 4, 5];
        let custom_message = RawCapabilityMessage::new(0x20, custom_payload.into());
        let protocol_message = ProtocolMessage::<EthNetworkPrimitives> {
            message_type: EthMessageID::Other(0x20),
            message: EthMessage::Other(custom_message),
        };

        let encoded = encode(protocol_message.clone());
        let decoded = ProtocolMessage::<EthNetworkPrimitives>::decode_message(
            EthVersion::Eth68,
            &mut &encoded[..],
        )
        .unwrap();

        assert_eq!(protocol_message, decoded);
    }

    /// A message with an unknown ID and an empty raw payload round-trips through encode/decode.
    #[test]
    fn custom_message_empty_payload_roundtrip() {
        let custom_message = RawCapabilityMessage::new(0x30, vec![].into());
        let protocol_message = ProtocolMessage::<EthNetworkPrimitives> {
            message_type: EthMessageID::Other(0x30),
            message: EthMessage::Other(custom_message),
        };

        let encoded = encode(protocol_message.clone());
        let decoded = ProtocolMessage::<EthNetworkPrimitives>::decode_message(
            EthVersion::Eth68,
            &mut &encoded[..],
        )
        .unwrap();

        assert_eq!(protocol_message, decoded);
    }
}