1use super::{
10 broadcast::NewBlockHashes, BlockBodies, BlockHeaders, GetBlockBodies, GetBlockHeaders,
11 GetNodeData, GetPooledTransactions, GetReceipts, GetReceipts70, NewPooledTransactionHashes66,
12 NewPooledTransactionHashes68, NodeData, PooledTransactions, Receipts, Status, StatusEth69,
13 Transactions,
14};
15use crate::{
16 status::StatusMessage, BlockRangeUpdate, EthNetworkPrimitives, EthVersion, NetworkPrimitives,
17 RawCapabilityMessage, Receipts69, Receipts70, SharedTransactions,
18};
19use alloc::{boxed::Box, string::String, sync::Arc};
20use alloy_primitives::{
21 bytes::{Buf, BufMut},
22 Bytes,
23};
24use alloy_rlp::{length_of_length, Decodable, Encodable, Header};
25use core::fmt::Debug;
26
/// Upper bound on the size of a single message, in bytes: `10 * 1024 * 1024` (10 MiB).
///
/// NOTE(review): nothing in this part of the file reads this constant; presumably the
/// stream/codec layer enforces it — confirm against the callers.
pub const MAX_MESSAGE_SIZE: usize = 10 * 1024 * 1024;
30
/// Errors that can occur while decoding an eth protocol message.
#[derive(thiserror::Error, Debug)]
pub enum MessageError {
    /// The decoded message ID is not valid for the negotiated [`EthVersion`].
    ///
    /// Note the field order: the version comes first, the offending ID second.
    #[error("message id {1:?} is invalid for version {0:?}")]
    Invalid(EthVersion, EthMessageID),
    /// A status message was required, but a message with the contained ID was decoded instead.
    #[error("expected status message but received {0:?}")]
    ExpectedStatusMessage(EthMessageID),
    /// The message ID or payload could not be RLP-decoded.
    #[error("RLP error: {0}")]
    RlpError(#[from] alloy_rlp::Error),
    /// Any other failure, carried as a free-form description.
    #[error("{0}")]
    Other(String),
}
47
/// An eth protocol message: the message ID together with the decoded payload.
///
/// On the wire the ID byte is written first, immediately followed by the encoded payload.
#[derive(Clone, Debug, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct ProtocolMessage<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// The ID that identifies the kind of `message`.
    pub message_type: EthMessageID,
    /// The message payload.
    #[cfg_attr(
        feature = "serde",
        serde(bound = "EthMessage<N>: serde::Serialize + serde::de::DeserializeOwned")
    )]
    pub message: EthMessage<N>,
}
61
62impl<N: NetworkPrimitives> ProtocolMessage<N> {
63 pub fn decode_status(
68 version: EthVersion,
69 buf: &mut &[u8],
70 ) -> Result<StatusMessage, MessageError> {
71 let message_type = EthMessageID::decode(buf)?;
72
73 if message_type != EthMessageID::Status {
74 return Err(MessageError::ExpectedStatusMessage(message_type))
75 }
76
77 let status = if version < EthVersion::Eth69 {
78 StatusMessage::Legacy(Status::decode(buf)?)
79 } else {
80 StatusMessage::Eth69(StatusEth69::decode(buf)?)
81 };
82
83 Ok(status)
84 }
85
86 pub fn decode_message(version: EthVersion, buf: &mut &[u8]) -> Result<Self, MessageError> {
90 let message_type = EthMessageID::decode(buf)?;
91
92 let message = match message_type {
95 EthMessageID::Status => EthMessage::Status(if version < EthVersion::Eth69 {
96 StatusMessage::Legacy(Status::decode(buf)?)
97 } else {
98 StatusMessage::Eth69(StatusEth69::decode(buf)?)
99 }),
100 EthMessageID::NewBlockHashes => {
101 EthMessage::NewBlockHashes(NewBlockHashes::decode(buf)?)
102 }
103 EthMessageID::NewBlock => {
104 EthMessage::NewBlock(Box::new(N::NewBlockPayload::decode(buf)?))
105 }
106 EthMessageID::Transactions => EthMessage::Transactions(Transactions::decode(buf)?),
107 EthMessageID::NewPooledTransactionHashes => {
108 if version >= EthVersion::Eth68 {
109 EthMessage::NewPooledTransactionHashes68(NewPooledTransactionHashes68::decode(
110 buf,
111 )?)
112 } else {
113 EthMessage::NewPooledTransactionHashes66(NewPooledTransactionHashes66::decode(
114 buf,
115 )?)
116 }
117 }
118 EthMessageID::GetBlockHeaders => EthMessage::GetBlockHeaders(RequestPair::decode(buf)?),
119 EthMessageID::BlockHeaders => EthMessage::BlockHeaders(RequestPair::decode(buf)?),
120 EthMessageID::GetBlockBodies => EthMessage::GetBlockBodies(RequestPair::decode(buf)?),
121 EthMessageID::BlockBodies => EthMessage::BlockBodies(RequestPair::decode(buf)?),
122 EthMessageID::GetPooledTransactions => {
123 EthMessage::GetPooledTransactions(RequestPair::decode(buf)?)
124 }
125 EthMessageID::PooledTransactions => {
126 EthMessage::PooledTransactions(RequestPair::decode(buf)?)
127 }
128 EthMessageID::GetNodeData => {
129 if version >= EthVersion::Eth67 {
130 return Err(MessageError::Invalid(version, EthMessageID::GetNodeData))
131 }
132 EthMessage::GetNodeData(RequestPair::decode(buf)?)
133 }
134 EthMessageID::NodeData => {
135 if version >= EthVersion::Eth67 {
136 return Err(MessageError::Invalid(version, EthMessageID::GetNodeData))
137 }
138 EthMessage::NodeData(RequestPair::decode(buf)?)
139 }
140 EthMessageID::GetReceipts => {
141 if version >= EthVersion::Eth70 {
142 EthMessage::GetReceipts70(RequestPair::decode(buf)?)
143 } else {
144 EthMessage::GetReceipts(RequestPair::decode(buf)?)
145 }
146 }
147 EthMessageID::Receipts => {
148 match version {
149 v if v >= EthVersion::Eth70 => {
150 EthMessage::Receipts70(RequestPair::decode(buf)?)
154 }
155 EthVersion::Eth69 => {
156 EthMessage::Receipts69(RequestPair::decode(buf)?)
158 }
159 _ => {
160 EthMessage::Receipts(RequestPair::decode(buf)?)
162 }
163 }
164 }
165 EthMessageID::BlockRangeUpdate => {
166 if version < EthVersion::Eth69 {
167 return Err(MessageError::Invalid(version, EthMessageID::BlockRangeUpdate))
168 }
169 EthMessage::BlockRangeUpdate(BlockRangeUpdate::decode(buf)?)
170 }
171 EthMessageID::Other(_) => {
172 let raw_payload = Bytes::copy_from_slice(buf);
173 buf.advance(raw_payload.len());
174 EthMessage::Other(RawCapabilityMessage::new(
175 message_type.to_u8() as usize,
176 raw_payload.into(),
177 ))
178 }
179 };
180 Ok(Self { message_type, message })
181 }
182}
183
184impl<N: NetworkPrimitives> Encodable for ProtocolMessage<N> {
185 fn encode(&self, out: &mut dyn BufMut) {
188 self.message_type.encode(out);
189 self.message.encode(out);
190 }
191 fn length(&self) -> usize {
192 self.message_type.length() + self.message.length()
193 }
194}
195
196impl<N: NetworkPrimitives> From<EthMessage<N>> for ProtocolMessage<N> {
197 fn from(message: EthMessage<N>) -> Self {
198 Self { message_type: message.message_id(), message }
199 }
200}
201
/// The broadcast counterpart of `ProtocolMessage`: a message ID together with an
/// [`EthBroadcastMessage`] payload.
///
/// Only encoding is implemented for this type in this part of the file.
#[derive(Clone, Debug)]
pub struct ProtocolBroadcastMessage<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// The ID that identifies the kind of `message`.
    pub message_type: EthMessageID,
    /// The broadcast payload.
    pub message: EthBroadcastMessage<N>,
}
211
212impl<N: NetworkPrimitives> Encodable for ProtocolBroadcastMessage<N> {
213 fn encode(&self, out: &mut dyn BufMut) {
216 self.message_type.encode(out);
217 self.message.encode(out);
218 }
219 fn length(&self) -> usize {
220 self.message_type.length() + self.message.length()
221 }
222}
223
224impl<N: NetworkPrimitives> From<EthBroadcastMessage<N>> for ProtocolBroadcastMessage<N> {
225 fn from(message: EthBroadcastMessage<N>) -> Self {
226 Self { message_type: message.message_id(), message }
227 }
228}
229
/// The payload of an eth protocol message.
///
/// Several message IDs have more than one payload layout; which variant is produced when
/// decoding depends on the negotiated version (see `ProtocolMessage::decode_message`).
/// Variants wrapped in [`RequestPair`] carry a request id next to the actual message.
#[derive(Clone, Debug, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub enum EthMessage<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// A status message, in either the legacy or the eth/69 layout.
    Status(StatusMessage),
    /// A new-block-hashes announcement.
    NewBlockHashes(NewBlockHashes),
    /// A new-block announcement; boxed payload.
    #[cfg_attr(
        feature = "serde",
        serde(bound = "N::NewBlockPayload: serde::Serialize + serde::de::DeserializeOwned")
    )]
    NewBlock(Box<N::NewBlockPayload>),
    /// A list of broadcast transactions.
    #[cfg_attr(
        feature = "serde",
        serde(bound = "N::BroadcastedTransaction: serde::Serialize + serde::de::DeserializeOwned")
    )]
    Transactions(Transactions<N::BroadcastedTransaction>),
    /// Pooled transaction hash announcement in the layout decoded for versions below eth/68.
    NewPooledTransactionHashes66(NewPooledTransactionHashes66),
    /// Pooled transaction hash announcement in the layout decoded for eth/68 and later.
    NewPooledTransactionHashes68(NewPooledTransactionHashes68),
    /// A request for block headers.
    GetBlockHeaders(RequestPair<GetBlockHeaders>),
    /// A block headers response.
    #[cfg_attr(
        feature = "serde",
        serde(bound = "N::BlockHeader: serde::Serialize + serde::de::DeserializeOwned")
    )]
    BlockHeaders(RequestPair<BlockHeaders<N::BlockHeader>>),
    /// A request for block bodies.
    GetBlockBodies(RequestPair<GetBlockBodies>),
    /// A block bodies response.
    #[cfg_attr(
        feature = "serde",
        serde(bound = "N::BlockBody: serde::Serialize + serde::de::DeserializeOwned")
    )]
    BlockBodies(RequestPair<BlockBodies<N::BlockBody>>),
    /// A request for pooled transactions.
    GetPooledTransactions(RequestPair<GetPooledTransactions>),
    /// A pooled transactions response.
    #[cfg_attr(
        feature = "serde",
        serde(bound = "N::PooledTransaction: serde::Serialize + serde::de::DeserializeOwned")
    )]
    PooledTransactions(RequestPair<PooledTransactions<N::PooledTransaction>>),
    /// A request for node data; rejected when decoding for eth/67 and later.
    GetNodeData(RequestPair<GetNodeData>),
    /// A node data response; rejected when decoding for eth/67 and later.
    NodeData(RequestPair<NodeData>),
    /// A request for receipts in the layout decoded for versions below eth/70.
    GetReceipts(RequestPair<GetReceipts>),
    /// A request for receipts in the layout decoded for eth/70 and later.
    GetReceipts70(RequestPair<GetReceipts70>),
    /// A receipts response in the layout decoded for versions below eth/69.
    #[cfg_attr(
        feature = "serde",
        serde(bound = "N::Receipt: serde::Serialize + serde::de::DeserializeOwned")
    )]
    Receipts(RequestPair<Receipts<N::Receipt>>),
    /// A receipts response in the layout decoded for eth/69.
    #[cfg_attr(
        feature = "serde",
        serde(bound = "N::Receipt: serde::Serialize + serde::de::DeserializeOwned")
    )]
    Receipts69(RequestPair<Receipts69<N::Receipt>>),
    /// A receipts response in the layout decoded for eth/70 and later.
    #[cfg_attr(
        feature = "serde",
        serde(bound = "N::Receipt: serde::Serialize + serde::de::DeserializeOwned")
    )]
    Receipts70(RequestPair<Receipts70<N::Receipt>>),
    /// A block range update; rejected when decoding for versions below eth/69.
    // NOTE(review): the serde bound below names `N::BroadcastedTransaction`, although this
    // variant holds no `N`-dependent type. It looks copied from `Transactions` and is likely
    // unnecessary — confirm before removing.
    #[cfg_attr(
        feature = "serde",
        serde(bound = "N::BroadcastedTransaction: serde::Serialize + serde::de::DeserializeOwned")
    )]
    BlockRangeUpdate(BlockRangeUpdate),
    /// A message with an ID that has no dedicated variant, kept as a raw payload.
    Other(RawCapabilityMessage),
}
344
345impl<N: NetworkPrimitives> EthMessage<N> {
346 pub const fn message_id(&self) -> EthMessageID {
348 match self {
349 Self::Status(_) => EthMessageID::Status,
350 Self::NewBlockHashes(_) => EthMessageID::NewBlockHashes,
351 Self::NewBlock(_) => EthMessageID::NewBlock,
352 Self::Transactions(_) => EthMessageID::Transactions,
353 Self::NewPooledTransactionHashes66(_) | Self::NewPooledTransactionHashes68(_) => {
354 EthMessageID::NewPooledTransactionHashes
355 }
356 Self::GetBlockHeaders(_) => EthMessageID::GetBlockHeaders,
357 Self::BlockHeaders(_) => EthMessageID::BlockHeaders,
358 Self::GetBlockBodies(_) => EthMessageID::GetBlockBodies,
359 Self::BlockBodies(_) => EthMessageID::BlockBodies,
360 Self::GetPooledTransactions(_) => EthMessageID::GetPooledTransactions,
361 Self::PooledTransactions(_) => EthMessageID::PooledTransactions,
362 Self::GetNodeData(_) => EthMessageID::GetNodeData,
363 Self::NodeData(_) => EthMessageID::NodeData,
364 Self::GetReceipts(_) | Self::GetReceipts70(_) => EthMessageID::GetReceipts,
365 Self::Receipts(_) | Self::Receipts69(_) | Self::Receipts70(_) => EthMessageID::Receipts,
366 Self::BlockRangeUpdate(_) => EthMessageID::BlockRangeUpdate,
367 Self::Other(msg) => EthMessageID::Other(msg.id as u8),
368 }
369 }
370
371 pub const fn is_request(&self) -> bool {
373 matches!(
374 self,
375 Self::GetBlockBodies(_) |
376 Self::GetBlockHeaders(_) |
377 Self::GetReceipts(_) |
378 Self::GetReceipts70(_) |
379 Self::GetPooledTransactions(_) |
380 Self::GetNodeData(_)
381 )
382 }
383
384 pub const fn is_response(&self) -> bool {
386 matches!(
387 self,
388 Self::PooledTransactions(_) |
389 Self::Receipts(_) |
390 Self::Receipts69(_) |
391 Self::Receipts70(_) |
392 Self::BlockHeaders(_) |
393 Self::BlockBodies(_) |
394 Self::NodeData(_)
395 )
396 }
397
398 pub fn map_versioned(self, version: EthVersion) -> Self {
403 if version >= EthVersion::Eth70 {
407 return match self {
408 Self::GetReceipts(pair) => {
409 let RequestPair { request_id, message } = pair;
410 let req = RequestPair {
411 request_id,
412 message: GetReceipts70 {
413 first_block_receipt_index: 0,
414 block_hashes: message.0,
415 },
416 };
417 Self::GetReceipts70(req)
418 }
419 other => other,
420 }
421 }
422
423 self
424 }
425}
426
427impl<N: NetworkPrimitives> Encodable for EthMessage<N> {
428 fn encode(&self, out: &mut dyn BufMut) {
429 match self {
430 Self::Status(status) => status.encode(out),
431 Self::NewBlockHashes(new_block_hashes) => new_block_hashes.encode(out),
432 Self::NewBlock(new_block) => new_block.encode(out),
433 Self::Transactions(transactions) => transactions.encode(out),
434 Self::NewPooledTransactionHashes66(hashes) => hashes.encode(out),
435 Self::NewPooledTransactionHashes68(hashes) => hashes.encode(out),
436 Self::GetBlockHeaders(request) => request.encode(out),
437 Self::BlockHeaders(headers) => headers.encode(out),
438 Self::GetBlockBodies(request) => request.encode(out),
439 Self::BlockBodies(bodies) => bodies.encode(out),
440 Self::GetPooledTransactions(request) => request.encode(out),
441 Self::PooledTransactions(transactions) => transactions.encode(out),
442 Self::GetNodeData(request) => request.encode(out),
443 Self::NodeData(data) => data.encode(out),
444 Self::GetReceipts(request) => request.encode(out),
445 Self::GetReceipts70(request) => request.encode(out),
446 Self::Receipts(receipts) => receipts.encode(out),
447 Self::Receipts69(receipt69) => receipt69.encode(out),
448 Self::Receipts70(receipt70) => receipt70.encode(out),
449 Self::BlockRangeUpdate(block_range_update) => block_range_update.encode(out),
450 Self::Other(unknown) => out.put_slice(&unknown.payload),
451 }
452 }
453 fn length(&self) -> usize {
454 match self {
455 Self::Status(status) => status.length(),
456 Self::NewBlockHashes(new_block_hashes) => new_block_hashes.length(),
457 Self::NewBlock(new_block) => new_block.length(),
458 Self::Transactions(transactions) => transactions.length(),
459 Self::NewPooledTransactionHashes66(hashes) => hashes.length(),
460 Self::NewPooledTransactionHashes68(hashes) => hashes.length(),
461 Self::GetBlockHeaders(request) => request.length(),
462 Self::BlockHeaders(headers) => headers.length(),
463 Self::GetBlockBodies(request) => request.length(),
464 Self::BlockBodies(bodies) => bodies.length(),
465 Self::GetPooledTransactions(request) => request.length(),
466 Self::PooledTransactions(transactions) => transactions.length(),
467 Self::GetNodeData(request) => request.length(),
468 Self::NodeData(data) => data.length(),
469 Self::GetReceipts(request) => request.length(),
470 Self::GetReceipts70(request) => request.length(),
471 Self::Receipts(receipts) => receipts.length(),
472 Self::Receipts69(receipt69) => receipt69.length(),
473 Self::Receipts70(receipt70) => receipt70.length(),
474 Self::BlockRangeUpdate(block_range_update) => block_range_update.length(),
475 Self::Other(unknown) => unknown.length(),
476 }
477 }
478}
479
/// Broadcast messages whose payload is held behind a shared handle.
///
/// Only encoding is implemented for this type in this part of the file.
/// NOTE(review): presumably the shared payload lets one object be sent to many peers without
/// cloning it — confirm against the callers.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum EthBroadcastMessage<N: NetworkPrimitives = EthNetworkPrimitives> {
    /// A new-block announcement with a reference-counted payload.
    NewBlock(Arc<N::NewBlockPayload>),
    /// A list of transactions to broadcast.
    Transactions(SharedTransactions<N::BroadcastedTransaction>),
}
494
495impl<N: NetworkPrimitives> EthBroadcastMessage<N> {
498 pub const fn message_id(&self) -> EthMessageID {
500 match self {
501 Self::NewBlock(_) => EthMessageID::NewBlock,
502 Self::Transactions(_) => EthMessageID::Transactions,
503 }
504 }
505}
506
507impl<N: NetworkPrimitives> Encodable for EthBroadcastMessage<N> {
508 fn encode(&self, out: &mut dyn BufMut) {
509 match self {
510 Self::NewBlock(new_block) => new_block.encode(out),
511 Self::Transactions(transactions) => transactions.encode(out),
512 }
513 }
514
515 fn length(&self) -> usize {
516 match self {
517 Self::NewBlock(new_block) => new_block.length(),
518 Self::Transactions(transactions) => transactions.length(),
519 }
520 }
521}
522
/// Message IDs of the eth protocol.
///
/// The values `0x0b` and `0x0c` have no dedicated variant; they and every other unlisted value
/// are represented by [`EthMessageID::Other`].
#[repr(u8)]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub enum EthMessageID {
    /// Status message.
    Status = 0x00,
    /// New block hashes message.
    NewBlockHashes = 0x01,
    /// Transactions message.
    Transactions = 0x02,
    /// Get block headers message.
    GetBlockHeaders = 0x03,
    /// Block headers message.
    BlockHeaders = 0x04,
    /// Get block bodies message.
    GetBlockBodies = 0x05,
    /// Block bodies message.
    BlockBodies = 0x06,
    /// New block message.
    NewBlock = 0x07,
    /// New pooled transaction hashes message.
    NewPooledTransactionHashes = 0x08,
    /// Get pooled transactions message.
    GetPooledTransactions = 0x09,
    /// Pooled transactions message.
    PooledTransactions = 0x0a,
    /// Get node data message.
    GetNodeData = 0x0d,
    /// Node data message.
    NodeData = 0x0e,
    /// Get receipts message.
    GetReceipts = 0x0f,
    /// Receipts message.
    Receipts = 0x10,
    /// Block range update message.
    BlockRangeUpdate = 0x11,
    /// Any ID without a dedicated variant; carries the raw ID byte.
    Other(u8),
}
565
566impl EthMessageID {
567 pub const fn to_u8(&self) -> u8 {
569 match self {
570 Self::Status => 0x00,
571 Self::NewBlockHashes => 0x01,
572 Self::Transactions => 0x02,
573 Self::GetBlockHeaders => 0x03,
574 Self::BlockHeaders => 0x04,
575 Self::GetBlockBodies => 0x05,
576 Self::BlockBodies => 0x06,
577 Self::NewBlock => 0x07,
578 Self::NewPooledTransactionHashes => 0x08,
579 Self::GetPooledTransactions => 0x09,
580 Self::PooledTransactions => 0x0a,
581 Self::GetNodeData => 0x0d,
582 Self::NodeData => 0x0e,
583 Self::GetReceipts => 0x0f,
584 Self::Receipts => 0x10,
585 Self::BlockRangeUpdate => 0x11,
586 Self::Other(value) => *value, }
588 }
589
590 pub const fn max(version: EthVersion) -> u8 {
592 if version as u8 >= EthVersion::Eth69 as u8 {
593 Self::BlockRangeUpdate.to_u8()
594 } else {
595 Self::Receipts.to_u8()
596 }
597 }
598
599 pub const fn message_count(version: EthVersion) -> u8 {
605 Self::max(version) + 1
606 }
607}
608
609impl Encodable for EthMessageID {
610 fn encode(&self, out: &mut dyn BufMut) {
611 out.put_u8(self.to_u8());
612 }
613 fn length(&self) -> usize {
614 1
615 }
616}
617
618impl Decodable for EthMessageID {
619 fn decode(buf: &mut &[u8]) -> alloy_rlp::Result<Self> {
620 let id = match buf.first().ok_or(alloy_rlp::Error::InputTooShort)? {
621 0x00 => Self::Status,
622 0x01 => Self::NewBlockHashes,
623 0x02 => Self::Transactions,
624 0x03 => Self::GetBlockHeaders,
625 0x04 => Self::BlockHeaders,
626 0x05 => Self::GetBlockBodies,
627 0x06 => Self::BlockBodies,
628 0x07 => Self::NewBlock,
629 0x08 => Self::NewPooledTransactionHashes,
630 0x09 => Self::GetPooledTransactions,
631 0x0a => Self::PooledTransactions,
632 0x0d => Self::GetNodeData,
633 0x0e => Self::NodeData,
634 0x0f => Self::GetReceipts,
635 0x10 => Self::Receipts,
636 0x11 => Self::BlockRangeUpdate,
637 unknown => Self::Other(*unknown),
638 };
639 buf.advance(1);
640 Ok(id)
641 }
642}
643
644impl TryFrom<usize> for EthMessageID {
645 type Error = &'static str;
646
647 fn try_from(value: usize) -> Result<Self, Self::Error> {
648 match value {
649 0x00 => Ok(Self::Status),
650 0x01 => Ok(Self::NewBlockHashes),
651 0x02 => Ok(Self::Transactions),
652 0x03 => Ok(Self::GetBlockHeaders),
653 0x04 => Ok(Self::BlockHeaders),
654 0x05 => Ok(Self::GetBlockBodies),
655 0x06 => Ok(Self::BlockBodies),
656 0x07 => Ok(Self::NewBlock),
657 0x08 => Ok(Self::NewPooledTransactionHashes),
658 0x09 => Ok(Self::GetPooledTransactions),
659 0x0a => Ok(Self::PooledTransactions),
660 0x0d => Ok(Self::GetNodeData),
661 0x0e => Ok(Self::NodeData),
662 0x0f => Ok(Self::GetReceipts),
663 0x10 => Ok(Self::Receipts),
664 0x11 => Ok(Self::BlockRangeUpdate),
665 _ => Err("Invalid message ID"),
666 }
667 }
668}
669
/// A message paired with a request id.
///
/// It is RLP-encoded as a list of the request id followed by the message.
#[derive(Clone, Debug, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(any(test, feature = "arbitrary"), derive(arbitrary::Arbitrary))]
pub struct RequestPair<T> {
    /// The id of the request.
    /// NOTE(review): presumably echoed by the response so that both can be matched — confirm.
    pub request_id: u64,

