Skip to main content

reth_eth_wire_types/
message.rs

1//! Implements Ethereum wire protocol for versions 66 through 70.
2//! Defines structs/enums for messages, request-response pairs, and broadcasts.
3//! Handles compatibility with [`EthVersion`].
4//!
5//! Examples include creating, encoding, and decoding protocol messages.
6//!
7//! Reference: [Ethereum Wire Protocol](https://github.com/ethereum/devp2p/blob/master/caps/eth.md).
8
9use super::{
10    broadcast::NewBlockHashes, BlockBodies, BlockHeaders, GetBlockBodies, GetBlockHeaders,
11    GetNodeData, GetPooledTransactions, GetReceipts, GetReceipts70, NewPooledTransactionHashes66,
12    NewPooledTransactionHashes68, NodeData, PooledTransactions, Receipts, Status, StatusEth69,
13    Transactions,
14};
15use crate::{
16    status::StatusMessage, BlockRangeUpdate, EthNetworkPrimitives, EthVersion, NetworkPrimitives,
17    RawCapabilityMessage, Receipts69, Receipts70, SharedTransactions,
18};
19use alloc::{boxed::Box, string::String, sync::Arc};
20use alloy_primitives::{
21    bytes::{Buf, BufMut},
22    Bytes,
23};
24use alloy_rlp::{length_of_length, Decodable, Encodable, Header};
25use core::fmt::Debug;
26
27/// [`MAX_MESSAGE_SIZE`] is the maximum cap on the size of a protocol message.
28// https://github.com/ethereum/go-ethereum/blob/30602163d5d8321fbc68afdcbbaf2362b2641bde/eth/protocols/eth/protocol.go#L50
29pub const MAX_MESSAGE_SIZE: usize = 10 * 1024 * 1024;
30
31/// Error when sending/receiving a message
32#[derive(thiserror::Error, Debug)]
33pub enum MessageError {
34    /// Flags an unrecognized message ID for a given protocol version.
35    #[error("message id {1:?} is invalid for version {0:?}")]
36    Invalid(EthVersion, EthMessageID),
37    /// Expected a Status message but received a different message type.
38    #[error("expected status message but received {0:?}")]
39    ExpectedStatusMessage(EthMessageID),
40    /// Thrown when rlp decoding a message failed.
41    #[error("RLP error: {0}")]
42    RlpError(#[from] alloy_rlp::Error),
43    /// Other message error with custom message
44    #[error("{0}")]
45    Other(String),
46}
47
48/// An `eth` protocol message, containing a message ID and payload.
49#[derive(Clone, Debug, PartialEq, Eq)]
50#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
51pub struct ProtocolMessage<N: NetworkPrimitives = EthNetworkPrimitives> {
52    /// The unique identifier representing the type of the Ethereum message.
53    pub message_type: EthMessageID,
54    /// The content of the message, including specific data based on the message type.
55    #[cfg_attr(
56        feature = "serde",
57        serde(bound = "EthMessage<N>: serde::Serialize + serde::de::DeserializeOwned")
58    )]
59    pub message: EthMessage<N>,
60}
61
62impl<N: NetworkPrimitives> ProtocolMessage<N> {
63    /// Decode only a Status message from RLP bytes.
64    ///
65    /// This is used during the eth handshake where only a Status message is a valid response.
66    /// Returns an error if the message is not a Status message.
67    pub fn decode_status(
68        version: EthVersion,
69        buf: &mut &[u8],
70    ) -> Result<StatusMessage, MessageError> {
71        let message_type = EthMessageID::decode(buf)?;
72
73        if message_type != EthMessageID::Status {
74            return Err(MessageError::ExpectedStatusMessage(message_type))
75        }
76
77        let status = if version < EthVersion::Eth69 {
78            StatusMessage::Legacy(Status::decode(buf)?)
79        } else {
80            StatusMessage::Eth69(StatusEth69::decode(buf)?)
81        };
82
83        Ok(status)
84    }
85
86    /// Create a new `ProtocolMessage` from a message type and message rlp bytes.
87    ///
88    /// This will enforce decoding according to the given [`EthVersion`] of the connection.
89    pub fn decode_message(version: EthVersion, buf: &mut &[u8]) -> Result<Self, MessageError> {
90        let message_type = EthMessageID::decode(buf)?;
91
92        // For EIP-7642 (https://github.com/ethereum/EIPs/blob/master/EIPS/eip-7642.md):
93        // pre-merge (legacy) status messages include total difficulty, whereas eth/69 omits it.
94        let message = match message_type {
95            EthMessageID::Status => EthMessage::Status(if version < EthVersion::Eth69 {
96                StatusMessage::Legacy(Status::decode(buf)?)
97            } else {
98                StatusMessage::Eth69(StatusEth69::decode(buf)?)
99            }),
100            EthMessageID::NewBlockHashes => {
101                EthMessage::NewBlockHashes(NewBlockHashes::decode(buf)?)
102            }
103            EthMessageID::NewBlock => {
104                EthMessage::NewBlock(Box::new(N::NewBlockPayload::decode(buf)?))
105            }
106            EthMessageID::Transactions => EthMessage::Transactions(Transactions::decode(buf)?),
107            EthMessageID::NewPooledTransactionHashes => {
108                if version >= EthVersion::Eth68 {
109                    EthMessage::NewPooledTransactionHashes68(NewPooledTransactionHashes68::decode(
110                        buf,
111                    )?)
112                } else {
113                    EthMessage::NewPooledTransactionHashes66(NewPooledTransactionHashes66::decode(
114                        buf,
115                    )?)
116                }
117            }
118            EthMessageID::GetBlockHeaders => EthMessage::GetBlockHeaders(RequestPair::decode(buf)?),
119            EthMessageID::BlockHeaders => EthMessage::BlockHeaders(RequestPair::decode(buf)?),
120            EthMessageID::GetBlockBodies => EthMessage::GetBlockBodies(RequestPair::decode(buf)?),
121            EthMessageID::BlockBodies => EthMessage::BlockBodies(RequestPair::decode(buf)?),
122            EthMessageID::GetPooledTransactions => {
123                EthMessage::GetPooledTransactions(RequestPair::decode(buf)?)
124            }
125            EthMessageID::PooledTransactions => {
126                EthMessage::PooledTransactions(RequestPair::decode(buf)?)
127            }
128            EthMessageID::GetNodeData => {
129                if version >= EthVersion::Eth67 {
130                    return Err(MessageError::Invalid(version, EthMessageID::GetNodeData))
131                }
132                EthMessage::GetNodeData(RequestPair::decode(buf)?)
133            }
134            EthMessageID::NodeData => {
135                if version >= EthVersion::Eth67 {
136                    return Err(MessageError::Invalid(version, EthMessageID::GetNodeData))
137                }
138                EthMessage::NodeData(RequestPair::decode(buf)?)
139            }
140            EthMessageID::GetReceipts => {
141                if version >= EthVersion::Eth70 {
142                    EthMessage::GetReceipts70(RequestPair::decode(buf)?)
143                } else {
144                    EthMessage::GetReceipts(RequestPair::decode(buf)?)
145                }
146            }
147            EthMessageID::Receipts => {
148                match version {
149                    v if v >= EthVersion::Eth70 => {
150                        // eth/70 continues to omit bloom filters and adds the
151                        // `lastBlockIncomplete` flag, encoded as
152                        // `[request-id, lastBlockIncomplete, [[receipt₁, receipt₂], ...]]`.
153                        EthMessage::Receipts70(RequestPair::decode(buf)?)
154                    }
155                    EthVersion::Eth69 => {
156                        // with eth69, receipts no longer include the bloom
157                        EthMessage::Receipts69(RequestPair::decode(buf)?)
158                    }
159                    _ => {
160                        // before eth69 we need to decode the bloom  as well
161                        EthMessage::Receipts(RequestPair::decode(buf)?)
162                    }
163                }
164            }
165            EthMessageID::BlockRangeUpdate => {
166                if version < EthVersion::Eth69 {
167                    return Err(MessageError::Invalid(version, EthMessageID::BlockRangeUpdate))
168                }
169                EthMessage::BlockRangeUpdate(BlockRangeUpdate::decode(buf)?)
170            }
171            EthMessageID::Other(_) => {
172                let raw_payload = Bytes::copy_from_slice(buf);
173                buf.advance(raw_payload.len());
174                EthMessage::Other(RawCapabilityMessage::new(
175                    message_type.to_u8() as usize,
176                    raw_payload.into(),
177                ))
178            }
179        };
180        Ok(Self { message_type, message })
181    }
182}
183
184impl<N: NetworkPrimitives> Encodable for ProtocolMessage<N> {
185    /// Encodes the protocol message into bytes. The message type is encoded as a single byte and
186    /// prepended to the message.
187    fn encode(&self, out: &mut dyn BufMut) {
188        self.message_type.encode(out);
189        self.message.encode(out);
190    }
191    fn length(&self) -> usize {
192        self.message_type.length() + self.message.length()
193    }
194}
195
196impl<N: NetworkPrimitives> From<EthMessage<N>> for ProtocolMessage<N> {
197    fn from(message: EthMessage<N>) -> Self {
198        Self { message_type: message.message_id(), message }
199    }
200}
201
202/// Represents messages that can be sent to multiple peers.
203#[derive(Clone, Debug)]
204pub struct ProtocolBroadcastMessage<N: NetworkPrimitives = EthNetworkPrimitives> {
205    /// The unique identifier representing the type of the Ethereum message.
206    pub message_type: EthMessageID,
207    /// The content of the message to be broadcasted, including specific data based on the message
208    /// type.
209    pub message: EthBroadcastMessage<N>,
210}
211
212impl<N: NetworkPrimitives> Encodable for ProtocolBroadcastMessage<N> {
213    /// Encodes the protocol message into bytes. The message type is encoded as a single byte and
214    /// prepended to the message.
215    fn encode(&self, out: &mut dyn BufMut) {
216        self.message_type.encode(out);
217        self.message.encode(out);
218    }
219    fn length(&self) -> usize {
220        self.message_type.length() + self.message.length()
221    }
222}
223
224impl<N: NetworkPrimitives> From<EthBroadcastMessage<N>> for ProtocolBroadcastMessage<N> {
225    fn from(message: EthBroadcastMessage<N>) -> Self {
226        Self { message_type: message.message_id(), message }
227    }
228}
229
230/// Represents a message in the eth wire protocol, versions 66, 67, 68 and 69.
231///
232/// The ethereum wire protocol is a set of messages that are broadcast to the network in two
233/// styles:
234///  * A request message sent by a peer (such as [`GetPooledTransactions`]), and an associated
235///    response message (such as [`PooledTransactions`]).
236///  * A message that is broadcast to the network, without a corresponding request.
237///
238/// The newer `eth/66` is an efficiency upgrade on top of `eth/65`, introducing a request id to
239/// correlate request-response message pairs. This allows for request multiplexing.
240///
241/// The `eth/67` is based on `eth/66` but only removes two messages, [`GetNodeData`] and
242/// [`NodeData`].
243///
244/// The `eth/68` changes only `NewPooledTransactionHashes` to include `types` and `sized`. For
245/// it, `NewPooledTransactionHashes` is renamed as [`NewPooledTransactionHashes66`] and
246/// [`NewPooledTransactionHashes68`] is defined.
247///
248/// The `eth/69` announces the historical block range served by the node. Removes total difficulty
249/// information. And removes the Bloom field from receipts transferred over the protocol.
250///
251/// The `eth/70` (EIP-7975) keeps the eth/69 status format and introduces partial receipts.
252/// requests/responses.
253#[derive(Clone, Debug, PartialEq, Eq)]
254#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
255pub enum EthMessage<N: NetworkPrimitives = EthNetworkPrimitives> {
256    /// Represents a Status message required for the protocol handshake.
257    Status(StatusMessage),
258    /// Represents a `NewBlockHashes` message broadcast to the network.
259    NewBlockHashes(NewBlockHashes),
260    /// Represents a `NewBlock` message broadcast to the network.
261    #[cfg_attr(
262        feature = "serde",
263        serde(bound = "N::NewBlockPayload: serde::Serialize + serde::de::DeserializeOwned")
264    )]
265    NewBlock(Box<N::NewBlockPayload>),
266    /// Represents a Transactions message broadcast to the network.
267    #[cfg_attr(
268        feature = "serde",
269        serde(bound = "N::BroadcastedTransaction: serde::Serialize + serde::de::DeserializeOwned")
270    )]
271    Transactions(Transactions<N::BroadcastedTransaction>),
272    /// Represents a `NewPooledTransactionHashes` message for eth/66 version.
273    NewPooledTransactionHashes66(NewPooledTransactionHashes66),
274    /// Represents a `NewPooledTransactionHashes` message for eth/68 version.
275    NewPooledTransactionHashes68(NewPooledTransactionHashes68),
276    // The following messages are request-response message pairs
277    /// Represents a `GetBlockHeaders` request-response pair.
278    GetBlockHeaders(RequestPair<GetBlockHeaders>),
279    /// Represents a `BlockHeaders` request-response pair.
280    #[cfg_attr(
281        feature = "serde",
282        serde(bound = "N::BlockHeader: serde::Serialize + serde::de::DeserializeOwned")
283    )]
284    BlockHeaders(RequestPair<BlockHeaders<N::BlockHeader>>),
285    /// Represents a `GetBlockBodies` request-response pair.
286    GetBlockBodies(RequestPair<GetBlockBodies>),
287    /// Represents a `BlockBodies` request-response pair.
288    #[cfg_attr(
289        feature = "serde",
290        serde(bound = "N::BlockBody: serde::Serialize + serde::de::DeserializeOwned")
291    )]
292    BlockBodies(RequestPair<BlockBodies<N::BlockBody>>),
293    /// Represents a `GetPooledTransactions` request-response pair.
294    GetPooledTransactions(RequestPair<GetPooledTransactions>),
295    /// Represents a `PooledTransactions` request-response pair.
296    #[cfg_attr(
297        feature = "serde",
298        serde(bound = "N::PooledTransaction: serde::Serialize + serde::de::DeserializeOwned")
299    )]
300    PooledTransactions(RequestPair<PooledTransactions<N::PooledTransaction>>),
301    /// Represents a `GetNodeData` request-response pair.
302    GetNodeData(RequestPair<GetNodeData>),
303    /// Represents a `NodeData` request-response pair.
304    NodeData(RequestPair<NodeData>),
305    /// Represents a `GetReceipts` request-response pair.
306    GetReceipts(RequestPair<GetReceipts>),
307    /// Represents a `GetReceipts` request for eth/70.
308    ///
309    /// Note: Unlike earlier protocol versions, the eth/70 encoding for
310    /// `GetReceipts` in EIP-7975 inlines the request id. The type still wraps
311    /// a [`RequestPair`], but with a custom inline encoding.
312    GetReceipts70(RequestPair<GetReceipts70>),
313    /// Represents a Receipts request-response pair.
314    #[cfg_attr(
315        feature = "serde",
316        serde(bound = "N::Receipt: serde::Serialize + serde::de::DeserializeOwned")
317    )]
318    Receipts(RequestPair<Receipts<N::Receipt>>),
319    /// Represents a Receipts request-response pair for eth/69.
320    #[cfg_attr(
321        feature = "serde",
322        serde(bound = "N::Receipt: serde::Serialize + serde::de::DeserializeOwned")
323    )]
324    Receipts69(RequestPair<Receipts69<N::Receipt>>),
325    /// Represents a Receipts request-response pair for eth/70.
326    #[cfg_attr(
327        feature = "serde",
328        serde(bound = "N::Receipt: serde::Serialize + serde::de::DeserializeOwned")
329    )]
330    ///
331    /// Note: The eth/70 encoding for `Receipts` in EIP-7975 inlines the
332    /// request id. The type still wraps a [`RequestPair`], but with a custom
333    /// inline encoding.
334    Receipts70(RequestPair<Receipts70<N::Receipt>>),
335    /// Represents a `BlockRangeUpdate` message broadcast to the network.
336    #[cfg_attr(
337        feature = "serde",
338        serde(bound = "N::BroadcastedTransaction: serde::Serialize + serde::de::DeserializeOwned")
339    )]
340    BlockRangeUpdate(BlockRangeUpdate),
341    /// Represents an encoded message that doesn't match any other variant
342    Other(RawCapabilityMessage),
343}
344
345impl<N: NetworkPrimitives> EthMessage<N> {
346    /// Returns the message's ID.
347    pub const fn message_id(&self) -> EthMessageID {
348        match self {
349            Self::Status(_) => EthMessageID::Status,
350            Self::NewBlockHashes(_) => EthMessageID::NewBlockHashes,
351            Self::NewBlock(_) => EthMessageID::NewBlock,
352            Self::Transactions(_) => EthMessageID::Transactions,
353            Self::NewPooledTransactionHashes66(_) | Self::NewPooledTransactionHashes68(_) => {
354                EthMessageID::NewPooledTransactionHashes
355            }
356            Self::GetBlockHeaders(_) => EthMessageID::GetBlockHeaders,
357            Self::BlockHeaders(_) => EthMessageID::BlockHeaders,
358            Self::GetBlockBodies(_) => EthMessageID::GetBlockBodies,
359            Self::BlockBodies(_) => EthMessageID::BlockBodies,
360            Self::GetPooledTransactions(_) => EthMessageID::GetPooledTransactions,
361            Self::PooledTransactions(_) => EthMessageID::PooledTransactions,
362            Self::GetNodeData(_) => EthMessageID::GetNodeData,
363            Self::NodeData(_) => EthMessageID::NodeData,
364            Self::GetReceipts(_) | Self::GetReceipts70(_) => EthMessageID::GetReceipts,
365            Self::Receipts(_) | Self::Receipts69(_) | Self::Receipts70(_) => EthMessageID::Receipts,
366            Self::BlockRangeUpdate(_) => EthMessageID::BlockRangeUpdate,
367            Self::Other(msg) => EthMessageID::Other(msg.id as u8),
368        }
369    }
370
371    /// Returns true if the message variant is a request.
372    pub const fn is_request(&self) -> bool {
373        matches!(
374            self,
375            Self::GetBlockBodies(_) |
376                Self::GetBlockHeaders(_) |
377                Self::GetReceipts(_) |
378                Self::GetReceipts70(_) |
379                Self::GetPooledTransactions(_) |
380                Self::GetNodeData(_)
381        )
382    }
383
384    /// Returns true if the message variant is a response to a request.
385    pub const fn is_response(&self) -> bool {
386        matches!(
387            self,
388            Self::PooledTransactions(_) |
389                Self::Receipts(_) |
390                Self::Receipts69(_) |
391                Self::Receipts70(_) |
392                Self::BlockHeaders(_) |
393                Self::BlockBodies(_) |
394                Self::NodeData(_)
395        )
396    }
397
398    /// Converts the message types where applicable.
399    ///
400    /// This handles up/downcasting where appropriate, for example for different receipt request
401    /// types.
402    pub fn map_versioned(self, version: EthVersion) -> Self {
403        // For eth/70 peers we send `GetReceipts` using the new eth/70
404        // encoding with `firstBlockReceiptIndex = 0`, while keeping the
405        // user-facing `PeerRequest` API unchanged.
406        if version >= EthVersion::Eth70 {
407            return match self {
408                Self::GetReceipts(pair) => {
409                    let RequestPair { request_id, message } = pair;
410                    let req = RequestPair {
411                        request_id,
412                        message: GetReceipts70 {
413                            first_block_receipt_index: 0,
414                            block_hashes: message.0,
415                        },
416                    };
417                    Self::GetReceipts70(req)
418                }
419                other => other,
420            }
421        }
422
423        self
424    }
425}
426
427impl<N: NetworkPrimitives> Encodable for EthMessage<N> {
428    fn encode(&self, out: &mut dyn BufMut) {
429        match self {
430            Self::Status(status) => status.encode(out),
431            Self::NewBlockHashes(new_block_hashes) => new_block_hashes.encode(out),
432            Self::NewBlock(new_block) => new_block.encode(out),
433            Self::Transactions(transactions) => transactions.encode(out),
434            Self::NewPooledTransactionHashes66(hashes) => hashes.encode(out),
435            Self::NewPooledTransactionHashes68(hashes) => hashes.encode(out),
436            Self::GetBlockHeaders(request) => request.encode(out),
437            Self::BlockHeaders(headers) => headers.encode(out),
438            Self::GetBlockBodies(request) => request.encode(out),
439            Self::BlockBodies(bodies) => bodies.encode(out),
440            Self::GetPooledTransactions(request) => request.encode(out),
441            Self::PooledTransactions(transactions) => transactions.encode(out),
442            Self::GetNodeData(request) => request.encode(out),
443            Self::NodeData(data) => data.encode(out),
444            Self::GetReceipts(request) => request.encode(out),
445            Self::GetReceipts70(request) => request.encode(out),
446            Self::Receipts(receipts) => receipts.encode(out),
447            Self::Receipts69(receipt69) => receipt69.encode(out),
448            Self::Receipts70(receipt70) => receipt70.encode(out),
449            Self::BlockRangeUpdate(block_range_update) => block_range_update.encode(out),
450            Self::Other(unknown) => out.put_slice(&unknown.payload),
451        }
452    }
453    fn length(&self) -> usize {
454        match self {
455            Self::Status(status) => status.length(),
456            Self::NewBlockHashes(new_block_hashes) => new_block_hashes.length(),
457            Self::NewBlock(new_block) => new_block.length(),
458            Self::Transactions(transactions) => transactions.length(),
459            Self::NewPooledTransactionHashes66(hashes) => hashes.length(),
460            Self::NewPooledTransactionHashes68(hashes) => hashes.length(),
461            Self::GetBlockHeaders(request) => request.length(),
462            Self::BlockHeaders(headers) => headers.length(),
463            Self::GetBlockBodies(request) => request.length(),
464            Self::BlockBodies(bodies) => bodies.length(),
465            Self::GetPooledTransactions(request) => request.length(),
466            Self::PooledTransactions(transactions) => transactions.length(),
467            Self::GetNodeData(request) => request.length(),
468            Self::NodeData(data) => data.length(),
469            Self::GetReceipts(request) => request.length(),
470            Self::GetReceipts70(request) => request.length(),
471            Self::Receipts(receipts) => receipts.length(),
472            Self::Receipts69(receipt69) => receipt69.length(),
473            Self::Receipts70(receipt70) => receipt70.length(),
474            Self::BlockRangeUpdate(block_range_update) => block_range_update.length(),
475            Self::Other(unknown) => unknown.length(),
476        }
477    }
478}
479
480/// Represents broadcast messages of [`EthMessage`] with the same object that can be sent to
481/// multiple peers.
482///
483/// Messages that contain a list of hashes depend on the peer the message is sent to. A peer should
484/// never receive a hash of an object (block, transaction) it has already seen.
485///
486/// Note: This is only useful for outgoing messages.
487#[derive(Clone, Debug, PartialEq, Eq)]
488pub enum EthBroadcastMessage<N: NetworkPrimitives = EthNetworkPrimitives> {
489    /// Represents a new block broadcast message.
490    NewBlock(Arc<N::NewBlockPayload>),
491    /// Represents a transactions broadcast message.
492    Transactions(SharedTransactions<N::BroadcastedTransaction>),
493}
494
495// === impl EthBroadcastMessage ===
496
497impl<N: NetworkPrimitives> EthBroadcastMessage<N> {
498    /// Returns the message's ID.
499    pub const fn message_id(&self) -> EthMessageID {
500        match self {
501            Self::NewBlock(_) => EthMessageID::NewBlock,
502            Self::Transactions(_) => EthMessageID::Transactions,
503        }
504    }
505}
506
507impl<N: NetworkPrimitives> Encodable for EthBroadcastMessage<N> {
508    fn encode(&self, out: &mut dyn BufMut) {
509        match self {
510            Self::NewBlock(new_block) => new_block.encode(out),
511            Self::Transactions(transactions) => transactions.encode(out),
512        }
513    }
514
515    fn length(&self) -> usize {
516        match self {
517            Self::NewBlock(new_block) => new_block.length(),
518            Self::Transactions(transactions) => transactions.length(),
519        }
520    }
521}
522
523/// Represents message IDs for eth protocol messages.
524#[repr(u8)]
525#[derive(Clone, Copy, Debug, PartialEq, Eq)]
526#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
527pub enum EthMessageID {
528    /// Status message.
529    Status = 0x00,
530    /// New block hashes message.
531    NewBlockHashes = 0x01,
532    /// Transactions message.
533    Transactions = 0x02,
534    /// Get block headers message.
535    GetBlockHeaders = 0x03,
536    /// Block headers message.
537    BlockHeaders = 0x04,
538    /// Get block bodies message.
539    GetBlockBodies = 0x05,
540    /// Block bodies message.
541    BlockBodies = 0x06,
542    /// New block message.
543    NewBlock = 0x07,
544    /// New pooled transaction hashes message.
545    NewPooledTransactionHashes = 0x08,
546    /// Requests pooled transactions.
547    GetPooledTransactions = 0x09,
548    /// Represents pooled transactions.
549    PooledTransactions = 0x0a,
550    /// Requests node data.
551    GetNodeData = 0x0d,
552    /// Represents node data.
553    NodeData = 0x0e,
554    /// Requests receipts.
555    GetReceipts = 0x0f,
556    /// Represents receipts.
557    Receipts = 0x10,
558    /// Block range update.
559    ///
560    /// Introduced in Eth69
561    BlockRangeUpdate = 0x11,
562    /// Represents unknown message types.
563    Other(u8),
564}
565
566impl EthMessageID {
567    /// Returns the corresponding `u8` value for an `EthMessageID`.
568    pub const fn to_u8(&self) -> u8 {
569        match self {
570            Self::Status => 0x00,
571            Self::NewBlockHashes => 0x01,
572            Self::Transactions => 0x02,
573            Self::GetBlockHeaders => 0x03,
574            Self::BlockHeaders => 0x04,
575            Self::GetBlockBodies => 0x05,
576            Self::BlockBodies => 0x06,
577            Self::NewBlock => 0x07,
578            Self::NewPooledTransactionHashes => 0x08,
579            Self::GetPooledTransactions => 0x09,
580            Self::PooledTransactions => 0x0a,
581            Self::GetNodeData => 0x0d,
582            Self::NodeData => 0x0e,
583            Self::GetReceipts => 0x0f,
584            Self::Receipts => 0x10,
585            Self::BlockRangeUpdate => 0x11,
586            Self::Other(value) => *value, // Return the stored `u8`
587        }
588    }
589
590    /// Returns the max value for the given version.
591    pub const fn max(version: EthVersion) -> u8 {
592        if version as u8 >= EthVersion::Eth69 as u8 {
593            Self::BlockRangeUpdate.to_u8()
594        } else {
595            Self::Receipts.to_u8()
596        }
597    }
598
599    /// Returns the total number of message types for the given version.
600    ///
601    /// This is used for message ID multiplexing.
602    ///
603    /// <https://github.com/ethereum/go-ethereum/blob/85077be58edea572f29c3b1a6a055077f1a56a8b/eth/protocols/eth/protocol.go#L45-L47>
604    pub const fn message_count(version: EthVersion) -> u8 {
605        Self::max(version) + 1
606    }
607}
608
609impl Encodable for EthMessageID {
610    fn encode(&self, out: &mut dyn BufMut) {
611        out.put_u8(self.to_u8());
612    }
613    fn length(&self) -> usize {
614        1
615    }
616}
617
618impl Decodable for EthMessageID {
619    fn decode(buf: &mut &[u8]) -> alloy_rlp::Result<Self> {
620        let id = match buf.first().ok_or(alloy_rlp::Error::InputTooShort)? {
621            0x00 => Self::Status,
622            0x01 => Self::NewBlockHashes,
623            0x02 => Self::Transactions,
624            0x03 => Self::GetBlockHeaders,
625            0x04 => Self::BlockHeaders,
626            0x05 => Self::GetBlockBodies,
627            0x06 => Self::BlockBodies,
628            0x07 => Self::NewBlock,
629            0x08 => Self::NewPooledTransactionHashes,
630            0x09 => Self::GetPooledTransactions,
631            0x0a => Self::PooledTransactions,
632            0x0d => Self::GetNodeData,
633            0x0e => Self::NodeData,
634            0x0f => Self::GetReceipts,
635            0x10 => Self::Receipts,
636            0x11 => Self::BlockRangeUpdate,
637            unknown => Self::Other(*unknown),
638        };
639        buf.advance(1);
640        Ok(id)
641    }
642}
643
644impl TryFrom<usize> for EthMessageID {
645    type Error = &'static str;
646
647    fn try_from(value: usize) -> Result<Self, Self::Error> {
648        match value {
649            0x00 => Ok(Self::Status),
650            0x01 => Ok(Self::NewBlockHashes),
651            0x02 => Ok(Self::Transactions),
652            0x03 => Ok(Self::GetBlockHeaders),
653            0x04 => Ok(Self::BlockHeaders),
654            0x05 => Ok(Self::GetBlockBodies),
655            0x06 => Ok(Self::BlockBodies),
656            0x07 => Ok(Self::NewBlock),
657            0x08 => Ok(Self::NewPooledTransactionHashes),
658            0x09 => Ok(Self::GetPooledTransactions),
659            0x0a => Ok(Self::PooledTransactions),
660            0x0d => Ok(Self::GetNodeData),
661            0x0e => Ok(Self::NodeData),
662            0x0f => Ok(Self::GetReceipts),
663            0x10 => Ok(Self::Receipts),
664            0x11 => Ok(Self::BlockRangeUpdate),
665            _ => Err("Invalid message ID"),
666        }
667    }
668}
669
670/// This is used for all request-response style `eth` protocol messages.
671/// This can represent either a request or a response, since both include a message payload and
672/// request id.
673#[derive(Clone, Debug, PartialEq, Eq)]
674#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
675#[cfg_attr(any(test, feature = "arbitrary"), derive(arbitrary::Arbitrary))]
676pub struct RequestPair<T> {
677    /// id for the contained request or response message
678    pub request_id: u64,
679
680    /// the request or response message payload
681    pub message: T,
682}
683
684impl<T> RequestPair<T> {
685    /// Converts the message type with the given closure.
686    pub fn map<F, R>(self, f: F) -> RequestPair<R>
687    where
688        F: FnOnce(T) -> R,
689    {
690        let Self { request_id, message } = self;
691        RequestPair { request_id, message: f(message) }
692    }
693}
694
695/// Allows messages with request ids to be serialized into RLP bytes.
696impl<T> Encodable for RequestPair<T>
697where
698    T: Encodable,
699{
700    fn encode(&self, out: &mut dyn alloy_rlp::BufMut) {
701        let header =
702            Header { list: true, payload_length: self.request_id.length() + self.message.length() };
703
704        header.encode(out);
705        self.request_id.encode(out);
706        self.message.encode(out);
707    }
708
709    fn length(&self) -> usize {
710        let mut length = 0;
711        length += self.request_id.length();
712        length += self.message.length();
713        length += length_of_length(length);
714        length
715    }
716}
717
718/// Allows messages with request ids to be deserialized into RLP bytes.
719impl<T> Decodable for RequestPair<T>
720where
721    T: Decodable,
722{
723    fn decode(buf: &mut &[u8]) -> alloy_rlp::Result<Self> {
724        let header = Header::decode(buf)?;
725
726        let initial_length = buf.len();
727        let request_id = u64::decode(buf)?;
728        let message = T::decode(buf)?;
729
730        // Check that the buffer consumed exactly payload_length bytes after decoding the
731        // RequestPair
732        let consumed_len = initial_length - buf.len();
733        if consumed_len != header.payload_length {
734            return Err(alloy_rlp::Error::UnexpectedLength)
735        }
736
737        Ok(Self { request_id, message })
738    }
739}
740
741#[cfg(test)]
742mod tests {
743    use super::MessageError;
744    use crate::{
745        message::RequestPair, EthMessage, EthMessageID, EthNetworkPrimitives, EthVersion,
746        GetNodeData, NodeData, ProtocolMessage, RawCapabilityMessage,
747    };
748    use alloy_primitives::hex;
749    use alloy_rlp::{Decodable, Encodable, Error};
750    use reth_ethereum_primitives::BlockBody;
751
752    fn encode<T: Encodable>(value: T) -> Vec<u8> {
753        let mut buf = vec![];
754        value.encode(&mut buf);
755        buf
756    }
757
758    #[test]
759    fn test_removed_message_at_eth67() {
760        let get_node_data = EthMessage::<EthNetworkPrimitives>::GetNodeData(RequestPair {
761            request_id: 1337,
762            message: GetNodeData(vec![]),
763        });
764        let buf = encode(ProtocolMessage {
765            message_type: EthMessageID::GetNodeData,
766            message: get_node_data,
767        });
768        let msg = ProtocolMessage::<EthNetworkPrimitives>::decode_message(
769            crate::EthVersion::Eth67,
770            &mut &buf[..],
771        );
772        assert!(matches!(msg, Err(MessageError::Invalid(..))));
773
774        let node_data = EthMessage::<EthNetworkPrimitives>::NodeData(RequestPair {
775            request_id: 1337,
776            message: NodeData(vec![]),
777        });
778        let buf =
779            encode(ProtocolMessage { message_type: EthMessageID::NodeData, message: node_data });
780        let msg = ProtocolMessage::<EthNetworkPrimitives>::decode_message(
781            crate::EthVersion::Eth67,
782            &mut &buf[..],
783        );
784        assert!(matches!(msg, Err(MessageError::Invalid(..))));
785    }
786
787    #[test]
788    fn request_pair_encode() {
789        let request_pair = RequestPair { request_id: 1337, message: vec![5u8] };
790
791        // c5: start of list (c0) + len(full_list) (length is <55 bytes)
792        // 82: 0x80 + len(1337)
793        // 05 39: 1337 (request_id)
794        // === full_list ===
795        // c1: start of list (c0) + len(list) (length is <55 bytes)
796        // 05: 5 (message)
797        let expected = hex!("c5820539c105");
798        let got = encode(request_pair);
799        assert_eq!(expected[..], got, "expected: {expected:X?}, got: {got:X?}",);
800    }
801
802    #[test]
803    fn request_pair_decode() {
804        let raw_pair = &hex!("c5820539c105")[..];
805
806        let expected = RequestPair { request_id: 1337, message: vec![5u8] };
807
808        let got = RequestPair::<Vec<u8>>::decode(&mut &*raw_pair).unwrap();
809        assert_eq!(expected.length(), raw_pair.len());
810        assert_eq!(expected, got);
811    }
812
813    #[test]
814    fn malicious_request_pair_decode() {
815        // A maliciously encoded request pair, where the len(full_list) is 5, but it
816        // actually consumes 6 bytes when decoding
817        //
818        // c5: start of list (c0) + len(full_list) (length is <55 bytes)
819        // 82: 0x80 + len(1337)
820        // 05 39: 1337 (request_id)
821        // === full_list ===
822        // c2: start of list (c0) + len(list) (length is <55 bytes)
823        // 05 05: 5 5(message)
824        let raw_pair = &hex!("c5820539c20505")[..];
825
826        let result = RequestPair::<Vec<u8>>::decode(&mut &*raw_pair);
827        assert!(matches!(result, Err(Error::UnexpectedLength)));
828    }
829
830    #[test]
831    fn empty_block_bodies_protocol() {
832        let empty_block_bodies =
833            ProtocolMessage::from(EthMessage::<EthNetworkPrimitives>::BlockBodies(RequestPair {
834                request_id: 0,
835                message: Default::default(),
836            }));
837        let mut buf = Vec::new();
838        empty_block_bodies.encode(&mut buf);
839        let decoded =
840            ProtocolMessage::decode_message(EthVersion::Eth68, &mut buf.as_slice()).unwrap();
841        assert_eq!(empty_block_bodies, decoded);
842    }
843
844    #[test]
845    fn empty_block_body_protocol() {
846        let empty_block_bodies =
847            ProtocolMessage::from(EthMessage::<EthNetworkPrimitives>::BlockBodies(RequestPair {
848                request_id: 0,
849                message: vec![BlockBody {
850                    transactions: vec![],
851                    ommers: vec![],
852                    withdrawals: Some(Default::default()),
853                }]
854                .into(),
855            }));
856        let mut buf = Vec::new();
857        empty_block_bodies.encode(&mut buf);
858        let decoded =
859            ProtocolMessage::decode_message(EthVersion::Eth68, &mut buf.as_slice()).unwrap();
860        assert_eq!(empty_block_bodies, decoded);
861    }
862
863    #[test]
864    fn decode_block_bodies_message() {
865        let buf = hex!("06c48199c1c0");
866        let msg = ProtocolMessage::<EthNetworkPrimitives>::decode_message(
867            EthVersion::Eth68,
868            &mut &buf[..],
869        )
870        .unwrap_err();
871        assert!(matches!(msg, MessageError::RlpError(alloy_rlp::Error::InputTooShort)));
872    }
873
874    #[test]
875    fn custom_message_roundtrip() {
876        let custom_payload = vec![1, 2, 3, 4, 5];
877        let custom_message = RawCapabilityMessage::new(0x20, custom_payload.into());
878        let protocol_message = ProtocolMessage::<EthNetworkPrimitives> {
879            message_type: EthMessageID::Other(0x20),
880            message: EthMessage::Other(custom_message),
881        };
882
883        let encoded = encode(protocol_message.clone());
884        let decoded = ProtocolMessage::<EthNetworkPrimitives>::decode_message(
885            EthVersion::Eth68,
886            &mut &encoded[..],
887        )
888        .unwrap();
889
890        assert_eq!(protocol_message, decoded);
891    }
892
893    #[test]
894    fn custom_message_empty_payload_roundtrip() {
895        let custom_message = RawCapabilityMessage::new(0x30, vec![].into());
896        let protocol_message = ProtocolMessage::<EthNetworkPrimitives> {
897            message_type: EthMessageID::Other(0x30),
898            message: EthMessage::Other(custom_message),
899        };
900
901        let encoded = encode(protocol_message.clone());
902        let decoded = ProtocolMessage::<EthNetworkPrimitives>::decode_message(
903            EthVersion::Eth68,
904            &mut &encoded[..],
905        )
906        .unwrap();
907
908        assert_eq!(protocol_message, decoded);
909    }
910
911    #[test]
912    fn decode_status_success() {
913        use crate::{Status, StatusMessage};
914        use alloy_hardforks::{ForkHash, ForkId};
915        use alloy_primitives::{B256, U256};
916
917        let status = Status {
918            version: EthVersion::Eth68,
919            chain: alloy_chains::Chain::mainnet(),
920            total_difficulty: U256::from(100u64),
921            blockhash: B256::random(),
922            genesis: B256::random(),
923            forkid: ForkId { hash: ForkHash([0xb7, 0x15, 0x07, 0x7d]), next: 0 },
924        };
925
926        let protocol_message = ProtocolMessage::<EthNetworkPrimitives>::from(EthMessage::Status(
927            StatusMessage::Legacy(status),
928        ));
929        let encoded = encode(protocol_message);
930
931        let decoded = ProtocolMessage::<EthNetworkPrimitives>::decode_status(
932            EthVersion::Eth68,
933            &mut &encoded[..],
934        )
935        .unwrap();
936
937        assert!(matches!(decoded, StatusMessage::Legacy(s) if s == status));
938    }
939
940    #[test]
941    fn eth_message_id_max_includes_block_range_update() {
942        assert_eq!(EthMessageID::max(EthVersion::Eth69), EthMessageID::BlockRangeUpdate.to_u8(),);
943        assert_eq!(EthMessageID::max(EthVersion::Eth70), EthMessageID::BlockRangeUpdate.to_u8(),);
944        assert_eq!(EthMessageID::max(EthVersion::Eth68), EthMessageID::Receipts.to_u8());
945    }
946
947    #[test]
948    fn decode_status_rejects_non_status() {
949        let msg = EthMessage::<EthNetworkPrimitives>::GetBlockBodies(RequestPair {
950            request_id: 1,
951            message: crate::GetBlockBodies::default(),
952        });
953        let protocol_message =
954            ProtocolMessage { message_type: EthMessageID::GetBlockBodies, message: msg };
955        let encoded = encode(protocol_message);
956
957        let result = ProtocolMessage::<EthNetworkPrimitives>::decode_status(
958            EthVersion::Eth68,
959            &mut &encoded[..],
960        );
961
962        assert!(matches!(
963            result,
964            Err(MessageError::ExpectedStatusMessage(EthMessageID::GetBlockBodies))
965        ));
966    }
967}