1use super::{
10 broadcast::NewBlockHashes, BlockBodies, BlockHeaders, GetBlockBodies, GetBlockHeaders,
11 GetNodeData, GetPooledTransactions, GetReceipts, GetReceipts70, NewPooledTransactionHashes66,
12 NewPooledTransactionHashes68, NodeData, PooledTransactions, Receipts, Status, StatusEth69,
13 Transactions,
14};
15use crate::{
16 status::StatusMessage, BlockRangeUpdate, EthNetworkPrimitives, EthVersion, NetworkPrimitives,
17 RawCapabilityMessage, Receipts69, Receipts70, SharedTransactions,
18};
19use alloc::{boxed::Box, string::String, sync::Arc};
20use alloy_primitives::{
21 bytes::{Buf, BufMut},
22 Bytes,
23};
24use alloy_rlp::{length_of_length, Decodable, Encodable, Header};
25use core::fmt::Debug;
26
27pub const MAX_MESSAGE_SIZE: usize = 10 * 1024 * 1024;
30
31#[derive(thiserror::Error, Debug)]
33pub enum MessageError {
34 #[error("message id {1:?} is invalid for version {0:?}")]
36 Invalid(EthVersion, EthMessageID),
37 #[error("RLP error: {0}")]
39 RlpError(#[from] alloy_rlp::Error),
40 #[error("{0}")]
42 Other(String),
43}
44
45#[derive(Clone, Debug, PartialEq, Eq)]
47#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
48pub struct ProtocolMessage<N: NetworkPrimitives = EthNetworkPrimitives> {
49 pub message_type: EthMessageID,
51 #[cfg_attr(
53 feature = "serde",
54 serde(bound = "EthMessage<N>: serde::Serialize + serde::de::DeserializeOwned")
55 )]
56 pub message: EthMessage<N>,
57}
58
59impl<N: NetworkPrimitives> ProtocolMessage<N> {
60 pub fn decode_message(version: EthVersion, buf: &mut &[u8]) -> Result<Self, MessageError> {
64 let message_type = EthMessageID::decode(buf)?;
65
66 let message = match message_type {
69 EthMessageID::Status => EthMessage::Status(if version < EthVersion::Eth69 {
70 StatusMessage::Legacy(Status::decode(buf)?)
71 } else {
72 StatusMessage::Eth69(StatusEth69::decode(buf)?)
73 }),
74 EthMessageID::NewBlockHashes => {
75 EthMessage::NewBlockHashes(NewBlockHashes::decode(buf)?)
76 }
77 EthMessageID::NewBlock => {
78 EthMessage::NewBlock(Box::new(N::NewBlockPayload::decode(buf)?))
79 }
80 EthMessageID::Transactions => EthMessage::Transactions(Transactions::decode(buf)?),
81 EthMessageID::NewPooledTransactionHashes => {
82 if version >= EthVersion::Eth68 {
83 EthMessage::NewPooledTransactionHashes68(NewPooledTransactionHashes68::decode(
84 buf,
85 )?)
86 } else {
87 EthMessage::NewPooledTransactionHashes66(NewPooledTransactionHashes66::decode(
88 buf,
89 )?)
90 }
91 }
92 EthMessageID::GetBlockHeaders => EthMessage::GetBlockHeaders(RequestPair::decode(buf)?),
93 EthMessageID::BlockHeaders => EthMessage::BlockHeaders(RequestPair::decode(buf)?),
94 EthMessageID::GetBlockBodies => EthMessage::GetBlockBodies(RequestPair::decode(buf)?),
95 EthMessageID::BlockBodies => EthMessage::BlockBodies(RequestPair::decode(buf)?),
96 EthMessageID::GetPooledTransactions => {
97 EthMessage::GetPooledTransactions(RequestPair::decode(buf)?)
98 }
99 EthMessageID::PooledTransactions => {
100 EthMessage::PooledTransactions(RequestPair::decode(buf)?)
101 }
102 EthMessageID::GetNodeData => {
103 if version >= EthVersion::Eth67 {
104 return Err(MessageError::Invalid(version, EthMessageID::GetNodeData))
105 }
106 EthMessage::GetNodeData(RequestPair::decode(buf)?)
107 }
108 EthMessageID::NodeData => {
109 if version >= EthVersion::Eth67 {
110 return Err(MessageError::Invalid(version, EthMessageID::GetNodeData))
111 }
112 EthMessage::NodeData(RequestPair::decode(buf)?)
113 }
114 EthMessageID::GetReceipts => {
115 if version >= EthVersion::Eth70 {
116 EthMessage::GetReceipts70(RequestPair::decode(buf)?)
117 } else {
118 EthMessage::GetReceipts(RequestPair::decode(buf)?)
119 }
120 }
121 EthMessageID::Receipts => {
122 match version {
123 v if v >= EthVersion::Eth70 => {
124 EthMessage::Receipts70(RequestPair::decode(buf)?)
128 }
129 EthVersion::Eth69 => {
130 EthMessage::Receipts69(RequestPair::decode(buf)?)
132 }
133 _ => {
134 EthMessage::Receipts(RequestPair::decode(buf)?)
136 }
137 }
138 }
139 EthMessageID::BlockRangeUpdate => {
140 if version < EthVersion::Eth69 {
141 return Err(MessageError::Invalid(version, EthMessageID::BlockRangeUpdate))
142 }
143 EthMessage::BlockRangeUpdate(BlockRangeUpdate::decode(buf)?)
144 }
145 EthMessageID::Other(_) => {
146 let raw_payload = Bytes::copy_from_slice(buf);
147 buf.advance(raw_payload.len());
148 EthMessage::Other(RawCapabilityMessage::new(
149 message_type.to_u8() as usize,
150 raw_payload.into(),
151 ))
152 }
153 };
154 Ok(Self { message_type, message })
155 }
156}
157
158impl<N: NetworkPrimitives> Encodable for ProtocolMessage<N> {
159 fn encode(&self, out: &mut dyn BufMut) {
162 self.message_type.encode(out);
163 self.message.encode(out);
164 }
165 fn length(&self) -> usize {
166 self.message_type.length() + self.message.length()
167 }
168}
169
170impl<N: NetworkPrimitives> From<EthMessage<N>> for ProtocolMessage<N> {
171 fn from(message: EthMessage<N>) -> Self {
172 Self { message_type: message.message_id(), message }
173 }
174}
175
176#[derive(Clone, Debug)]
178pub struct ProtocolBroadcastMessage<N: NetworkPrimitives = EthNetworkPrimitives> {
179 pub message_type: EthMessageID,
181 pub message: EthBroadcastMessage<N>,
184}
185
186impl<N: NetworkPrimitives> Encodable for ProtocolBroadcastMessage<N> {
187 fn encode(&self, out: &mut dyn BufMut) {
190 self.message_type.encode(out);
191 self.message.encode(out);
192 }
193 fn length(&self) -> usize {
194 self.message_type.length() + self.message.length()
195 }
196}
197
198impl<N: NetworkPrimitives> From<EthBroadcastMessage<N>> for ProtocolBroadcastMessage<N> {
199 fn from(message: EthBroadcastMessage<N>) -> Self {
200 Self { message_type: message.message_id(), message }
201 }
202}
203
204#[derive(Clone, Debug, PartialEq, Eq)]
228#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
229pub enum EthMessage<N: NetworkPrimitives = EthNetworkPrimitives> {
230 Status(StatusMessage),
232 NewBlockHashes(NewBlockHashes),
234 #[cfg_attr(
236 feature = "serde",
237 serde(bound = "N::NewBlockPayload: serde::Serialize + serde::de::DeserializeOwned")
238 )]
239 NewBlock(Box<N::NewBlockPayload>),
240 #[cfg_attr(
242 feature = "serde",
243 serde(bound = "N::BroadcastedTransaction: serde::Serialize + serde::de::DeserializeOwned")
244 )]
245 Transactions(Transactions<N::BroadcastedTransaction>),
246 NewPooledTransactionHashes66(NewPooledTransactionHashes66),
248 NewPooledTransactionHashes68(NewPooledTransactionHashes68),
250 GetBlockHeaders(RequestPair<GetBlockHeaders>),
253 #[cfg_attr(
255 feature = "serde",
256 serde(bound = "N::BlockHeader: serde::Serialize + serde::de::DeserializeOwned")
257 )]
258 BlockHeaders(RequestPair<BlockHeaders<N::BlockHeader>>),
259 GetBlockBodies(RequestPair<GetBlockBodies>),
261 #[cfg_attr(
263 feature = "serde",
264 serde(bound = "N::BlockBody: serde::Serialize + serde::de::DeserializeOwned")
265 )]
266 BlockBodies(RequestPair<BlockBodies<N::BlockBody>>),
267 GetPooledTransactions(RequestPair<GetPooledTransactions>),
269 #[cfg_attr(
271 feature = "serde",
272 serde(bound = "N::PooledTransaction: serde::Serialize + serde::de::DeserializeOwned")
273 )]
274 PooledTransactions(RequestPair<PooledTransactions<N::PooledTransaction>>),
275 GetNodeData(RequestPair<GetNodeData>),
277 NodeData(RequestPair<NodeData>),
279 GetReceipts(RequestPair<GetReceipts>),
281 GetReceipts70(RequestPair<GetReceipts70>),
287 #[cfg_attr(
289 feature = "serde",
290 serde(bound = "N::Receipt: serde::Serialize + serde::de::DeserializeOwned")
291 )]
292 Receipts(RequestPair<Receipts<N::Receipt>>),
293 #[cfg_attr(
295 feature = "serde",
296 serde(bound = "N::Receipt: serde::Serialize + serde::de::DeserializeOwned")
297 )]
298 Receipts69(RequestPair<Receipts69<N::Receipt>>),
299 #[cfg_attr(
301 feature = "serde",
302 serde(bound = "N::Receipt: serde::Serialize + serde::de::DeserializeOwned")
303 )]
304 Receipts70(RequestPair<Receipts70<N::Receipt>>),
309 #[cfg_attr(
311 feature = "serde",
312 serde(bound = "N::BroadcastedTransaction: serde::Serialize + serde::de::DeserializeOwned")
313 )]
314 BlockRangeUpdate(BlockRangeUpdate),
315 Other(RawCapabilityMessage),
317}
318
319impl<N: NetworkPrimitives> EthMessage<N> {
320 pub const fn message_id(&self) -> EthMessageID {
322 match self {
323 Self::Status(_) => EthMessageID::Status,
324 Self::NewBlockHashes(_) => EthMessageID::NewBlockHashes,
325 Self::NewBlock(_) => EthMessageID::NewBlock,
326 Self::Transactions(_) => EthMessageID::Transactions,
327 Self::NewPooledTransactionHashes66(_) | Self::NewPooledTransactionHashes68(_) => {
328 EthMessageID::NewPooledTransactionHashes
329 }
330 Self::GetBlockHeaders(_) => EthMessageID::GetBlockHeaders,
331 Self::BlockHeaders(_) => EthMessageID::BlockHeaders,
332 Self::GetBlockBodies(_) => EthMessageID::GetBlockBodies,
333 Self::BlockBodies(_) => EthMessageID::BlockBodies,
334 Self::GetPooledTransactions(_) => EthMessageID::GetPooledTransactions,
335 Self::PooledTransactions(_) => EthMessageID::PooledTransactions,
336 Self::GetNodeData(_) => EthMessageID::GetNodeData,
337 Self::NodeData(_) => EthMessageID::NodeData,
338 Self::GetReceipts(_) | Self::GetReceipts70(_) => EthMessageID::GetReceipts,
339 Self::Receipts(_) | Self::Receipts69(_) | Self::Receipts70(_) => EthMessageID::Receipts,
340 Self::BlockRangeUpdate(_) => EthMessageID::BlockRangeUpdate,
341 Self::Other(msg) => EthMessageID::Other(msg.id as u8),
342 }
343 }
344
345 pub const fn is_request(&self) -> bool {
347 matches!(
348 self,
349 Self::GetBlockBodies(_) |
350 Self::GetBlockHeaders(_) |
351 Self::GetReceipts(_) |
352 Self::GetReceipts70(_) |
353 Self::GetPooledTransactions(_) |
354 Self::GetNodeData(_)
355 )
356 }
357
358 pub const fn is_response(&self) -> bool {
360 matches!(
361 self,
362 Self::PooledTransactions(_) |
363 Self::Receipts(_) |
364 Self::Receipts69(_) |
365 Self::Receipts70(_) |
366 Self::BlockHeaders(_) |
367 Self::BlockBodies(_) |
368 Self::NodeData(_)
369 )
370 }
371
372 pub fn map_versioned(self, version: EthVersion) -> Self {
377 if version >= EthVersion::Eth70 {
381 return match self {
382 Self::GetReceipts(pair) => {
383 let RequestPair { request_id, message } = pair;
384 let req = RequestPair {
385 request_id,
386 message: GetReceipts70 {
387 first_block_receipt_index: 0,
388 block_hashes: message.0,
389 },
390 };
391 Self::GetReceipts70(req)
392 }
393 other => other,
394 }
395 }
396
397 self
398 }
399}
400
401impl<N: NetworkPrimitives> Encodable for EthMessage<N> {
402 fn encode(&self, out: &mut dyn BufMut) {
403 match self {
404 Self::Status(status) => status.encode(out),
405 Self::NewBlockHashes(new_block_hashes) => new_block_hashes.encode(out),
406 Self::NewBlock(new_block) => new_block.encode(out),
407 Self::Transactions(transactions) => transactions.encode(out),
408 Self::NewPooledTransactionHashes66(hashes) => hashes.encode(out),
409 Self::NewPooledTransactionHashes68(hashes) => hashes.encode(out),
410 Self::GetBlockHeaders(request) => request.encode(out),
411 Self::BlockHeaders(headers) => headers.encode(out),
412 Self::GetBlockBodies(request) => request.encode(out),
413 Self::BlockBodies(bodies) => bodies.encode(out),
414 Self::GetPooledTransactions(request) => request.encode(out),
415 Self::PooledTransactions(transactions) => transactions.encode(out),
416 Self::GetNodeData(request) => request.encode(out),
417 Self::NodeData(data) => data.encode(out),
418 Self::GetReceipts(request) => request.encode(out),
419 Self::GetReceipts70(request) => request.encode(out),
420 Self::Receipts(receipts) => receipts.encode(out),
421 Self::Receipts69(receipt69) => receipt69.encode(out),
422 Self::Receipts70(receipt70) => receipt70.encode(out),
423 Self::BlockRangeUpdate(block_range_update) => block_range_update.encode(out),
424 Self::Other(unknown) => out.put_slice(&unknown.payload),
425 }
426 }
427 fn length(&self) -> usize {
428 match self {
429 Self::Status(status) => status.length(),
430 Self::NewBlockHashes(new_block_hashes) => new_block_hashes.length(),
431 Self::NewBlock(new_block) => new_block.length(),
432 Self::Transactions(transactions) => transactions.length(),
433 Self::NewPooledTransactionHashes66(hashes) => hashes.length(),
434 Self::NewPooledTransactionHashes68(hashes) => hashes.length(),
435 Self::GetBlockHeaders(request) => request.length(),
436 Self::BlockHeaders(headers) => headers.length(),
437 Self::GetBlockBodies(request) => request.length(),
438 Self::BlockBodies(bodies) => bodies.length(),
439 Self::GetPooledTransactions(request) => request.length(),
440 Self::PooledTransactions(transactions) => transactions.length(),
441 Self::GetNodeData(request) => request.length(),
442 Self::NodeData(data) => data.length(),
443 Self::GetReceipts(request) => request.length(),
444 Self::GetReceipts70(request) => request.length(),
445 Self::Receipts(receipts) => receipts.length(),
446 Self::Receipts69(receipt69) => receipt69.length(),
447 Self::Receipts70(receipt70) => receipt70.length(),
448 Self::BlockRangeUpdate(block_range_update) => block_range_update.length(),
449 Self::Other(unknown) => unknown.length(),
450 }
451 }
452}
453
454#[derive(Clone, Debug, PartialEq, Eq)]
462pub enum EthBroadcastMessage<N: NetworkPrimitives = EthNetworkPrimitives> {
463 NewBlock(Arc<N::NewBlockPayload>),
465 Transactions(SharedTransactions<N::BroadcastedTransaction>),
467}
468
469impl<N: NetworkPrimitives> EthBroadcastMessage<N> {
472 pub const fn message_id(&self) -> EthMessageID {
474 match self {
475 Self::NewBlock(_) => EthMessageID::NewBlock,
476 Self::Transactions(_) => EthMessageID::Transactions,
477 }
478 }
479}
480
481impl<N: NetworkPrimitives> Encodable for EthBroadcastMessage<N> {
482 fn encode(&self, out: &mut dyn BufMut) {
483 match self {
484 Self::NewBlock(new_block) => new_block.encode(out),
485 Self::Transactions(transactions) => transactions.encode(out),
486 }
487 }
488
489 fn length(&self) -> usize {
490 match self {
491 Self::NewBlock(new_block) => new_block.length(),
492 Self::Transactions(transactions) => transactions.length(),
493 }
494 }
495}
496
497#[repr(u8)]
499#[derive(Clone, Copy, Debug, PartialEq, Eq)]
500#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
501pub enum EthMessageID {
502 Status = 0x00,
504 NewBlockHashes = 0x01,
506 Transactions = 0x02,
508 GetBlockHeaders = 0x03,
510 BlockHeaders = 0x04,
512 GetBlockBodies = 0x05,
514 BlockBodies = 0x06,
516 NewBlock = 0x07,
518 NewPooledTransactionHashes = 0x08,
520 GetPooledTransactions = 0x09,
522 PooledTransactions = 0x0a,
524 GetNodeData = 0x0d,
526 NodeData = 0x0e,
528 GetReceipts = 0x0f,
530 Receipts = 0x10,
532 BlockRangeUpdate = 0x11,
536 Other(u8),
538}
539
540impl EthMessageID {
541 pub const fn to_u8(&self) -> u8 {
543 match self {
544 Self::Status => 0x00,
545 Self::NewBlockHashes => 0x01,
546 Self::Transactions => 0x02,
547 Self::GetBlockHeaders => 0x03,
548 Self::BlockHeaders => 0x04,
549 Self::GetBlockBodies => 0x05,
550 Self::BlockBodies => 0x06,
551 Self::NewBlock => 0x07,
552 Self::NewPooledTransactionHashes => 0x08,
553 Self::GetPooledTransactions => 0x09,
554 Self::PooledTransactions => 0x0a,
555 Self::GetNodeData => 0x0d,
556 Self::NodeData => 0x0e,
557 Self::GetReceipts => 0x0f,
558 Self::Receipts => 0x10,
559 Self::BlockRangeUpdate => 0x11,
560 Self::Other(value) => *value, }
562 }
563
564 pub const fn max(version: EthVersion) -> u8 {
566 if version.is_eth69() {
567 Self::BlockRangeUpdate.to_u8()
568 } else {
569 Self::Receipts.to_u8()
570 }
571 }
572
573 pub const fn message_count(version: EthVersion) -> u8 {
579 Self::max(version) + 1
580 }
581}
582
583impl Encodable for EthMessageID {
584 fn encode(&self, out: &mut dyn BufMut) {
585 out.put_u8(self.to_u8());
586 }
587 fn length(&self) -> usize {
588 1
589 }
590}
591
592impl Decodable for EthMessageID {
593 fn decode(buf: &mut &[u8]) -> alloy_rlp::Result<Self> {
594 let id = match buf.first().ok_or(alloy_rlp::Error::InputTooShort)? {
595 0x00 => Self::Status,
596 0x01 => Self::NewBlockHashes,
597 0x02 => Self::Transactions,
598 0x03 => Self::GetBlockHeaders,
599 0x04 => Self::BlockHeaders,
600 0x05 => Self::GetBlockBodies,
601 0x06 => Self::BlockBodies,
602 0x07 => Self::NewBlock,
603 0x08 => Self::NewPooledTransactionHashes,
604 0x09 => Self::GetPooledTransactions,
605 0x0a => Self::PooledTransactions,
606 0x0d => Self::GetNodeData,
607 0x0e => Self::NodeData,
608 0x0f => Self::GetReceipts,
609 0x10 => Self::Receipts,
610 0x11 => Self::BlockRangeUpdate,
611 unknown => Self::Other(*unknown),
612 };
613 buf.advance(1);
614 Ok(id)
615 }
616}
617
618impl TryFrom<usize> for EthMessageID {
619 type Error = &'static str;
620
621 fn try_from(value: usize) -> Result<Self, Self::Error> {
622 match value {
623 0x00 => Ok(Self::Status),
624 0x01 => Ok(Self::NewBlockHashes),
625 0x02 => Ok(Self::Transactions),
626 0x03 => Ok(Self::GetBlockHeaders),
627 0x04 => Ok(Self::BlockHeaders),
628 0x05 => Ok(Self::GetBlockBodies),
629 0x06 => Ok(Self::BlockBodies),
630 0x07 => Ok(Self::NewBlock),
631 0x08 => Ok(Self::NewPooledTransactionHashes),
632 0x09 => Ok(Self::GetPooledTransactions),
633 0x0a => Ok(Self::PooledTransactions),
634 0x0d => Ok(Self::GetNodeData),
635 0x0e => Ok(Self::NodeData),
636 0x0f => Ok(Self::GetReceipts),
637 0x10 => Ok(Self::Receipts),
638 0x11 => Ok(Self::BlockRangeUpdate),
639 _ => Err("Invalid message ID"),
640 }
641 }
642}
643
644#[derive(Clone, Debug, PartialEq, Eq)]
648#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
649#[cfg_attr(any(test, feature = "arbitrary"), derive(arbitrary::Arbitrary))]
650pub struct RequestPair<T> {
651 pub request_id: u64,
653
654 pub message: T,
656}
657
658impl<T> RequestPair<T> {
659 pub fn map<F, R>(self, f: F) -> RequestPair<R>
661 where
662 F: FnOnce(T) -> R,
663 {
664 let Self { request_id, message } = self;
665 RequestPair { request_id, message: f(message) }
666 }
667}
668
669impl<T> Encodable for RequestPair<T>
671where
672 T: Encodable,
673{
674 fn encode(&self, out: &mut dyn alloy_rlp::BufMut) {
675 let header =
676 Header { list: true, payload_length: self.request_id.length() + self.message.length() };
677
678 header.encode(out);
679 self.request_id.encode(out);
680 self.message.encode(out);
681 }
682
683 fn length(&self) -> usize {
684 let mut length = 0;
685 length += self.request_id.length();
686 length += self.message.length();
687 length += length_of_length(length);
688 length
689 }
690}
691
692impl<T> Decodable for RequestPair<T>
694where
695 T: Decodable,
696{
697 fn decode(buf: &mut &[u8]) -> alloy_rlp::Result<Self> {
698 let header = Header::decode(buf)?;
699
700 let initial_length = buf.len();
701 let request_id = u64::decode(buf)?;
702 let message = T::decode(buf)?;
703
704 let consumed_len = initial_length - buf.len();
707 if consumed_len != header.payload_length {
708 return Err(alloy_rlp::Error::UnexpectedLength)
709 }
710
711 Ok(Self { request_id, message })
712 }
713}
714
715#[cfg(test)]
716mod tests {
717 use super::MessageError;
718 use crate::{
719 message::RequestPair, EthMessage, EthMessageID, EthNetworkPrimitives, EthVersion,
720 GetNodeData, NodeData, ProtocolMessage, RawCapabilityMessage,
721 };
722 use alloy_primitives::hex;
723 use alloy_rlp::{Decodable, Encodable, Error};
724 use reth_ethereum_primitives::BlockBody;
725
726 fn encode<T: Encodable>(value: T) -> Vec<u8> {
727 let mut buf = vec![];
728 value.encode(&mut buf);
729 buf
730 }
731
732 #[test]
733 fn test_removed_message_at_eth67() {
734 let get_node_data = EthMessage::<EthNetworkPrimitives>::GetNodeData(RequestPair {
735 request_id: 1337,
736 message: GetNodeData(vec![]),
737 });
738 let buf = encode(ProtocolMessage {
739 message_type: EthMessageID::GetNodeData,
740 message: get_node_data,
741 });
742 let msg = ProtocolMessage::<EthNetworkPrimitives>::decode_message(
743 crate::EthVersion::Eth67,
744 &mut &buf[..],
745 );
746 assert!(matches!(msg, Err(MessageError::Invalid(..))));
747
748 let node_data = EthMessage::<EthNetworkPrimitives>::NodeData(RequestPair {
749 request_id: 1337,
750 message: NodeData(vec![]),
751 });
752 let buf =
753 encode(ProtocolMessage { message_type: EthMessageID::NodeData, message: node_data });
754 let msg = ProtocolMessage::<EthNetworkPrimitives>::decode_message(
755 crate::EthVersion::Eth67,
756 &mut &buf[..],
757 );
758 assert!(matches!(msg, Err(MessageError::Invalid(..))));
759 }
760
761 #[test]
762 fn request_pair_encode() {
763 let request_pair = RequestPair { request_id: 1337, message: vec![5u8] };
764
765 let expected = hex!("c5820539c105");
772 let got = encode(request_pair);
773 assert_eq!(expected[..], got, "expected: {expected:X?}, got: {got:X?}",);
774 }
775
776 #[test]
777 fn request_pair_decode() {
778 let raw_pair = &hex!("c5820539c105")[..];
779
780 let expected = RequestPair { request_id: 1337, message: vec![5u8] };
781
782 let got = RequestPair::<Vec<u8>>::decode(&mut &*raw_pair).unwrap();
783 assert_eq!(expected.length(), raw_pair.len());
784 assert_eq!(expected, got);
785 }
786
787 #[test]
788 fn malicious_request_pair_decode() {
789 let raw_pair = &hex!("c5820539c20505")[..];
799
800 let result = RequestPair::<Vec<u8>>::decode(&mut &*raw_pair);
801 assert!(matches!(result, Err(Error::UnexpectedLength)));
802 }
803
804 #[test]
805 fn empty_block_bodies_protocol() {
806 let empty_block_bodies =
807 ProtocolMessage::from(EthMessage::<EthNetworkPrimitives>::BlockBodies(RequestPair {
808 request_id: 0,
809 message: Default::default(),
810 }));
811 let mut buf = Vec::new();
812 empty_block_bodies.encode(&mut buf);
813 let decoded =
814 ProtocolMessage::decode_message(EthVersion::Eth68, &mut buf.as_slice()).unwrap();
815 assert_eq!(empty_block_bodies, decoded);
816 }
817
818 #[test]
819 fn empty_block_body_protocol() {
820 let empty_block_bodies =
821 ProtocolMessage::from(EthMessage::<EthNetworkPrimitives>::BlockBodies(RequestPair {
822 request_id: 0,
823 message: vec![BlockBody {
824 transactions: vec![],
825 ommers: vec![],
826 withdrawals: Some(Default::default()),
827 }]
828 .into(),
829 }));
830 let mut buf = Vec::new();
831 empty_block_bodies.encode(&mut buf);
832 let decoded =
833 ProtocolMessage::decode_message(EthVersion::Eth68, &mut buf.as_slice()).unwrap();
834 assert_eq!(empty_block_bodies, decoded);
835 }
836
837 #[test]
838 fn decode_block_bodies_message() {
839 let buf = hex!("06c48199c1c0");
840 let msg = ProtocolMessage::<EthNetworkPrimitives>::decode_message(
841 EthVersion::Eth68,
842 &mut &buf[..],
843 )
844 .unwrap_err();
845 assert!(matches!(msg, MessageError::RlpError(alloy_rlp::Error::InputTooShort)));
846 }
847
848 #[test]
849 fn custom_message_roundtrip() {
850 let custom_payload = vec![1, 2, 3, 4, 5];
851 let custom_message = RawCapabilityMessage::new(0x20, custom_payload.into());
852 let protocol_message = ProtocolMessage::<EthNetworkPrimitives> {
853 message_type: EthMessageID::Other(0x20),
854 message: EthMessage::Other(custom_message),
855 };
856
857 let encoded = encode(protocol_message.clone());
858 let decoded = ProtocolMessage::<EthNetworkPrimitives>::decode_message(
859 EthVersion::Eth68,
860 &mut &encoded[..],
861 )
862 .unwrap();
863
864 assert_eq!(protocol_message, decoded);
865 }
866
867 #[test]
868 fn custom_message_empty_payload_roundtrip() {
869 let custom_message = RawCapabilityMessage::new(0x30, vec![].into());
870 let protocol_message = ProtocolMessage::<EthNetworkPrimitives> {
871 message_type: EthMessageID::Other(0x30),
872 message: EthMessage::Other(custom_message),
873 };
874
875 let encoded = encode(protocol_message.clone());
876 let decoded = ProtocolMessage::<EthNetworkPrimitives>::decode_message(
877 EthVersion::Eth68,
878 &mut &encoded[..],
879 )
880 .unwrap();
881
882 assert_eq!(protocol_message, decoded);
883 }
884}