    /// The request or response message itself.
    pub message: T,
}
683
684impl<T> RequestPair<T> {
685 pub fn map<F, R>(self, f: F) -> RequestPair<R>
687 where
688 F: FnOnce(T) -> R,
689 {
690 let Self { request_id, message } = self;
691 RequestPair { request_id, message: f(message) }
692 }
693}
694
695impl<T> Encodable for RequestPair<T>
697where
698 T: Encodable,
699{
700 fn encode(&self, out: &mut dyn alloy_rlp::BufMut) {
701 let header =
702 Header { list: true, payload_length: self.request_id.length() + self.message.length() };
703
704 header.encode(out);
705 self.request_id.encode(out);
706 self.message.encode(out);
707 }
708
709 fn length(&self) -> usize {
710 let mut length = 0;
711 length += self.request_id.length();
712 length += self.message.length();
713 length += length_of_length(length);
714 length
715 }
716}
717
718impl<T> Decodable for RequestPair<T>
720where
721 T: Decodable,
722{
723 fn decode(buf: &mut &[u8]) -> alloy_rlp::Result<Self> {
724 let header = Header::decode(buf)?;
725
726 let initial_length = buf.len();
727 let request_id = u64::decode(buf)?;
728 let message = T::decode(buf)?;
729
730 let consumed_len = initial_length - buf.len();
733 if consumed_len != header.payload_length {
734 return Err(alloy_rlp::Error::UnexpectedLength)
735 }
736
737 Ok(Self { request_id, message })
738 }
739}
740
// Unit tests for message ID handling, `RequestPair` encoding/decoding, and version-dependent
// decoding of protocol messages.
#[cfg(test)]
mod tests {
    use super::MessageError;
    use crate::{
        message::RequestPair, EthMessage, EthMessageID, EthNetworkPrimitives, EthVersion,
        GetNodeData, NodeData, ProtocolMessage, RawCapabilityMessage,
    };
    use alloy_primitives::hex;
    use alloy_rlp::{Decodable, Encodable, Error};
    use reth_ethereum_primitives::BlockBody;

