1use crate::{EthMessage, EthVersion, NetworkPrimitives};
4use alloc::{sync::Arc, vec::Vec};
5use alloy_primitives::{
6 map::{B256Map, B256Set},
7 Bytes, TxHash, B128, B256, U128,
8};
9use alloy_rlp::{
10 Decodable, Encodable, Header, RlpDecodable, RlpDecodableWrapper, RlpEncodable,
11 RlpEncodableWrapper,
12};
13use core::{fmt::Debug, mem};
14use derive_more::{Constructor, Deref, DerefMut, From, IntoIterator};
15use reth_codecs_derive::{add_arbitrary_tests, generate_tests};
16use reth_ethereum_primitives::TransactionSigned;
17use reth_primitives_traits::{Block, InMemorySize, SignedTransaction};
18
19#[derive(
21 Clone,
22 Debug,
23 PartialEq,
24 Eq,
25 RlpEncodableWrapper,
26 RlpDecodableWrapper,
27 Default,
28 Deref,
29 DerefMut,
30 IntoIterator,
31)]
32#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
33#[cfg_attr(any(test, feature = "arbitrary"), derive(arbitrary::Arbitrary))]
34#[add_arbitrary_tests(rlp)]
35pub struct NewBlockHashes(
36 pub Vec<BlockHashNumber>,
39);
40
41impl NewBlockHashes {
44 pub fn latest(&self) -> Option<&BlockHashNumber> {
46 self.iter().max_by_key(|b| b.number)
47 }
48}
49
50#[derive(Clone, Debug, PartialEq, Eq, RlpEncodable, RlpDecodable, Default)]
52#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
53#[cfg_attr(any(test, feature = "arbitrary"), derive(arbitrary::Arbitrary))]
54#[add_arbitrary_tests(rlp)]
55pub struct BlockHashNumber {
56 pub hash: B256,
58 pub number: u64,
60}
61
62impl From<Vec<BlockHashNumber>> for NewBlockHashes {
63 fn from(v: Vec<BlockHashNumber>) -> Self {
64 Self(v)
65 }
66}
67
68impl From<NewBlockHashes> for Vec<BlockHashNumber> {
69 fn from(v: NewBlockHashes) -> Self {
70 v.0
71 }
72}
73
74pub trait NewBlockPayload:
76 Encodable + Decodable + Clone + Eq + Debug + Send + Sync + Unpin + 'static
77{
78 type Block: Block;
80
81 fn block(&self) -> &Self::Block;
83}
84
85#[derive(Clone, Debug, PartialEq, Eq, RlpEncodable, RlpDecodable, Default)]
88#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
89#[cfg_attr(any(test, feature = "arbitrary"), derive(arbitrary::Arbitrary))]
90pub struct NewBlock<B = reth_ethereum_primitives::Block> {
91 pub block: B,
93 pub td: U128,
95}
96
97impl<B: Block + 'static> NewBlockPayload for NewBlock<B> {
98 type Block = B;
99
100 fn block(&self) -> &Self::Block {
101 &self.block
102 }
103}
104
105generate_tests!(#[rlp, 25] NewBlock<reth_ethereum_primitives::Block>, EthNewBlockTests);
106
107#[derive(
110 Clone,
111 Debug,
112 PartialEq,
113 Eq,
114 RlpEncodableWrapper,
115 RlpDecodableWrapper,
116 Default,
117 Deref,
118 IntoIterator,
119)]
120#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
121#[cfg_attr(any(test, feature = "arbitrary"), derive(arbitrary::Arbitrary))]
122#[add_arbitrary_tests(rlp, 10)]
123pub struct Transactions<T = TransactionSigned>(
124 pub Vec<T>,
126);
127
128impl<T: SignedTransaction> Transactions<T> {
129 pub fn has_eip4844(&self) -> bool {
131 self.iter().any(|tx| tx.is_eip4844())
132 }
133}
134
135impl<T> From<Vec<T>> for Transactions<T> {
136 fn from(txs: Vec<T>) -> Self {
137 Self(txs)
138 }
139}
140
141impl<T> From<Transactions<T>> for Vec<T> {
142 fn from(txs: Transactions<T>) -> Self {
143 txs.0
144 }
145}
146
147impl<T: Decodable + InMemorySize> Transactions<T> {
148 pub fn decode_with_memory_budget(
152 buf: &mut &[u8],
153 memory_budget: usize,
154 ) -> alloy_rlp::Result<Self> {
155 decode_list_with_memory_budget(buf, memory_budget).map(Self)
156 }
157}
158
159pub fn decode_list_with_memory_budget<T: Decodable + InMemorySize>(
162 buf: &mut &[u8],
163 memory_budget: usize,
164) -> alloy_rlp::Result<Vec<T>> {
165 let header = Header::decode(buf)?;
166 if !header.list {
167 return Err(alloy_rlp::Error::UnexpectedString);
168 }
169 if buf.len() < header.payload_length {
170 return Err(alloy_rlp::Error::InputTooShort);
171 }
172
173 let (payload, rest) = buf.split_at(header.payload_length);
174 let mut payload = payload;
175
176 let mut txs = Vec::new();
177 let mut total_size = 0usize;
178
179 while !payload.is_empty() {
180 let item = T::decode(&mut payload)?;
181 total_size = total_size.saturating_add(item.size());
182
183 if total_size > memory_budget {
184 break;
185 }
186
187 txs.push(item);
188 }
189
190 *buf = rest;
191 Ok(txs)
192}
193
194#[derive(
199 Clone, Debug, PartialEq, Eq, RlpEncodableWrapper, RlpDecodableWrapper, Deref, IntoIterator,
200)]
201#[cfg_attr(any(test, feature = "arbitrary"), derive(arbitrary::Arbitrary))]
202#[add_arbitrary_tests(rlp, 20)]
203pub struct SharedTransactions<T = TransactionSigned>(
204 pub Vec<Arc<T>>,
206);
207
208#[derive(Clone, Debug, PartialEq, Eq)]
210#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
211pub enum NewPooledTransactionHashes {
212 Eth66(NewPooledTransactionHashes66),
214 Eth68(NewPooledTransactionHashes68),
218 Eth72(NewPooledTransactionHashes72),
225}
226
227impl NewPooledTransactionHashes {
230 pub const fn version(&self) -> EthVersion {
232 match self {
233 Self::Eth66(_) => EthVersion::Eth66,
234 Self::Eth68(_) => EthVersion::Eth68,
235 Self::Eth72(_) => EthVersion::Eth72,
236 }
237 }
238
239 pub const fn is_valid_for_version(&self, version: EthVersion) -> bool {
241 match self {
242 Self::Eth66(_) => {
243 matches!(version, EthVersion::Eth67 | EthVersion::Eth66)
244 }
245 Self::Eth68(_) => {
246 matches!(
247 version,
248 EthVersion::Eth68 | EthVersion::Eth69 | EthVersion::Eth70 | EthVersion::Eth71
249 )
250 }
251 Self::Eth72(_) => {
252 matches!(version, EthVersion::Eth72)
253 }
254 }
255 }
256
257 pub fn iter_hashes(&self) -> impl Iterator<Item = &B256> + '_ {
259 match self {
260 Self::Eth66(msg) => msg.iter(),
261 Self::Eth68(msg) => msg.hashes.iter(),
262 Self::Eth72(msg) => msg.hashes.iter(),
263 }
264 }
265
266 pub const fn hashes(&self) -> &Vec<B256> {
268 match self {
269 Self::Eth66(msg) => &msg.0,
270 Self::Eth68(msg) => &msg.hashes,
271 Self::Eth72(msg) => &msg.hashes,
272 }
273 }
274
275 pub const fn hashes_mut(&mut self) -> &mut Vec<B256> {
277 match self {
278 Self::Eth66(msg) => &mut msg.0,
279 Self::Eth68(msg) => &mut msg.hashes,
280 Self::Eth72(msg) => &mut msg.hashes,
281 }
282 }
283
284 pub fn into_hashes(self) -> Vec<B256> {
286 match self {
287 Self::Eth66(msg) => msg.0,
288 Self::Eth68(msg) => msg.hashes,
289 Self::Eth72(msg) => msg.hashes,
290 }
291 }
292
293 pub fn into_iter_hashes(self) -> impl Iterator<Item = B256> {
295 match self {
296 Self::Eth66(msg) => msg.into_iter(),
297 Self::Eth68(msg) => msg.hashes.into_iter(),
298 Self::Eth72(msg) => msg.hashes.into_iter(),
299 }
300 }
301
302 pub fn truncate(&mut self, len: usize) {
305 match self {
306 Self::Eth66(msg) => msg.truncate(len),
307 Self::Eth68(msg) => {
308 msg.types.truncate(len);
309 msg.sizes.truncate(len);
310 msg.hashes.truncate(len);
311 }
312 Self::Eth72(msg) => {
313 msg.types.truncate(len);
314 msg.sizes.truncate(len);
315 msg.hashes.truncate(len);
316 }
317 }
318 }
319
320 pub const fn is_empty(&self) -> bool {
322 match self {
323 Self::Eth66(msg) => msg.0.is_empty(),
324 Self::Eth68(msg) => msg.hashes.is_empty(),
325 Self::Eth72(msg) => msg.hashes.is_empty(),
326 }
327 }
328
329 pub const fn len(&self) -> usize {
331 match self {
332 Self::Eth66(msg) => msg.0.len(),
333 Self::Eth68(msg) => msg.hashes.len(),
334 Self::Eth72(msg) => msg.hashes.len(),
335 }
336 }
337
338 pub const fn as_eth72(&self) -> Option<&NewPooledTransactionHashes72> {
340 match self {
341 Self::Eth66(_) | Self::Eth68(_) => None,
342 Self::Eth72(msg) => Some(msg),
343 }
344 }
345
346 pub const fn as_eth72_mut(&mut self) -> Option<&mut NewPooledTransactionHashes72> {
348 match self {
349 Self::Eth66(_) | Self::Eth68(_) => None,
350 Self::Eth72(msg) => Some(msg),
351 }
352 }
353
354 pub const fn as_eth68(&self) -> Option<&NewPooledTransactionHashes68> {
356 match self {
357 Self::Eth66(_) | Self::Eth72(_) => None,
358 Self::Eth68(msg) => Some(msg),
359 }
360 }
361
362 pub const fn as_eth68_mut(&mut self) -> Option<&mut NewPooledTransactionHashes68> {
364 match self {
365 Self::Eth66(_) | Self::Eth72(_) => None,
366 Self::Eth68(msg) => Some(msg),
367 }
368 }
369
370 pub const fn as_eth66_mut(&mut self) -> Option<&mut NewPooledTransactionHashes66> {
372 match self {
373 Self::Eth66(msg) => Some(msg),
374 Self::Eth68(_) | Self::Eth72(_) => None,
375 }
376 }
377
378 pub fn take_eth68(&mut self) -> Option<NewPooledTransactionHashes68> {
380 match self {
381 Self::Eth66(_) | Self::Eth72(_) => None,
382 Self::Eth68(msg) => Some(mem::take(msg)),
383 }
384 }
385
386 pub fn take_eth66(&mut self) -> Option<NewPooledTransactionHashes66> {
388 match self {
389 Self::Eth66(msg) => Some(mem::take(msg)),
390 Self::Eth68(_) | Self::Eth72(_) => None,
391 }
392 }
393}
394
395impl<N: NetworkPrimitives> From<NewPooledTransactionHashes> for EthMessage<N> {
396 fn from(value: NewPooledTransactionHashes) -> Self {
397 match value {
398 NewPooledTransactionHashes::Eth66(msg) => Self::NewPooledTransactionHashes66(msg),
399 NewPooledTransactionHashes::Eth68(msg) => Self::NewPooledTransactionHashes68(msg),
400 NewPooledTransactionHashes::Eth72(msg) => Self::NewPooledTransactionHashes72(msg),
401 }
402 }
403}
404
405impl From<NewPooledTransactionHashes66> for NewPooledTransactionHashes {
406 fn from(hashes: NewPooledTransactionHashes66) -> Self {
407 Self::Eth66(hashes)
408 }
409}
410
411impl From<NewPooledTransactionHashes68> for NewPooledTransactionHashes {
412 fn from(hashes: NewPooledTransactionHashes68) -> Self {
413 Self::Eth68(hashes)
414 }
415}
416
417impl From<NewPooledTransactionHashes72> for NewPooledTransactionHashes {
418 fn from(hashes: NewPooledTransactionHashes72) -> Self {
419 Self::Eth72(hashes)
420 }
421}
422
423#[derive(
426 Clone,
427 Debug,
428 PartialEq,
429 Eq,
430 RlpEncodableWrapper,
431 RlpDecodableWrapper,
432 Default,
433 Deref,
434 DerefMut,
435 IntoIterator,
436)]
437#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
438#[cfg_attr(any(test, feature = "arbitrary"), derive(arbitrary::Arbitrary))]
439#[add_arbitrary_tests(rlp)]
440pub struct NewPooledTransactionHashes66(
441 pub Vec<B256>,
445);
446
447impl From<Vec<B256>> for NewPooledTransactionHashes66 {
448 fn from(v: Vec<B256>) -> Self {
449 Self(v)
450 }
451}
452
453#[derive(Clone, Debug, PartialEq, Eq, Default)]
456#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
457pub struct NewPooledTransactionHashes68 {
458 pub types: Vec<u8>,
482 pub sizes: Vec<usize>,
484 pub hashes: Vec<B256>,
486}
487
488#[cfg(feature = "arbitrary")]
489impl proptest::prelude::Arbitrary for NewPooledTransactionHashes68 {
490 type Parameters = ();
491 fn arbitrary_with(_args: ()) -> Self::Strategy {
492 use proptest::{collection::vec, prelude::*};
493 let vec_length = any::<usize>().prop_map(|x| x % 100 + 1); vec_length
497 .prop_flat_map(|len| {
498 let types_vec = vec(
500 proptest_arbitrary_interop::arb::<reth_ethereum_primitives::TxType>()
501 .prop_map(|ty| ty as u8),
502 len..=len,
503 );
504
505 let sizes_vec = vec(proptest::num::usize::ANY.prop_map(|x| x % 131072), len..=len);
507 let hashes_vec = vec(any::<B256>(), len..=len);
508
509 (types_vec, sizes_vec, hashes_vec)
510 })
511 .prop_map(|(types, sizes, hashes)| Self { types, sizes, hashes })
512 .boxed()
513 }
514
515 type Strategy = proptest::prelude::BoxedStrategy<Self>;
516}
517
518impl NewPooledTransactionHashes68 {
519 pub fn metadata_iter(&self) -> impl Iterator<Item = (&B256, (u8, usize))> {
521 self.hashes.iter().zip(self.types.iter().copied().zip(self.sizes.iter().copied()))
522 }
523
524 pub fn push<T: SignedTransaction>(&mut self, tx: &T) {
526 self.hashes.push(*tx.tx_hash());
527 self.sizes.push(tx.encode_2718_len());
528 self.types.push(tx.ty());
529 }
530
531 pub fn extend<'a, T: SignedTransaction>(&mut self, txs: impl IntoIterator<Item = &'a T>) {
533 for tx in txs {
534 self.push(tx);
535 }
536 }
537
538 pub fn shrink_to_fit(&mut self) {
540 self.hashes.shrink_to_fit();
541 self.sizes.shrink_to_fit();
542 self.types.shrink_to_fit()
543 }
544
545 pub fn with_transaction<T: SignedTransaction>(mut self, tx: &T) -> Self {
547 self.push(tx);
548 self
549 }
550
551 pub fn with_transactions<'a, T: SignedTransaction>(
553 mut self,
554 txs: impl IntoIterator<Item = &'a T>,
555 ) -> Self {
556 self.extend(txs);
557 self
558 }
559}
560
561impl Encodable for NewPooledTransactionHashes68 {
562 fn encode(&self, out: &mut dyn bytes::BufMut) {
563 #[derive(RlpEncodable)]
564 struct EncodableNewPooledTransactionHashes68<'a> {
565 types: &'a [u8],
566 sizes: &'a Vec<usize>,
567 hashes: &'a Vec<B256>,
568 }
569
570 let encodable = EncodableNewPooledTransactionHashes68 {
571 types: &self.types[..],
572 sizes: &self.sizes,
573 hashes: &self.hashes,
574 };
575
576 encodable.encode(out);
577 }
578 fn length(&self) -> usize {
579 #[derive(RlpEncodable)]
580 struct EncodableNewPooledTransactionHashes68<'a> {
581 types: &'a [u8],
582 sizes: &'a Vec<usize>,
583 hashes: &'a Vec<B256>,
584 }
585
586 let encodable = EncodableNewPooledTransactionHashes68 {
587 types: &self.types[..],
588 sizes: &self.sizes,
589 hashes: &self.hashes,
590 };
591
592 encodable.length()
593 }
594}
595
596impl Decodable for NewPooledTransactionHashes68 {
597 fn decode(buf: &mut &[u8]) -> alloy_rlp::Result<Self> {
598 #[derive(RlpDecodable)]
599 struct EncodableNewPooledTransactionHashes68 {
600 types: Bytes,
601 sizes: Vec<usize>,
602 hashes: Vec<B256>,
603 }
604
605 let encodable = EncodableNewPooledTransactionHashes68::decode(buf)?;
606 let msg = Self {
607 types: encodable.types.into(),
608 sizes: encodable.sizes,
609 hashes: encodable.hashes,
610 };
611
612 if msg.hashes.len() != msg.types.len() {
613 return Err(alloy_rlp::Error::ListLengthMismatch {
614 expected: msg.hashes.len(),
615 got: msg.types.len(),
616 })
617 }
618 if msg.hashes.len() != msg.sizes.len() {
619 return Err(alloy_rlp::Error::ListLengthMismatch {
620 expected: msg.hashes.len(),
621 got: msg.sizes.len(),
622 })
623 }
624
625 Ok(msg)
626 }
627}
628
629#[derive(Clone, Debug, PartialEq, Eq, Default)]
632#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
633pub struct NewPooledTransactionHashes72 {
634 pub types: Vec<u8>,
643 pub sizes: Vec<usize>,
645 pub hashes: Vec<B256>,
647 pub cell_mask: Option<B128>,
654}
655
656#[cfg(feature = "arbitrary")]
657impl proptest::prelude::Arbitrary for NewPooledTransactionHashes72 {
658 type Parameters = ();
659 fn arbitrary_with(_args: ()) -> Self::Strategy {
660 use proptest::{collection::vec, prelude::*};
661 let vec_length = any::<usize>().prop_map(|x| x % 100 + 1); vec_length
665 .prop_flat_map(|len| {
666 let types_vec = vec(
668 proptest_arbitrary_interop::arb::<reth_ethereum_primitives::TxType>()
669 .prop_map(|ty| ty as u8),
670 len..=len,
671 );
672
673 let sizes_vec = vec(proptest::num::usize::ANY.prop_map(|x| x % 131072), len..=len);
675 let hashes_vec = vec(any::<B256>(), len..=len);
676 let cell_mask = any::<Option<B128>>();
677
678 (types_vec, sizes_vec, hashes_vec, cell_mask)
679 })
680 .prop_map(|(types, sizes, hashes, cell_mask)| Self { types, sizes, hashes, cell_mask })
681 .boxed()
682 }
683
684 type Strategy = proptest::prelude::BoxedStrategy<Self>;
685}
686
687impl NewPooledTransactionHashes72 {
688 pub fn metadata_iter(&self) -> impl Iterator<Item = (&B256, (u8, usize))> {
690 self.hashes.iter().zip(self.types.iter().copied().zip(self.sizes.iter().copied()))
691 }
692
693 pub fn push<T: SignedTransaction>(&mut self, tx: &T) {
695 self.hashes.push(*tx.tx_hash());
696 self.sizes.push(tx.encode_2718_len());
697 self.types.push(tx.ty());
698 }
699
700 pub fn extend<'a, T: SignedTransaction>(&mut self, txs: impl IntoIterator<Item = &'a T>) {
702 for tx in txs {
703 self.push(tx);
704 }
705 }
706
707 pub fn shrink_to_fit(&mut self) {
709 self.hashes.shrink_to_fit();
710 self.sizes.shrink_to_fit();
711 self.types.shrink_to_fit()
712 }
713
714 pub fn with_transaction<T: SignedTransaction>(mut self, tx: &T) -> Self {
716 self.push(tx);
717 self
718 }
719
720 pub fn with_transactions<'a, T: SignedTransaction>(
722 mut self,
723 txs: impl IntoIterator<Item = &'a T>,
724 ) -> Self {
725 self.extend(txs);
726 self
727 }
728
729 fn payload_length(&self) -> usize {
730 self.types.as_slice().length() +
731 self.sizes.length() +
732 self.hashes.length() +
733 self.cell_mask.as_ref().map_or(1, Encodable::length)
734 }
735}
736
737impl Encodable for NewPooledTransactionHashes72 {
738 fn encode(&self, out: &mut dyn bytes::BufMut) {
739 Header { list: true, payload_length: self.payload_length() }.encode(out);
740 self.types.as_slice().encode(out);
741 self.sizes.encode(out);
742 self.hashes.encode(out);
743 if let Some(cell_mask) = &self.cell_mask {
744 cell_mask.encode(out);
745 } else {
746 out.put_u8(alloy_rlp::EMPTY_STRING_CODE);
747 }
748 }
749
750 fn length(&self) -> usize {
751 Header { list: true, payload_length: self.payload_length() }.length_with_payload()
752 }
753}
754
755impl Decodable for NewPooledTransactionHashes72 {
756 fn decode(buf: &mut &[u8]) -> alloy_rlp::Result<Self> {
757 let Header { list, payload_length } = Header::decode(buf)?;
758 if !list {
759 return Err(alloy_rlp::Error::UnexpectedString)
760 }
761 if buf.len() < payload_length {
762 return Err(alloy_rlp::Error::InputTooShort)
763 }
764
765 let (mut payload, rest) = buf.split_at(payload_length);
766 let types = Bytes::decode(&mut payload)?;
767 let sizes = Vec::<usize>::decode(&mut payload)?;
768 let hashes = Vec::<B256>::decode(&mut payload)?;
769 let Some(first_byte) = payload.first().copied() else {
770 return Err(alloy_rlp::Error::InputTooShort)
771 };
772 let cell_mask = if first_byte == alloy_rlp::EMPTY_STRING_CODE {
773 payload = &payload[1..];
774 None
775 } else {
776 Some(B128::decode(&mut payload)?)
777 };
778
779 if !payload.is_empty() {
780 return Err(alloy_rlp::Error::ListLengthMismatch {
781 expected: payload_length,
782 got: payload_length - payload.len(),
783 })
784 }
785
786 let msg = Self { types: types.into(), sizes, hashes, cell_mask };
787
788 if msg.hashes.len() != msg.types.len() {
789 return Err(alloy_rlp::Error::ListLengthMismatch {
790 expected: msg.hashes.len(),
791 got: msg.types.len(),
792 })
793 }
794 if msg.hashes.len() != msg.sizes.len() {
795 return Err(alloy_rlp::Error::ListLengthMismatch {
796 expected: msg.hashes.len(),
797 got: msg.sizes.len(),
798 })
799 }
800
801 *buf = rest;
802
803 Ok(msg)
804 }
805}
806
807pub trait DedupPayload {
809 type Value;
811
812 fn is_empty(&self) -> bool;
814
815 fn len(&self) -> usize;
817
818 fn dedup(self) -> PartiallyValidData<Self::Value>;
820}
821
822pub type Eth68TxMetadata = Option<(u8, usize)>;
824
825impl DedupPayload for NewPooledTransactionHashes {
826 type Value = Eth68TxMetadata;
827
828 fn is_empty(&self) -> bool {
829 self.is_empty()
830 }
831
832 fn len(&self) -> usize {
833 self.len()
834 }
835
836 fn dedup(self) -> PartiallyValidData<Self::Value> {
837 match self {
838 Self::Eth66(msg) => msg.dedup(),
839 Self::Eth68(msg) => msg.dedup(),
840 Self::Eth72(msg) => msg.dedup(),
841 }
842 }
843}
844
845impl DedupPayload for NewPooledTransactionHashes72 {
846 type Value = Eth68TxMetadata;
847
848 fn is_empty(&self) -> bool {
849 self.hashes.is_empty()
850 }
851
852 fn len(&self) -> usize {
853 self.hashes.len()
854 }
855
856 fn dedup(self) -> PartiallyValidData<Self::Value> {
857 let Self { hashes, mut sizes, mut types, .. } = self;
858
859 let mut deduped_data = B256Map::with_capacity_and_hasher(hashes.len(), Default::default());
860
861 for hash in hashes.into_iter().rev() {
862 if let (Some(ty), Some(size)) = (types.pop(), sizes.pop()) {
863 deduped_data.insert(hash, Some((ty, size)));
864 }
865 }
866
867 PartiallyValidData::from_raw_data_eth72(deduped_data)
868 }
869}
870
871impl DedupPayload for NewPooledTransactionHashes68 {
872 type Value = Eth68TxMetadata;
873
874 fn is_empty(&self) -> bool {
875 self.hashes.is_empty()
876 }
877
878 fn len(&self) -> usize {
879 self.hashes.len()
880 }
881
882 fn dedup(self) -> PartiallyValidData<Self::Value> {
883 let Self { hashes, mut sizes, mut types } = self;
884
885 let mut deduped_data = B256Map::with_capacity_and_hasher(hashes.len(), Default::default());
886
887 for hash in hashes.into_iter().rev() {
888 if let (Some(ty), Some(size)) = (types.pop(), sizes.pop()) {
889 deduped_data.insert(hash, Some((ty, size)));
890 }
891 }
892
893 PartiallyValidData::from_raw_data_eth68(deduped_data)
894 }
895}
896
897impl DedupPayload for NewPooledTransactionHashes66 {
898 type Value = Eth68TxMetadata;
899
900 fn is_empty(&self) -> bool {
901 self.0.is_empty()
902 }
903
904 fn len(&self) -> usize {
905 self.0.len()
906 }
907
908 fn dedup(self) -> PartiallyValidData<Self::Value> {
909 let Self(hashes) = self;
910
911 let mut deduped_data = B256Map::with_capacity_and_hasher(hashes.len(), Default::default());
912
913 let noop_value: Eth68TxMetadata = None;
914
915 for hash in hashes.into_iter().rev() {
916 deduped_data.insert(hash, noop_value);
917 }
918
919 PartiallyValidData::from_raw_data_eth66(deduped_data)
920 }
921}
922
923pub trait HandleMempoolData {
926 fn is_empty(&self) -> bool;
928
929 fn len(&self) -> usize;
931
932 fn retain_by_hash(&mut self, f: impl FnMut(&TxHash) -> bool);
934}
935
936pub trait HandleVersionedMempoolData {
938 fn msg_version(&self) -> EthVersion;
941}
942
943impl<T: SignedTransaction> HandleMempoolData for Vec<T> {
944 fn is_empty(&self) -> bool {
945 self.is_empty()
946 }
947
948 fn len(&self) -> usize {
949 self.len()
950 }
951
952 fn retain_by_hash(&mut self, mut f: impl FnMut(&TxHash) -> bool) {
953 self.retain(|tx| f(tx.tx_hash()))
954 }
955}
956
957macro_rules! handle_mempool_data_map_impl {
958 ($data_ty:ty, $(<$generic:ident>)?) => {
959 impl$(<$generic>)? HandleMempoolData for $data_ty {
960 fn is_empty(&self) -> bool {
961 self.data.is_empty()
962 }
963
964 fn len(&self) -> usize {
965 self.data.len()
966 }
967
968 fn retain_by_hash(&mut self, mut f: impl FnMut(&TxHash) -> bool) {
969 self.data.retain(|hash, _| f(hash));
970 }
971 }
972 };
973}
974
975#[derive(Debug, Deref, DerefMut, IntoIterator)]
978pub struct PartiallyValidData<V> {
979 #[deref]
980 #[deref_mut]
981 #[into_iterator]
982 data: B256Map<V>,
983 version: Option<EthVersion>,
984}
985
986handle_mempool_data_map_impl!(PartiallyValidData<V>, <V>);
987
988impl<V> PartiallyValidData<V> {
989 pub const fn from_raw_data(data: B256Map<V>, version: Option<EthVersion>) -> Self {
991 Self { data, version }
992 }
993
994 pub const fn from_raw_data_eth72(data: B256Map<V>) -> Self {
996 Self::from_raw_data(data, Some(EthVersion::Eth72))
997 }
998
999 pub const fn from_raw_data_eth68(data: B256Map<V>) -> Self {
1001 Self::from_raw_data(data, Some(EthVersion::Eth68))
1002 }
1003
1004 pub const fn from_raw_data_eth66(data: B256Map<V>) -> Self {
1006 Self::from_raw_data(data, Some(EthVersion::Eth66))
1007 }
1008
1009 pub fn empty_eth72() -> Self {
1012 Self::from_raw_data_eth72(B256Map::default())
1013 }
1014
1015 pub fn empty_eth68() -> Self {
1018 Self::from_raw_data_eth68(B256Map::default())
1019 }
1020
1021 pub fn empty_eth66() -> Self {
1024 Self::from_raw_data_eth66(B256Map::default())
1025 }
1026
1027 pub const fn msg_version(&self) -> Option<EthVersion> {
1030 self.version
1031 }
1032
1033 pub fn into_data(self) -> B256Map<V> {
1035 self.data
1036 }
1037}
1038
1039#[derive(Debug, Deref, DerefMut, IntoIterator, From)]
1042pub struct ValidAnnouncementData {
1043 #[deref]
1044 #[deref_mut]
1045 #[into_iterator]
1046 data: B256Map<Eth68TxMetadata>,
1047 version: EthVersion,
1048}
1049
1050handle_mempool_data_map_impl!(ValidAnnouncementData,);
1051
1052impl ValidAnnouncementData {
1053 pub fn into_request_hashes(self) -> (RequestTxHashes, EthVersion) {
1056 let hashes = self.data.into_keys().collect::<B256Set>();
1057
1058 (RequestTxHashes::new(hashes), self.version)
1059 }
1060
1061 pub fn from_partially_valid_data(data: PartiallyValidData<Eth68TxMetadata>) -> Self {
1065 let PartiallyValidData { data, version } = data;
1066
1067 let version = version.expect("should have eth version for conversion");
1068
1069 Self { data, version }
1070 }
1071
1072 pub fn into_data(self) -> B256Map<Eth68TxMetadata> {
1074 self.data
1075 }
1076}
1077
1078impl HandleVersionedMempoolData for ValidAnnouncementData {
1079 fn msg_version(&self) -> EthVersion {
1080 self.version
1081 }
1082}
1083
1084#[derive(Debug, Default, Deref, DerefMut, IntoIterator, Constructor)]
1086pub struct RequestTxHashes {
1087 #[deref]
1088 #[deref_mut]
1089 #[into_iterator(owned, ref)]
1090 hashes: B256Set,
1091}
1092
1093impl RequestTxHashes {
1094 pub fn with_capacity(capacity: usize) -> Self {
1099 Self::new(B256Set::with_capacity_and_hasher(capacity, Default::default()))
1100 }
1101
1102 fn empty() -> Self {
1104 Self::new(B256Set::default())
1105 }
1106
1107 pub fn retain_count(&mut self, count: usize) -> Self {
1109 let rest_capacity = self.hashes.len().saturating_sub(count);
1110 if rest_capacity == 0 {
1111 return Self::empty()
1112 }
1113 let mut rest = Self::with_capacity(rest_capacity);
1114
1115 let mut i = 0;
1116 self.hashes.retain(|hash| {
1117 if i >= count {
1118 rest.insert(*hash);
1119 return false
1120 }
1121 i += 1;
1122
1123 true
1124 });
1125
1126 rest
1127 }
1128}
1129
1130impl FromIterator<(TxHash, Eth68TxMetadata)> for RequestTxHashes {
1131 fn from_iter<I: IntoIterator<Item = (TxHash, Eth68TxMetadata)>>(iter: I) -> Self {
1132 Self::new(iter.into_iter().map(|(hash, _)| hash).collect())
1133 }
1134}
1135
1136#[derive(Clone, Debug, PartialEq, Eq, Default, RlpEncodable, RlpDecodable)]
1139#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
1140#[cfg_attr(feature = "serde", serde(rename_all = "camelCase"))]
1141pub struct BlockRangeUpdate {
1142 pub earliest: u64,
1144 pub latest: u64,
1146 pub latest_hash: B256,
1148}
1149
1150impl InMemorySize for NewPooledTransactionHashes {
1151 fn size(&self) -> usize {
1152 match self {
1153 Self::Eth66(msg) => msg.0.len() * core::mem::size_of::<B256>(),
1154 Self::Eth68(msg) => {
1155 msg.types.len() * core::mem::size_of::<u8>() +
1156 msg.sizes.len() * core::mem::size_of::<usize>() +
1157 msg.hashes.len() * core::mem::size_of::<B256>()
1158 }
1159 Self::Eth72(msg) => {
1160 msg.types.len() * core::mem::size_of::<u8>() +
1161 msg.sizes.len() * core::mem::size_of::<usize>() +
1162 msg.hashes.len() * core::mem::size_of::<B256>() +
1163 core::mem::size_of::<B128>()
1164 }
1165 }
1166 }
1167}
1168
1169#[cfg(test)]
1170mod tests {
1171 use super::*;
1172 use alloy_consensus::{transaction::TxHashRef, Typed2718};
1173 use alloy_eips::eip2718::Encodable2718;
1174 use alloy_primitives::{b256, hex, Signature, U256};
1175 use reth_ethereum_primitives::{Transaction, TransactionSigned};
1176 use std::str::FromStr;
1177
1178 fn test_encoding_vector<T: Encodable + Decodable + PartialEq + core::fmt::Debug>(
1181 input: (T, &[u8]),
1182 ) {
1183 let (expected_decoded, expected_encoded) = input;
1184 let mut encoded = Vec::new();
1185 expected_decoded.encode(&mut encoded);
1186
1187 assert_eq!(hex::encode(&encoded), hex::encode(expected_encoded));
1188
1189 let decoded = T::decode(&mut encoded.as_ref()).unwrap();
1190 assert_eq!(expected_decoded, decoded);
1191 }
1192
1193 #[test]
1194 fn can_return_latest_block() {
1195 let mut blocks = NewBlockHashes(vec![BlockHashNumber { hash: B256::random(), number: 0 }]);
1196 let latest = blocks.latest().unwrap();
1197 assert_eq!(latest.number, 0);
1198
1199 blocks.push(BlockHashNumber { hash: B256::random(), number: 100 });
1200 blocks.push(BlockHashNumber { hash: B256::random(), number: 2 });
1201 let latest = blocks.latest().unwrap();
1202 assert_eq!(latest.number, 100);
1203 }
1204
1205 #[test]
1206 fn eth_68_tx_hash_roundtrip() {
1207 let vectors = vec![
1208 (
1209 NewPooledTransactionHashes68 { types: vec![], sizes: vec![], hashes: vec![] },
1210 &hex!("c380c0c0")[..],
1211 ),
1212 (
1213 NewPooledTransactionHashes68 {
1214 types: vec![0x00],
1215 sizes: vec![0x00],
1216 hashes: vec![
1217 B256::from_str(
1218 "0x0000000000000000000000000000000000000000000000000000000000000000",
1219 )
1220 .unwrap(),
1221 ],
1222 },
1223 &hex!(
1224 "e500c180e1a00000000000000000000000000000000000000000000000000000000000000000"
1225 )[..],
1226 ),
1227 (
1228 NewPooledTransactionHashes68 {
1229 types: vec![0x00, 0x00],
1230 sizes: vec![0x00, 0x00],
1231 hashes: vec![
1232 B256::from_str(
1233 "0x0000000000000000000000000000000000000000000000000000000000000000",
1234 )
1235 .unwrap(),
1236 B256::from_str(
1237 "0x0000000000000000000000000000000000000000000000000000000000000000",
1238 )
1239 .unwrap(),
1240 ],
1241 },
1242 &hex!(
1243 "f84a820000c28080f842a00000000000000000000000000000000000000000000000000000000000000000a00000000000000000000000000000000000000000000000000000000000000000"
1244 )[..],
1245 ),
1246 (
1247 NewPooledTransactionHashes68 {
1248 types: vec![0x02],
1249 sizes: vec![0xb6],
1250 hashes: vec![
1251 B256::from_str(
1252 "0xfecbed04c7b88d8e7221a0a3f5dc33f220212347fc167459ea5cc9c3eb4c1124",
1253 )
1254 .unwrap(),
1255 ],
1256 },
1257 &hex!(
1258 "e602c281b6e1a0fecbed04c7b88d8e7221a0a3f5dc33f220212347fc167459ea5cc9c3eb4c1124"
1259 )[..],
1260 ),
1261 (
1262 NewPooledTransactionHashes68 {
1263 types: vec![0xff, 0xff],
1264 sizes: vec![0xffffffff, 0xffffffff],
1265 hashes: vec![
1266 B256::from_str(
1267 "0xffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff",
1268 )
1269 .unwrap(),
1270 B256::from_str(
1271 "0xffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff",
1272 )
1273 .unwrap(),
1274 ],
1275 },
1276 &hex!(
1277 "f85282ffffca84ffffffff84fffffffff842a0ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffa0ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff"
1278 )[..],
1279 ),
1280 (
1281 NewPooledTransactionHashes68 {
1282 types: vec![0xff, 0xff],
1283 sizes: vec![0xffffffff, 0xffffffff],
1284 hashes: vec![
1285 B256::from_str(
1286 "0xbeefcafebeefcafebeefcafebeefcafebeefcafebeefcafebeefcafebeefcafe",
1287 )
1288 .unwrap(),
1289 B256::from_str(
1290 "0xbeefcafebeefcafebeefcafebeefcafebeefcafebeefcafebeefcafebeefcafe",
1291 )
1292 .unwrap(),
1293 ],
1294 },
1295 &hex!(
1296 "f85282ffffca84ffffffff84fffffffff842a0beefcafebeefcafebeefcafebeefcafebeefcafebeefcafebeefcafebeefcafea0beefcafebeefcafebeefcafebeefcafebeefcafebeefcafebeefcafebeefcafe"
1297 )[..],
1298 ),
1299 (
1300 NewPooledTransactionHashes68 {
1301 types: vec![0x10, 0x10],
1302 sizes: vec![0xdeadc0de, 0xdeadc0de],
1303 hashes: vec![
1304 B256::from_str(
1305 "0x3b9aca00f0671c9a2a1b817a0a78d3fe0c0f776cccb2a8c3c1b412a4f4e4d4e2",
1306 )
1307 .unwrap(),
1308 B256::from_str(
1309 "0x3b9aca00f0671c9a2a1b817a0a78d3fe0c0f776cccb2a8c3c1b412a4f4e4d4e2",
1310 )
1311 .unwrap(),
1312 ],
1313 },
1314 &hex!(
1315 "f852821010ca84deadc0de84deadc0def842a03b9aca00f0671c9a2a1b817a0a78d3fe0c0f776cccb2a8c3c1b412a4f4e4d4e2a03b9aca00f0671c9a2a1b817a0a78d3fe0c0f776cccb2a8c3c1b412a4f4e4d4e2"
1316 )[..],
1317 ),
1318 (
1319 NewPooledTransactionHashes68 {
1320 types: vec![0x6f, 0x6f],
1321 sizes: vec![0x7fffffff, 0x7fffffff],
1322 hashes: vec![
1323 B256::from_str(
1324 "0x0000000000000000000000000000000000000000000000000000000000000002",
1325 )
1326 .unwrap(),
1327 B256::from_str(
1328 "0x0000000000000000000000000000000000000000000000000000000000000002",
1329 )
1330 .unwrap(),
1331 ],
1332 },
1333 &hex!(
1334 "f852826f6fca847fffffff847ffffffff842a00000000000000000000000000000000000000000000000000000000000000002a00000000000000000000000000000000000000000000000000000000000000002"
1335 )[..],
1336 ),
1337 ];
1338
1339 for vector in vectors {
1340 test_encoding_vector(vector);
1341 }
1342 }
1343
1344 #[test]
1345 fn eth_72_tx_hash_roundtrip() {
1346 let vectors = vec![
1347 (
1348 NewPooledTransactionHashes72 {
1349 types: vec![],
1350 sizes: vec![],
1351 hashes: vec![],
1352 cell_mask: None,
1353 },
1354 &hex!("c480c0c080")[..],
1355 ),
1356 (
1357 NewPooledTransactionHashes72 {
1358 types: vec![],
1359 sizes: vec![],
1360 hashes: vec![],
1361 cell_mask: Some(B128::repeat_byte(0x11)),
1362 },
1363 &hex!("d480c0c09011111111111111111111111111111111")[..],
1364 ),
1365 ];
1366
1367 for vector in vectors {
1368 test_encoding_vector(vector);
1369 }
1370 }
1371
1372 #[test]
1373 fn eth_72_rejects_missing_cell_mask() {
1374 let encoded_eth68_payload = hex!("c380c0c0");
1375
1376 let result = NewPooledTransactionHashes72::decode(&mut encoded_eth68_payload.as_ref());
1377
1378 assert!(matches!(result, Err(alloy_rlp::Error::InputTooShort)));
1379 }
1380
1381 #[test]
1382 fn request_hashes_retain_count_keep_subset() {
1383 let mut hashes = RequestTxHashes::new(
1384 [
1385 b256!("0x0000000000000000000000000000000000000000000000000000000000000001"),
1386 b256!("0x0000000000000000000000000000000000000000000000000000000000000002"),
1387 b256!("0x0000000000000000000000000000000000000000000000000000000000000003"),
1388 b256!("0x0000000000000000000000000000000000000000000000000000000000000004"),
1389 b256!("0x0000000000000000000000000000000000000000000000000000000000000005"),
1390 ]
1391 .into_iter()
1392 .collect::<B256Set>(),
1393 );
1394
1395 let rest = hashes.retain_count(3);
1396
1397 assert_eq!(3, hashes.len());
1398 assert_eq!(2, rest.len());
1399 }
1400
1401 #[test]
1402 fn request_hashes_retain_count_keep_all() {
1403 let mut hashes = RequestTxHashes::new(
1404 [
1405 b256!("0x0000000000000000000000000000000000000000000000000000000000000001"),
1406 b256!("0x0000000000000000000000000000000000000000000000000000000000000002"),
1407 b256!("0x0000000000000000000000000000000000000000000000000000000000000003"),
1408 b256!("0x0000000000000000000000000000000000000000000000000000000000000004"),
1409 b256!("0x0000000000000000000000000000000000000000000000000000000000000005"),
1410 ]
1411 .into_iter()
1412 .collect::<B256Set>(),
1413 );
1414
1415 let _ = hashes.retain_count(6);
1416
1417 assert_eq!(5, hashes.len());
1418 }
1419
1420 #[test]
1421 fn split_request_hashes_keep_none() {
1422 let mut hashes = RequestTxHashes::new(
1423 [
1424 b256!("0x0000000000000000000000000000000000000000000000000000000000000001"),
1425 b256!("0x0000000000000000000000000000000000000000000000000000000000000002"),
1426 b256!("0x0000000000000000000000000000000000000000000000000000000000000003"),
1427 b256!("0x0000000000000000000000000000000000000000000000000000000000000004"),
1428 b256!("0x0000000000000000000000000000000000000000000000000000000000000005"),
1429 ]
1430 .into_iter()
1431 .collect::<B256Set>(),
1432 );
1433
1434 let rest = hashes.retain_count(0);
1435
1436 assert_eq!(0, hashes.len());
1437 assert_eq!(5, rest.len());
1438 }
1439
1440 fn signed_transaction() -> impl SignedTransaction {
1441 TransactionSigned::new_unhashed(
1442 Transaction::Legacy(Default::default()),
1443 Signature::new(
1444 U256::from_str(
1445 "0x64b1702d9298fee62dfeccc57d322a463ad55ca201256d01f62b45b2e1c21c12",
1446 )
1447 .unwrap(),
1448 U256::from_str(
1449 "0x64b1702d9298fee62dfeccc57d322a463ad55ca201256d01f62b45b2e1c21c10",
1450 )
1451 .unwrap(),
1452 false,
1453 ),
1454 )
1455 }
1456
1457 #[test]
1458 fn test_pooled_tx_hashes_68_push() {
1459 let tx = signed_transaction();
1460 let mut tx_hashes =
1461 NewPooledTransactionHashes68 { types: vec![], sizes: vec![], hashes: vec![] };
1462 tx_hashes.push(&tx);
1463 assert_eq!(tx_hashes.types.len(), 1);
1464 assert_eq!(tx_hashes.sizes.len(), 1);
1465 assert_eq!(tx_hashes.hashes.len(), 1);
1466 assert_eq!(tx_hashes.types[0], tx.ty());
1467 assert_eq!(tx_hashes.sizes[0], tx.encode_2718_len());
1468 assert_eq!(tx_hashes.hashes[0], *tx.tx_hash());
1469 }
1470
1471 #[test]
1472 fn test_pooled_tx_hashes_68_extend() {
1473 let tx = signed_transaction();
1474 let txs = vec![tx.clone(), tx.clone()];
1475 let mut tx_hashes =
1476 NewPooledTransactionHashes68 { types: vec![], sizes: vec![], hashes: vec![] };
1477 tx_hashes.extend(&txs);
1478 assert_eq!(tx_hashes.types.len(), 2);
1479 assert_eq!(tx_hashes.sizes.len(), 2);
1480 assert_eq!(tx_hashes.hashes.len(), 2);
1481 assert_eq!(tx_hashes.types[0], tx.ty());
1482 assert_eq!(tx_hashes.sizes[0], tx.encode_2718_len());
1483 assert_eq!(tx_hashes.hashes[0], *tx.tx_hash());
1484 assert_eq!(tx_hashes.types[1], tx.ty());
1485 assert_eq!(tx_hashes.sizes[1], tx.encode_2718_len());
1486 assert_eq!(tx_hashes.hashes[1], *tx.tx_hash());
1487 }
1488
1489 #[test]
1490 fn test_pooled_tx_hashes_68_with_transaction() {
1491 let tx = signed_transaction();
1492 let tx_hashes =
1493 NewPooledTransactionHashes68 { types: vec![], sizes: vec![], hashes: vec![] }
1494 .with_transaction(&tx);
1495 assert_eq!(tx_hashes.types.len(), 1);
1496 assert_eq!(tx_hashes.sizes.len(), 1);
1497 assert_eq!(tx_hashes.hashes.len(), 1);
1498 assert_eq!(tx_hashes.types[0], tx.ty());
1499 assert_eq!(tx_hashes.sizes[0], tx.encode_2718_len());
1500 assert_eq!(tx_hashes.hashes[0], *tx.tx_hash());
1501 }
1502
1503 #[test]
1504 fn test_pooled_tx_hashes_68_with_transactions() {
1505 let tx = signed_transaction();
1506 let txs = vec![tx.clone(), tx.clone()];
1507 let tx_hashes =
1508 NewPooledTransactionHashes68 { types: vec![], sizes: vec![], hashes: vec![] }
1509 .with_transactions(&txs);
1510 assert_eq!(tx_hashes.types.len(), 2);
1511 assert_eq!(tx_hashes.sizes.len(), 2);
1512 assert_eq!(tx_hashes.hashes.len(), 2);
1513 assert_eq!(tx_hashes.types[0], tx.ty());
1514 assert_eq!(tx_hashes.sizes[0], tx.encode_2718_len());
1515 assert_eq!(tx_hashes.hashes[0], *tx.tx_hash());
1516 assert_eq!(tx_hashes.types[1], tx.ty());
1517 assert_eq!(tx_hashes.sizes[1], tx.encode_2718_len());
1518 assert_eq!(tx_hashes.hashes[1], *tx.tx_hash());
1519 }
1520}