    // Encodes `value` into a fresh byte vector.
    fn encode<T: Encodable>(value: T) -> Vec<u8> {
        let mut buf = vec![];
        value.encode(&mut buf);
        buf
    }

    // `GetNodeData` and `NodeData` must be rejected when decoding for eth/67.
    //
    // NOTE: both assertions only check the `Invalid` variant, not the message ID it reports.
    #[test]
    fn test_removed_message_at_eth67() {
        let get_node_data = EthMessage::<EthNetworkPrimitives>::GetNodeData(RequestPair {
            request_id: 1337,
            message: GetNodeData(vec![]),
        });
        let buf = encode(ProtocolMessage {
            message_type: EthMessageID::GetNodeData,
            message: get_node_data,
        });
        let msg = ProtocolMessage::<EthNetworkPrimitives>::decode_message(
            crate::EthVersion::Eth67,
            &mut &buf[..],
        );
        assert!(matches!(msg, Err(MessageError::Invalid(..))));

        let node_data = EthMessage::<EthNetworkPrimitives>::NodeData(RequestPair {
            request_id: 1337,
            message: NodeData(vec![]),
        });
        let buf =
            encode(ProtocolMessage { message_type: EthMessageID::NodeData, message: node_data });
        let msg = ProtocolMessage::<EthNetworkPrimitives>::decode_message(
            crate::EthVersion::Eth67,
            &mut &buf[..],
        );
        assert!(matches!(msg, Err(MessageError::Invalid(..))));
    }

    #[test]
    fn request_pair_encode() {
        let request_pair = RequestPair { request_id: 1337, message: vec![5u8] };

        // c5: list header, payload length 5
        // 82: string header, length 2
        // 05 39: 1337 (request_id)
        // c1: list header, payload length 1
        // 05: 5 (message)
        let expected = hex!("c5820539c105");
        let got = encode(request_pair);
        assert_eq!(expected[..], got, "expected: {expected:X?}, got: {got:X?}",);
    }

    #[test]
    fn request_pair_decode() {
        // Same bytes as in `request_pair_encode`.
        let raw_pair = &hex!("c5820539c105")[..];

        let expected = RequestPair { request_id: 1337, message: vec![5u8] };

        let got = RequestPair::<Vec<u8>>::decode(&mut &*raw_pair).unwrap();
        assert_eq!(expected.length(), raw_pair.len());
        assert_eq!(expected, got);
    }

    #[test]
    fn malicious_request_pair_decode() {
        // The outer list header announces a payload of 5 bytes, but decoding the request id
        // and the message consumes 6 bytes.
        //
        // c5: list header, payload length 5
        // 82: string header, length 2
        // 05 39: 1337 (request_id)
        // c2: list header, payload length 2
        // 05 05: 5, 5 (message)
        let raw_pair = &hex!("c5820539c20505")[..];

        let result = RequestPair::<Vec<u8>>::decode(&mut &*raw_pair);
        assert!(matches!(result, Err(Error::UnexpectedLength)));
    }

    // A `BlockBodies` response without any bodies must survive an encode/decode round trip.
    #[test]
    fn empty_block_bodies_protocol() {
        let empty_block_bodies =
            ProtocolMessage::from(EthMessage::<EthNetworkPrimitives>::BlockBodies(RequestPair {
                request_id: 0,
                message: Default::default(),
            }));
        let mut buf = Vec::new();
        empty_block_bodies.encode(&mut buf);
        let decoded =
            ProtocolMessage::decode_message(EthVersion::Eth68, &mut buf.as_slice()).unwrap();
        assert_eq!(empty_block_bodies, decoded);
    }

    // A `BlockBodies` response with a single empty body must survive a round trip.
    #[test]
    fn empty_block_body_protocol() {
        let empty_block_bodies =
            ProtocolMessage::from(EthMessage::<EthNetworkPrimitives>::BlockBodies(RequestPair {
                request_id: 0,
                message: vec![BlockBody {
                    transactions: vec![],
                    ommers: vec![],
                    withdrawals: Some(Default::default()),
                }]
                .into(),
            }));
        let mut buf = Vec::new();
        empty_block_bodies.encode(&mut buf);
        let decoded =
            ProtocolMessage::decode_message(EthVersion::Eth68, &mut buf.as_slice()).unwrap();
        assert_eq!(empty_block_bodies, decoded);
    }

    #[test]
    fn decode_block_bodies_message() {
        // 06: BlockBodies message ID
        // c4: list header, payload length 4
        // 81 99: request id
        // c1 c0: a list holding one body that is itself an empty list, which is expected to
        // fail with `InputTooShort`
        let buf = hex!("06c48199c1c0");
        let msg = ProtocolMessage::<EthNetworkPrimitives>::decode_message(
            EthVersion::Eth68,
            &mut &buf[..],
        )
        .unwrap_err();
        assert!(matches!(msg, MessageError::RlpError(alloy_rlp::Error::InputTooShort)));
    }

    // A message with an ID that has no dedicated variant must survive a round trip.
    #[test]
    fn custom_message_roundtrip() {
        let custom_payload = vec![1, 2, 3, 4, 5];
        let custom_message = RawCapabilityMessage::new(0x20, custom_payload.into());
        let protocol_message = ProtocolMessage::<EthNetworkPrimitives> {
            message_type: EthMessageID::Other(0x20),
            message: EthMessage::Other(custom_message),
        };

        let encoded = encode(protocol_message.clone());
        let decoded = ProtocolMessage::<EthNetworkPrimitives>::decode_message(
            EthVersion::Eth68,
            &mut &encoded[..],
        )
        .unwrap();

        assert_eq!(protocol_message, decoded);
    }

    // Same as above, but with an empty payload.
    #[test]
    fn custom_message_empty_payload_roundtrip() {
        let custom_message = RawCapabilityMessage::new(0x30, vec![].into());
        let protocol_message = ProtocolMessage::<EthNetworkPrimitives> {
            message_type: EthMessageID::Other(0x30),
            message: EthMessage::Other(custom_message),
        };

        let encoded = encode(protocol_message.clone());
        let decoded = ProtocolMessage::<EthNetworkPrimitives>::decode_message(
            EthVersion::Eth68,
            &mut &encoded[..],
        )
        .unwrap();

        assert_eq!(protocol_message, decoded);
    }

    // `decode_status` must return the legacy status layout for eth/68.
    #[test]
    fn decode_status_success() {
        use crate::{Status, StatusMessage};
        use alloy_hardforks::{ForkHash, ForkId};
        use alloy_primitives::{B256, U256};

        let status = Status {
            version: EthVersion::Eth68,
            chain: alloy_chains::Chain::mainnet(),
            total_difficulty: U256::from(100u64),
            blockhash: B256::random(),
            genesis: B256::random(),
            forkid: ForkId { hash: ForkHash([0xb7, 0x15, 0x07, 0x7d]), next: 0 },
        };

        let protocol_message = ProtocolMessage::<EthNetworkPrimitives>::from(EthMessage::Status(
            StatusMessage::Legacy(status),
        ));
        let encoded = encode(protocol_message);

        let decoded = ProtocolMessage::<EthNetworkPrimitives>::decode_status(
            EthVersion::Eth68,
            &mut &encoded[..],
        )
        .unwrap();

        assert!(matches!(decoded, StatusMessage::Legacy(s) if s == status));
    }

    // From eth/69 on the highest ID is `BlockRangeUpdate`; before that it is `Receipts`.
    #[test]
    fn eth_message_id_max_includes_block_range_update() {
        assert_eq!(EthMessageID::max(EthVersion::Eth69), EthMessageID::BlockRangeUpdate.to_u8(),);
        assert_eq!(EthMessageID::max(EthVersion::Eth70), EthMessageID::BlockRangeUpdate.to_u8(),);
        assert_eq!(EthMessageID::max(EthVersion::Eth68), EthMessageID::Receipts.to_u8());
    }

    // `decode_status` must reject any message that is not a status message and report its ID.
    #[test]
    fn decode_status_rejects_non_status() {
        let msg = EthMessage::<EthNetworkPrimitives>::GetBlockBodies(RequestPair {
            request_id: 1,
            message: crate::GetBlockBodies::default(),
        });
        let protocol_message =
            ProtocolMessage { message_type: EthMessageID::GetBlockBodies, message: msg };
        let encoded = encode(protocol_message);

        let result = ProtocolMessage::<EthNetworkPrimitives>::decode_status(
            EthVersion::Eth68,
            &mut &encoded[..],
        );

        assert!(matches!(
            result,
            Err(MessageError::ExpectedStatusMessage(EthMessageID::GetBlockBodies))
        ));
    }
}