Skip to main content

reth_eth_wire_types/
broadcast.rs

1//! Types for broadcasting new data.
2
3use crate::{EthMessage, EthVersion, NetworkPrimitives};
4use alloc::{sync::Arc, vec::Vec};
5use alloy_primitives::{
6    map::{HashMap, HashSet},
7    Bytes, TxHash, B256, U128,
8};
9use alloy_rlp::{
10    Decodable, Encodable, RlpDecodable, RlpDecodableWrapper, RlpEncodable, RlpEncodableWrapper,
11};
12use core::{fmt::Debug, mem};
13use derive_more::{Constructor, Deref, DerefMut, From, IntoIterator};
14use reth_codecs_derive::{add_arbitrary_tests, generate_tests};
15use reth_ethereum_primitives::TransactionSigned;
16use reth_primitives_traits::{Block, SignedTransaction};
17
18/// This informs peers of new blocks that have appeared on the network.
19#[derive(
20    Clone,
21    Debug,
22    PartialEq,
23    Eq,
24    RlpEncodableWrapper,
25    RlpDecodableWrapper,
26    Default,
27    Deref,
28    DerefMut,
29    IntoIterator,
30)]
31#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
32#[cfg_attr(any(test, feature = "arbitrary"), derive(arbitrary::Arbitrary))]
33#[add_arbitrary_tests(rlp)]
34pub struct NewBlockHashes(
35    /// New block hashes and the block number for each blockhash.
36    /// Clients should request blocks using a [`GetBlockBodies`](crate::GetBlockBodies) message.
37    pub Vec<BlockHashNumber>,
38);
39
40// === impl NewBlockHashes ===
41
42impl NewBlockHashes {
43    /// Returns the latest block in the list of blocks.
44    pub fn latest(&self) -> Option<&BlockHashNumber> {
45        self.iter().max_by_key(|b| b.number)
46    }
47}
48
49/// A block hash _and_ a block number.
50#[derive(Clone, Debug, PartialEq, Eq, RlpEncodable, RlpDecodable, Default)]
51#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
52#[cfg_attr(any(test, feature = "arbitrary"), derive(arbitrary::Arbitrary))]
53#[add_arbitrary_tests(rlp)]
54pub struct BlockHashNumber {
55    /// The block hash
56    pub hash: B256,
57    /// The block number
58    pub number: u64,
59}
60
61impl From<Vec<BlockHashNumber>> for NewBlockHashes {
62    fn from(v: Vec<BlockHashNumber>) -> Self {
63        Self(v)
64    }
65}
66
67impl From<NewBlockHashes> for Vec<BlockHashNumber> {
68    fn from(v: NewBlockHashes) -> Self {
69        v.0
70    }
71}
72
73/// A trait for block payloads transmitted through p2p.
74pub trait NewBlockPayload:
75    Encodable + Decodable + Clone + Eq + Debug + Send + Sync + Unpin + 'static
76{
77    /// The block type.
78    type Block: Block;
79
80    /// Returns a reference to the block.
81    fn block(&self) -> &Self::Block;
82}
83
84/// A new block with the current total difficulty, which includes the difficulty of the returned
85/// block.
86#[derive(Clone, Debug, PartialEq, Eq, RlpEncodable, RlpDecodable, Default)]
87#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
88#[cfg_attr(any(test, feature = "arbitrary"), derive(arbitrary::Arbitrary))]
89pub struct NewBlock<B = reth_ethereum_primitives::Block> {
90    /// A new block.
91    pub block: B,
92    /// The current total difficulty.
93    pub td: U128,
94}
95
96impl<B: Block + 'static> NewBlockPayload for NewBlock<B> {
97    type Block = B;
98
99    fn block(&self) -> &Self::Block {
100        &self.block
101    }
102}
103
104generate_tests!(#[rlp, 25] NewBlock<reth_ethereum_primitives::Block>, EthNewBlockTests);
105
106/// This informs peers of transactions that have appeared on the network and are not yet included
107/// in a block.
108#[derive(
109    Clone,
110    Debug,
111    PartialEq,
112    Eq,
113    RlpEncodableWrapper,
114    RlpDecodableWrapper,
115    Default,
116    Deref,
117    IntoIterator,
118)]
119#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
120#[cfg_attr(any(test, feature = "arbitrary"), derive(arbitrary::Arbitrary))]
121#[add_arbitrary_tests(rlp, 10)]
122pub struct Transactions<T = TransactionSigned>(
123    /// New transactions for the peer to include in its mempool.
124    pub Vec<T>,
125);
126
127impl<T: SignedTransaction> Transactions<T> {
128    /// Returns `true` if the list of transactions contains any blob transactions.
129    pub fn has_eip4844(&self) -> bool {
130        self.iter().any(|tx| tx.is_eip4844())
131    }
132}
133
134impl<T> From<Vec<T>> for Transactions<T> {
135    fn from(txs: Vec<T>) -> Self {
136        Self(txs)
137    }
138}
139
140impl<T> From<Transactions<T>> for Vec<T> {
141    fn from(txs: Transactions<T>) -> Self {
142        txs.0
143    }
144}
145
146/// Same as [`Transactions`] but this is intended as egress message send from local to _many_ peers.
147///
148/// The list of transactions is constructed on per-peers basis, but the underlying transaction
149/// objects are shared.
150#[derive(
151    Clone, Debug, PartialEq, Eq, RlpEncodableWrapper, RlpDecodableWrapper, Deref, IntoIterator,
152)]
153#[cfg_attr(any(test, feature = "arbitrary"), derive(arbitrary::Arbitrary))]
154#[add_arbitrary_tests(rlp, 20)]
155pub struct SharedTransactions<T = TransactionSigned>(
156    /// New transactions for the peer to include in its mempool.
157    pub Vec<Arc<T>>,
158);
159
160/// A wrapper type for all different new pooled transaction types
161#[derive(Clone, Debug, PartialEq, Eq)]
162#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
163pub enum NewPooledTransactionHashes {
164    /// A list of transaction hashes valid for [66-68)
165    Eth66(NewPooledTransactionHashes66),
166    /// A list of transaction hashes valid from [68..]
167    ///
168    /// Note: it is assumed that the payload is valid (all vectors have the same length)
169    Eth68(NewPooledTransactionHashes68),
170}
171
172// === impl NewPooledTransactionHashes ===
173
174impl NewPooledTransactionHashes {
175    /// Returns the message [`EthVersion`].
176    pub const fn version(&self) -> EthVersion {
177        match self {
178            Self::Eth66(_) => EthVersion::Eth66,
179            Self::Eth68(_) => EthVersion::Eth68,
180        }
181    }
182
183    /// Returns `true` if the payload is valid for the given version
184    pub const fn is_valid_for_version(&self, version: EthVersion) -> bool {
185        match self {
186            Self::Eth66(_) => {
187                matches!(version, EthVersion::Eth67 | EthVersion::Eth66)
188            }
189            Self::Eth68(_) => {
190                matches!(
191                    version,
192                    EthVersion::Eth68 | EthVersion::Eth69 | EthVersion::Eth70 | EthVersion::Eth71
193                )
194            }
195        }
196    }
197
198    /// Returns an iterator over all transaction hashes.
199    pub fn iter_hashes(&self) -> impl Iterator<Item = &B256> + '_ {
200        match self {
201            Self::Eth66(msg) => msg.iter(),
202            Self::Eth68(msg) => msg.hashes.iter(),
203        }
204    }
205
206    /// Returns an immutable reference to transaction hashes.
207    pub const fn hashes(&self) -> &Vec<B256> {
208        match self {
209            Self::Eth66(msg) => &msg.0,
210            Self::Eth68(msg) => &msg.hashes,
211        }
212    }
213
214    /// Returns a mutable reference to transaction hashes.
215    pub const fn hashes_mut(&mut self) -> &mut Vec<B256> {
216        match self {
217            Self::Eth66(msg) => &mut msg.0,
218            Self::Eth68(msg) => &mut msg.hashes,
219        }
220    }
221
222    /// Consumes the type and returns all hashes
223    pub fn into_hashes(self) -> Vec<B256> {
224        match self {
225            Self::Eth66(msg) => msg.0,
226            Self::Eth68(msg) => msg.hashes,
227        }
228    }
229
230    /// Returns an iterator over all transaction hashes.
231    pub fn into_iter_hashes(self) -> impl Iterator<Item = B256> {
232        match self {
233            Self::Eth66(msg) => msg.into_iter(),
234            Self::Eth68(msg) => msg.hashes.into_iter(),
235        }
236    }
237
238    /// Shortens the number of hashes in the message, keeping the first `len` hashes and dropping
239    /// the rest. If `len` is greater than the number of hashes, this has no effect.
240    pub fn truncate(&mut self, len: usize) {
241        match self {
242            Self::Eth66(msg) => msg.truncate(len),
243            Self::Eth68(msg) => {
244                msg.types.truncate(len);
245                msg.sizes.truncate(len);
246                msg.hashes.truncate(len);
247            }
248        }
249    }
250
251    /// Returns true if the message is empty
252    pub const fn is_empty(&self) -> bool {
253        match self {
254            Self::Eth66(msg) => msg.0.is_empty(),
255            Self::Eth68(msg) => msg.hashes.is_empty(),
256        }
257    }
258
259    /// Returns the number of hashes in the message
260    pub const fn len(&self) -> usize {
261        match self {
262            Self::Eth66(msg) => msg.0.len(),
263            Self::Eth68(msg) => msg.hashes.len(),
264        }
265    }
266
267    /// Returns an immutable reference to the inner type if this is an eth68 announcement.
268    pub const fn as_eth68(&self) -> Option<&NewPooledTransactionHashes68> {
269        match self {
270            Self::Eth66(_) => None,
271            Self::Eth68(msg) => Some(msg),
272        }
273    }
274
275    /// Returns a mutable reference to the inner type if this is an eth68 announcement.
276    pub const fn as_eth68_mut(&mut self) -> Option<&mut NewPooledTransactionHashes68> {
277        match self {
278            Self::Eth66(_) => None,
279            Self::Eth68(msg) => Some(msg),
280        }
281    }
282
283    /// Returns a mutable reference to the inner type if this is an eth66 announcement.
284    pub const fn as_eth66_mut(&mut self) -> Option<&mut NewPooledTransactionHashes66> {
285        match self {
286            Self::Eth66(msg) => Some(msg),
287            Self::Eth68(_) => None,
288        }
289    }
290
291    /// Returns the inner type if this is an eth68 announcement.
292    pub fn take_eth68(&mut self) -> Option<NewPooledTransactionHashes68> {
293        match self {
294            Self::Eth66(_) => None,
295            Self::Eth68(msg) => Some(mem::take(msg)),
296        }
297    }
298
299    /// Returns the inner type if this is an eth66 announcement.
300    pub fn take_eth66(&mut self) -> Option<NewPooledTransactionHashes66> {
301        match self {
302            Self::Eth66(msg) => Some(mem::take(msg)),
303            Self::Eth68(_) => None,
304        }
305    }
306}
307
308impl<N: NetworkPrimitives> From<NewPooledTransactionHashes> for EthMessage<N> {
309    fn from(value: NewPooledTransactionHashes) -> Self {
310        match value {
311            NewPooledTransactionHashes::Eth66(msg) => Self::NewPooledTransactionHashes66(msg),
312            NewPooledTransactionHashes::Eth68(msg) => Self::NewPooledTransactionHashes68(msg),
313        }
314    }
315}
316
317impl From<NewPooledTransactionHashes66> for NewPooledTransactionHashes {
318    fn from(hashes: NewPooledTransactionHashes66) -> Self {
319        Self::Eth66(hashes)
320    }
321}
322
323impl From<NewPooledTransactionHashes68> for NewPooledTransactionHashes {
324    fn from(hashes: NewPooledTransactionHashes68) -> Self {
325        Self::Eth68(hashes)
326    }
327}
328
329/// This informs peers of transaction hashes for transactions that have appeared on the network,
330/// but have not been included in a block.
331#[derive(
332    Clone,
333    Debug,
334    PartialEq,
335    Eq,
336    RlpEncodableWrapper,
337    RlpDecodableWrapper,
338    Default,
339    Deref,
340    DerefMut,
341    IntoIterator,
342)]
343#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
344#[cfg_attr(any(test, feature = "arbitrary"), derive(arbitrary::Arbitrary))]
345#[add_arbitrary_tests(rlp)]
346pub struct NewPooledTransactionHashes66(
347    /// Transaction hashes for new transactions that have appeared on the network.
348    /// Clients should request the transactions with the given hashes using a
349    /// [`GetPooledTransactions`](crate::GetPooledTransactions) message.
350    pub Vec<B256>,
351);
352
353impl From<Vec<B256>> for NewPooledTransactionHashes66 {
354    fn from(v: Vec<B256>) -> Self {
355        Self(v)
356    }
357}
358
359/// Same as [`NewPooledTransactionHashes66`] but extends that beside the transaction hashes,
360/// the node sends the transaction types and their sizes (as defined in EIP-2718) as well.
361#[derive(Clone, Debug, PartialEq, Eq, Default)]
362#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
363pub struct NewPooledTransactionHashes68 {
364    /// Transaction types for new transactions that have appeared on the network.
365    ///
366    /// ## Note on RLP encoding and decoding
367    ///
368    /// In the [eth/68 spec](https://eips.ethereum.org/EIPS/eip-5793#specification) this is defined
369    /// the following way:
370    ///  * `[type_0: B_1, type_1: B_1, ...]`
371    ///
372    /// This would make it seem like the [`Encodable`] and
373    /// [`Decodable`] implementations should directly use a `Vec<u8>` for
374    /// encoding and decoding, because it looks like this field should be encoded as a _list_ of
375    /// bytes.
376    ///
377    /// However, [this is implemented in geth as a `[]byte`
378    /// type](https://github.com/ethereum/go-ethereum/blob/82d934b1dd80cdd8190803ea9f73ed2c345e2576/eth/protocols/eth/protocol.go#L308-L313),
379    /// which [ends up being encoded as a RLP
380    /// string](https://github.com/ethereum/go-ethereum/blob/82d934b1dd80cdd8190803ea9f73ed2c345e2576/rlp/encode_test.go#L171-L176),
381    /// **not** a RLP list.
382    ///
383    /// Because of this, we do not directly use the `Vec<u8>` when encoding and decoding, and
384    /// instead use the [`Encodable`] and [`Decodable`]
385    /// implementations for `&[u8]` instead, which encodes into a RLP string, and expects an RLP
386    /// string when decoding.
387    pub types: Vec<u8>,
388    /// Transaction sizes for new transactions that have appeared on the network.
389    pub sizes: Vec<usize>,
390    /// Transaction hashes for new transactions that have appeared on the network.
391    pub hashes: Vec<B256>,
392}
393
#[cfg(feature = "arbitrary")]
impl proptest::prelude::Arbitrary for NewPooledTransactionHashes68 {
    type Parameters = ();
    type Strategy = proptest::prelude::BoxedStrategy<Self>;

    /// Builds a strategy producing messages whose three vectors share one length in `1..=100`.
    fn arbitrary_with(_args: ()) -> Self::Strategy {
        use proptest::{collection::vec, prelude::*};

        // One random length, shared by all three vectors of a generated message.
        let shared_len = any::<usize>().prop_map(|raw| raw % 100 + 1);

        shared_len
            .prop_flat_map(|len| {
                // Transaction types are drawn from `TxType` and stored as their `u8` value.
                let types = vec(
                    proptest_arbitrary_interop::arb::<reth_ethereum_primitives::TxType>()
                        .prop_map(|ty| ty as u8),
                    len..=len,
                );

                // Sizes are reduced into the range 0..131072 (0x20000).
                let sizes = vec(proptest::num::usize::ANY.prop_map(|raw| raw % 131072), len..=len);
                let hashes = vec(any::<B256>(), len..=len);

                (types, sizes, hashes)
            })
            .prop_map(|(types, sizes, hashes)| Self { types, sizes, hashes })
            .boxed()
    }
}
423
424impl NewPooledTransactionHashes68 {
425    /// Returns an iterator over tx hashes zipped with corresponding metadata.
426    pub fn metadata_iter(&self) -> impl Iterator<Item = (&B256, (u8, usize))> {
427        self.hashes.iter().zip(self.types.iter().copied().zip(self.sizes.iter().copied()))
428    }
429
430    /// Appends a transaction
431    pub fn push<T: SignedTransaction>(&mut self, tx: &T) {
432        self.hashes.push(*tx.tx_hash());
433        self.sizes.push(tx.encode_2718_len());
434        self.types.push(tx.ty());
435    }
436
437    /// Appends the provided transactions
438    pub fn extend<'a, T: SignedTransaction>(&mut self, txs: impl IntoIterator<Item = &'a T>) {
439        for tx in txs {
440            self.push(tx);
441        }
442    }
443
444    /// Shrinks the capacity of the message vectors as much as possible.
445    pub fn shrink_to_fit(&mut self) {
446        self.hashes.shrink_to_fit();
447        self.sizes.shrink_to_fit();
448        self.types.shrink_to_fit()
449    }
450
451    /// Consumes and appends a transaction
452    pub fn with_transaction<T: SignedTransaction>(mut self, tx: &T) -> Self {
453        self.push(tx);
454        self
455    }
456
457    /// Consumes and appends the provided transactions
458    pub fn with_transactions<'a, T: SignedTransaction>(
459        mut self,
460        txs: impl IntoIterator<Item = &'a T>,
461    ) -> Self {
462        self.extend(txs);
463        self
464    }
465}
466
467impl Encodable for NewPooledTransactionHashes68 {
468    fn encode(&self, out: &mut dyn bytes::BufMut) {
469        #[derive(RlpEncodable)]
470        struct EncodableNewPooledTransactionHashes68<'a> {
471            types: &'a [u8],
472            sizes: &'a Vec<usize>,
473            hashes: &'a Vec<B256>,
474        }
475
476        let encodable = EncodableNewPooledTransactionHashes68 {
477            types: &self.types[..],
478            sizes: &self.sizes,
479            hashes: &self.hashes,
480        };
481
482        encodable.encode(out);
483    }
484    fn length(&self) -> usize {
485        #[derive(RlpEncodable)]
486        struct EncodableNewPooledTransactionHashes68<'a> {
487            types: &'a [u8],
488            sizes: &'a Vec<usize>,
489            hashes: &'a Vec<B256>,
490        }
491
492        let encodable = EncodableNewPooledTransactionHashes68 {
493            types: &self.types[..],
494            sizes: &self.sizes,
495            hashes: &self.hashes,
496        };
497
498        encodable.length()
499    }
500}
501
502impl Decodable for NewPooledTransactionHashes68 {
503    fn decode(buf: &mut &[u8]) -> alloy_rlp::Result<Self> {
504        #[derive(RlpDecodable)]
505        struct EncodableNewPooledTransactionHashes68 {
506            types: Bytes,
507            sizes: Vec<usize>,
508            hashes: Vec<B256>,
509        }
510
511        let encodable = EncodableNewPooledTransactionHashes68::decode(buf)?;
512        let msg = Self {
513            types: encodable.types.into(),
514            sizes: encodable.sizes,
515            hashes: encodable.hashes,
516        };
517
518        if msg.hashes.len() != msg.types.len() {
519            return Err(alloy_rlp::Error::ListLengthMismatch {
520                expected: msg.hashes.len(),
521                got: msg.types.len(),
522            })
523        }
524        if msg.hashes.len() != msg.sizes.len() {
525            return Err(alloy_rlp::Error::ListLengthMismatch {
526                expected: msg.hashes.len(),
527                got: msg.sizes.len(),
528            })
529        }
530
531        Ok(msg)
532    }
533}
534
535/// Validation pass that checks for unique transaction hashes.
536pub trait DedupPayload {
537    /// Value type in [`PartiallyValidData`] map.
538    type Value;
539
540    /// The payload contains no entries.
541    fn is_empty(&self) -> bool;
542
543    /// Returns the number of entries.
544    fn len(&self) -> usize;
545
546    /// Consumes self, returning an iterator over hashes in payload.
547    fn dedup(self) -> PartiallyValidData<Self::Value>;
548}
549
550/// Value in [`PartiallyValidData`] map obtained from an announcement.
551pub type Eth68TxMetadata = Option<(u8, usize)>;
552
553impl DedupPayload for NewPooledTransactionHashes {
554    type Value = Eth68TxMetadata;
555
556    fn is_empty(&self) -> bool {
557        self.is_empty()
558    }
559
560    fn len(&self) -> usize {
561        self.len()
562    }
563
564    fn dedup(self) -> PartiallyValidData<Self::Value> {
565        match self {
566            Self::Eth66(msg) => msg.dedup(),
567            Self::Eth68(msg) => msg.dedup(),
568        }
569    }
570}
571
572impl DedupPayload for NewPooledTransactionHashes68 {
573    type Value = Eth68TxMetadata;
574
575    fn is_empty(&self) -> bool {
576        self.hashes.is_empty()
577    }
578
579    fn len(&self) -> usize {
580        self.hashes.len()
581    }
582
583    fn dedup(self) -> PartiallyValidData<Self::Value> {
584        let Self { hashes, mut sizes, mut types } = self;
585
586        let mut deduped_data = HashMap::with_capacity_and_hasher(hashes.len(), Default::default());
587
588        for hash in hashes.into_iter().rev() {
589            if let (Some(ty), Some(size)) = (types.pop(), sizes.pop()) {
590                deduped_data.insert(hash, Some((ty, size)));
591            }
592        }
593
594        PartiallyValidData::from_raw_data_eth68(deduped_data)
595    }
596}
597
598impl DedupPayload for NewPooledTransactionHashes66 {
599    type Value = Eth68TxMetadata;
600
601    fn is_empty(&self) -> bool {
602        self.0.is_empty()
603    }
604
605    fn len(&self) -> usize {
606        self.0.len()
607    }
608
609    fn dedup(self) -> PartiallyValidData<Self::Value> {
610        let Self(hashes) = self;
611
612        let mut deduped_data = HashMap::with_capacity_and_hasher(hashes.len(), Default::default());
613
614        let noop_value: Eth68TxMetadata = None;
615
616        for hash in hashes.into_iter().rev() {
617            deduped_data.insert(hash, noop_value);
618        }
619
620        PartiallyValidData::from_raw_data_eth66(deduped_data)
621    }
622}
623
624/// Interface for handling mempool message data. Used in various filters in pipelines in
625/// `TransactionsManager` and in queries to `TransactionPool`.
626pub trait HandleMempoolData {
627    /// The announcement contains no entries.
628    fn is_empty(&self) -> bool;
629
630    /// Returns the number of entries.
631    fn len(&self) -> usize;
632
633    /// Retain only entries for which the hash in the entry satisfies a given predicate.
634    fn retain_by_hash(&mut self, f: impl FnMut(&TxHash) -> bool);
635}
636
637/// Extension of [`HandleMempoolData`] interface, for mempool messages that are versioned.
638pub trait HandleVersionedMempoolData {
639    /// Returns the announcement version, either [`Eth66`](EthVersion::Eth66) or
640    /// [`Eth68`](EthVersion::Eth68).
641    fn msg_version(&self) -> EthVersion;
642}
643
644impl<T: SignedTransaction> HandleMempoolData for Vec<T> {
645    fn is_empty(&self) -> bool {
646        self.is_empty()
647    }
648
649    fn len(&self) -> usize {
650        self.len()
651    }
652
653    fn retain_by_hash(&mut self, mut f: impl FnMut(&TxHash) -> bool) {
654        self.retain(|tx| f(tx.tx_hash()))
655    }
656}
657
// Implements `HandleMempoolData` for a type that stores its entries in a map field named
// `data`, keyed by transaction hash. The optional second argument is a generic parameter of the
// target type, written as `<V>`.
macro_rules! handle_mempool_data_map_impl {
    ($target:ty, $(<$param:ident>)?) => {
        impl$(<$param>)? HandleMempoolData for $target {
            fn is_empty(&self) -> bool {
                self.data.is_empty()
            }

            fn len(&self) -> usize {
                self.data.len()
            }

            fn retain_by_hash(&mut self, mut f: impl FnMut(&TxHash) -> bool) {
                self.data.retain(|tx_hash, _value| f(tx_hash));
            }
        }
    };
}
675
676/// Data that has passed an initial validation pass that is not specific to any mempool message
677/// type.
678#[derive(Debug, Deref, DerefMut, IntoIterator)]
679pub struct PartiallyValidData<V> {
680    #[deref]
681    #[deref_mut]
682    #[into_iterator]
683    data: HashMap<TxHash, V>,
684    version: Option<EthVersion>,
685}
686
687handle_mempool_data_map_impl!(PartiallyValidData<V>, <V>);
688
689impl<V> PartiallyValidData<V> {
690    /// Wraps raw data.
691    pub const fn from_raw_data(data: HashMap<TxHash, V>, version: Option<EthVersion>) -> Self {
692        Self { data, version }
693    }
694
695    /// Wraps raw data with version [`EthVersion::Eth68`].
696    pub const fn from_raw_data_eth68(data: HashMap<TxHash, V>) -> Self {
697        Self::from_raw_data(data, Some(EthVersion::Eth68))
698    }
699
700    /// Wraps raw data with version [`EthVersion::Eth66`].
701    pub const fn from_raw_data_eth66(data: HashMap<TxHash, V>) -> Self {
702        Self::from_raw_data(data, Some(EthVersion::Eth66))
703    }
704
705    /// Returns a new [`PartiallyValidData`] with empty data from an [`Eth68`](EthVersion::Eth68)
706    /// announcement.
707    pub fn empty_eth68() -> Self {
708        Self::from_raw_data_eth68(HashMap::default())
709    }
710
711    /// Returns a new [`PartiallyValidData`] with empty data from an [`Eth66`](EthVersion::Eth66)
712    /// announcement.
713    pub fn empty_eth66() -> Self {
714        Self::from_raw_data_eth66(HashMap::default())
715    }
716
717    /// Returns the version of the message this data was received in if different versions of the
718    /// message exists, either [`Eth66`](EthVersion::Eth66) or [`Eth68`](EthVersion::Eth68).
719    pub const fn msg_version(&self) -> Option<EthVersion> {
720        self.version
721    }
722
723    /// Destructs returning the validated data.
724    pub fn into_data(self) -> HashMap<TxHash, V> {
725        self.data
726    }
727}
728
729/// Partially validated data from an announcement or a
730/// [`PooledTransactions`](crate::PooledTransactions) response.
731#[derive(Debug, Deref, DerefMut, IntoIterator, From)]
732pub struct ValidAnnouncementData {
733    #[deref]
734    #[deref_mut]
735    #[into_iterator]
736    data: HashMap<TxHash, Eth68TxMetadata>,
737    version: EthVersion,
738}
739
740handle_mempool_data_map_impl!(ValidAnnouncementData,);
741
742impl ValidAnnouncementData {
743    /// Destructs returning only the valid hashes and the announcement message version. Caution! If
744    /// this is [`Eth68`](EthVersion::Eth68) announcement data, this drops the metadata.
745    pub fn into_request_hashes(self) -> (RequestTxHashes, EthVersion) {
746        let hashes = self.data.into_keys().collect::<HashSet<_>>();
747
748        (RequestTxHashes::new(hashes), self.version)
749    }
750
751    /// Conversion from [`PartiallyValidData`] from an announcement. Note! [`PartiallyValidData`]
752    /// from an announcement, should have some [`EthVersion`]. Panics if [`PartiallyValidData`] has
753    /// version set to `None`.
754    pub fn from_partially_valid_data(data: PartiallyValidData<Eth68TxMetadata>) -> Self {
755        let PartiallyValidData { data, version } = data;
756
757        let version = version.expect("should have eth version for conversion");
758
759        Self { data, version }
760    }
761
762    /// Destructs returning the validated data.
763    pub fn into_data(self) -> HashMap<TxHash, Eth68TxMetadata> {
764        self.data
765    }
766}
767
768impl HandleVersionedMempoolData for ValidAnnouncementData {
769    fn msg_version(&self) -> EthVersion {
770        self.version
771    }
772}
773
774/// Hashes to request from a peer.
775#[derive(Debug, Default, Deref, DerefMut, IntoIterator, Constructor)]
776pub struct RequestTxHashes {
777    #[deref]
778    #[deref_mut]
779    #[into_iterator(owned, ref)]
780    hashes: HashSet<TxHash>,
781}
782
783impl RequestTxHashes {
784    /// Returns a new [`RequestTxHashes`] with given capacity for hashes. Caution! Make sure to
785    /// call [`HashSet::shrink_to_fit`] on [`RequestTxHashes`] when full, especially where it will
786    /// be stored in its entirety like in the future waiting for a
787    /// [`GetPooledTransactions`](crate::GetPooledTransactions) request to resolve.
788    pub fn with_capacity(capacity: usize) -> Self {
789        Self::new(HashSet::with_capacity_and_hasher(capacity, Default::default()))
790    }
791
792    /// Returns a new empty instance.
793    fn empty() -> Self {
794        Self::new(HashSet::default())
795    }
796
797    /// Retains the given number of elements, returning an iterator over the rest.
798    pub fn retain_count(&mut self, count: usize) -> Self {
799        let rest_capacity = self.hashes.len().saturating_sub(count);
800        if rest_capacity == 0 {
801            return Self::empty()
802        }
803        let mut rest = Self::with_capacity(rest_capacity);
804
805        let mut i = 0;
806        self.hashes.retain(|hash| {
807            if i >= count {
808                rest.insert(*hash);
809                return false
810            }
811            i += 1;
812
813            true
814        });
815
816        rest
817    }
818}
819
820impl FromIterator<(TxHash, Eth68TxMetadata)> for RequestTxHashes {
821    fn from_iter<I: IntoIterator<Item = (TxHash, Eth68TxMetadata)>>(iter: I) -> Self {
822        Self::new(iter.into_iter().map(|(hash, _)| hash).collect())
823    }
824}
825
826/// The earliest block, the latest block and hash of the latest block which can be provided.
827/// See [BlockRangeUpdate](https://github.com/ethereum/devp2p/blob/master/caps/eth.md#blockrangeupdate-0x11).
828#[derive(Clone, Debug, PartialEq, Eq, Default, RlpEncodable, RlpDecodable)]
829#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
830#[cfg_attr(feature = "serde", serde(rename_all = "camelCase"))]
831pub struct BlockRangeUpdate {
832    /// The earliest block which is available.
833    pub earliest: u64,
834    /// The latest block which is available.
835    pub latest: u64,
836    /// Latest available block's hash.
837    pub latest_hash: B256,
838}
839
840#[cfg(test)]
841mod tests {
842    use super::*;
843    use alloy_consensus::{transaction::TxHashRef, Typed2718};
844    use alloy_eips::eip2718::Encodable2718;
845    use alloy_primitives::{b256, hex, Signature, U256};
846    use reth_ethereum_primitives::{Transaction, TransactionSigned};
847    use std::str::FromStr;
848
849    /// Takes as input a struct / encoded hex message pair, ensuring that we encode to the exact hex
850    /// message, and decode to the exact struct.
851    fn test_encoding_vector<T: Encodable + Decodable + PartialEq + core::fmt::Debug>(
852        input: (T, &[u8]),
853    ) {
854        let (expected_decoded, expected_encoded) = input;
855        let mut encoded = Vec::new();
856        expected_decoded.encode(&mut encoded);
857
858        assert_eq!(hex::encode(&encoded), hex::encode(expected_encoded));
859
860        let decoded = T::decode(&mut encoded.as_ref()).unwrap();
861        assert_eq!(expected_decoded, decoded);
862    }
863
864    #[test]
865    fn can_return_latest_block() {
866        let mut blocks = NewBlockHashes(vec![BlockHashNumber { hash: B256::random(), number: 0 }]);
867        let latest = blocks.latest().unwrap();
868        assert_eq!(latest.number, 0);
869
870        blocks.push(BlockHashNumber { hash: B256::random(), number: 100 });
871        blocks.push(BlockHashNumber { hash: B256::random(), number: 2 });
872        let latest = blocks.latest().unwrap();
873        assert_eq!(latest.number, 100);
874    }
875
876    #[test]
877    fn eth_68_tx_hash_roundtrip() {
878        let vectors = vec![
879            (
880                NewPooledTransactionHashes68 { types: vec![], sizes: vec![], hashes: vec![] },
881                &hex!("c380c0c0")[..],
882            ),
883            (
884                NewPooledTransactionHashes68 {
885                    types: vec![0x00],
886                    sizes: vec![0x00],
887                    hashes: vec![
888                        B256::from_str(
889                            "0x0000000000000000000000000000000000000000000000000000000000000000",
890                        )
891                        .unwrap(),
892                    ],
893                },
894                &hex!(
895                    "e500c180e1a00000000000000000000000000000000000000000000000000000000000000000"
896                )[..],
897            ),
898            (
899                NewPooledTransactionHashes68 {
900                    types: vec![0x00, 0x00],
901                    sizes: vec![0x00, 0x00],
902                    hashes: vec![
903                        B256::from_str(
904                            "0x0000000000000000000000000000000000000000000000000000000000000000",
905                        )
906                        .unwrap(),
907                        B256::from_str(
908                            "0x0000000000000000000000000000000000000000000000000000000000000000",
909                        )
910                        .unwrap(),
911                    ],
912                },
913                &hex!(
914                    "f84a820000c28080f842a00000000000000000000000000000000000000000000000000000000000000000a00000000000000000000000000000000000000000000000000000000000000000"
915                )[..],
916            ),
917            (
918                NewPooledTransactionHashes68 {
919                    types: vec![0x02],
920                    sizes: vec![0xb6],
921                    hashes: vec![
922                        B256::from_str(
923                            "0xfecbed04c7b88d8e7221a0a3f5dc33f220212347fc167459ea5cc9c3eb4c1124",
924                        )
925                        .unwrap(),
926                    ],
927                },
928                &hex!(
929                    "e602c281b6e1a0fecbed04c7b88d8e7221a0a3f5dc33f220212347fc167459ea5cc9c3eb4c1124"
930                )[..],
931            ),
932            (
933                NewPooledTransactionHashes68 {
934                    types: vec![0xff, 0xff],
935                    sizes: vec![0xffffffff, 0xffffffff],
936                    hashes: vec![
937                        B256::from_str(
938                            "0xffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff",
939                        )
940                        .unwrap(),
941                        B256::from_str(
942                            "0xffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff",
943                        )
944                        .unwrap(),
945                    ],
946                },
947                &hex!(
948                    "f85282ffffca84ffffffff84fffffffff842a0ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffa0ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff"
949                )[..],
950            ),
951            (
952                NewPooledTransactionHashes68 {
953                    types: vec![0xff, 0xff],
954                    sizes: vec![0xffffffff, 0xffffffff],
955                    hashes: vec![
956                        B256::from_str(
957                            "0xbeefcafebeefcafebeefcafebeefcafebeefcafebeefcafebeefcafebeefcafe",
958                        )
959                        .unwrap(),
960                        B256::from_str(
961                            "0xbeefcafebeefcafebeefcafebeefcafebeefcafebeefcafebeefcafebeefcafe",
962                        )
963                        .unwrap(),
964                    ],
965                },
966                &hex!(
967                    "f85282ffffca84ffffffff84fffffffff842a0beefcafebeefcafebeefcafebeefcafebeefcafebeefcafebeefcafebeefcafea0beefcafebeefcafebeefcafebeefcafebeefcafebeefcafebeefcafebeefcafe"
968                )[..],
969            ),
970            (
971                NewPooledTransactionHashes68 {
972                    types: vec![0x10, 0x10],
973                    sizes: vec![0xdeadc0de, 0xdeadc0de],
974                    hashes: vec![
975                        B256::from_str(
976                            "0x3b9aca00f0671c9a2a1b817a0a78d3fe0c0f776cccb2a8c3c1b412a4f4e4d4e2",
977                        )
978                        .unwrap(),
979                        B256::from_str(
980                            "0x3b9aca00f0671c9a2a1b817a0a78d3fe0c0f776cccb2a8c3c1b412a4f4e4d4e2",
981                        )
982                        .unwrap(),
983                    ],
984                },
985                &hex!(
986                    "f852821010ca84deadc0de84deadc0def842a03b9aca00f0671c9a2a1b817a0a78d3fe0c0f776cccb2a8c3c1b412a4f4e4d4e2a03b9aca00f0671c9a2a1b817a0a78d3fe0c0f776cccb2a8c3c1b412a4f4e4d4e2"
987                )[..],
988            ),
989            (
990                NewPooledTransactionHashes68 {
991                    types: vec![0x6f, 0x6f],
992                    sizes: vec![0x7fffffff, 0x7fffffff],
993                    hashes: vec![
994                        B256::from_str(
995                            "0x0000000000000000000000000000000000000000000000000000000000000002",
996                        )
997                        .unwrap(),
998                        B256::from_str(
999                            "0x0000000000000000000000000000000000000000000000000000000000000002",
1000                        )
1001                        .unwrap(),
1002                    ],
1003                },
1004                &hex!(
1005                    "f852826f6fca847fffffff847ffffffff842a00000000000000000000000000000000000000000000000000000000000000002a00000000000000000000000000000000000000000000000000000000000000002"
1006                )[..],
1007            ),
1008        ];
1009
1010        for vector in vectors {
1011            test_encoding_vector(vector);
1012        }
1013    }
1014
1015    #[test]
1016    fn request_hashes_retain_count_keep_subset() {
1017        let mut hashes = RequestTxHashes::new(
1018            [
1019                b256!("0x0000000000000000000000000000000000000000000000000000000000000001"),
1020                b256!("0x0000000000000000000000000000000000000000000000000000000000000002"),
1021                b256!("0x0000000000000000000000000000000000000000000000000000000000000003"),
1022                b256!("0x0000000000000000000000000000000000000000000000000000000000000004"),
1023                b256!("0x0000000000000000000000000000000000000000000000000000000000000005"),
1024            ]
1025            .into_iter()
1026            .collect::<HashSet<_>>(),
1027        );
1028
1029        let rest = hashes.retain_count(3);
1030
1031        assert_eq!(3, hashes.len());
1032        assert_eq!(2, rest.len());
1033    }
1034
1035    #[test]
1036    fn request_hashes_retain_count_keep_all() {
1037        let mut hashes = RequestTxHashes::new(
1038            [
1039                b256!("0x0000000000000000000000000000000000000000000000000000000000000001"),
1040                b256!("0x0000000000000000000000000000000000000000000000000000000000000002"),
1041                b256!("0x0000000000000000000000000000000000000000000000000000000000000003"),
1042                b256!("0x0000000000000000000000000000000000000000000000000000000000000004"),
1043                b256!("0x0000000000000000000000000000000000000000000000000000000000000005"),
1044            ]
1045            .into_iter()
1046            .collect::<HashSet<_>>(),
1047        );
1048
1049        let _ = hashes.retain_count(6);
1050
1051        assert_eq!(5, hashes.len());
1052    }
1053
1054    #[test]
1055    fn split_request_hashes_keep_none() {
1056        let mut hashes = RequestTxHashes::new(
1057            [
1058                b256!("0x0000000000000000000000000000000000000000000000000000000000000001"),
1059                b256!("0x0000000000000000000000000000000000000000000000000000000000000002"),
1060                b256!("0x0000000000000000000000000000000000000000000000000000000000000003"),
1061                b256!("0x0000000000000000000000000000000000000000000000000000000000000004"),
1062                b256!("0x0000000000000000000000000000000000000000000000000000000000000005"),
1063            ]
1064            .into_iter()
1065            .collect::<HashSet<_>>(),
1066        );
1067
1068        let rest = hashes.retain_count(0);
1069
1070        assert_eq!(0, hashes.len());
1071        assert_eq!(5, rest.len());
1072    }
1073
1074    fn signed_transaction() -> impl SignedTransaction {
1075        TransactionSigned::new_unhashed(
1076            Transaction::Legacy(Default::default()),
1077            Signature::new(
1078                U256::from_str(
1079                    "0x64b1702d9298fee62dfeccc57d322a463ad55ca201256d01f62b45b2e1c21c12",
1080                )
1081                .unwrap(),
1082                U256::from_str(
1083                    "0x64b1702d9298fee62dfeccc57d322a463ad55ca201256d01f62b45b2e1c21c10",
1084                )
1085                .unwrap(),
1086                false,
1087            ),
1088        )
1089    }
1090
1091    #[test]
1092    fn test_pooled_tx_hashes_68_push() {
1093        let tx = signed_transaction();
1094        let mut tx_hashes =
1095            NewPooledTransactionHashes68 { types: vec![], sizes: vec![], hashes: vec![] };
1096        tx_hashes.push(&tx);
1097        assert_eq!(tx_hashes.types.len(), 1);
1098        assert_eq!(tx_hashes.sizes.len(), 1);
1099        assert_eq!(tx_hashes.hashes.len(), 1);
1100        assert_eq!(tx_hashes.types[0], tx.ty());
1101        assert_eq!(tx_hashes.sizes[0], tx.encode_2718_len());
1102        assert_eq!(tx_hashes.hashes[0], *tx.tx_hash());
1103    }
1104
1105    #[test]
1106    fn test_pooled_tx_hashes_68_extend() {
1107        let tx = signed_transaction();
1108        let txs = vec![tx.clone(), tx.clone()];
1109        let mut tx_hashes =
1110            NewPooledTransactionHashes68 { types: vec![], sizes: vec![], hashes: vec![] };
1111        tx_hashes.extend(&txs);
1112        assert_eq!(tx_hashes.types.len(), 2);
1113        assert_eq!(tx_hashes.sizes.len(), 2);
1114        assert_eq!(tx_hashes.hashes.len(), 2);
1115        assert_eq!(tx_hashes.types[0], tx.ty());
1116        assert_eq!(tx_hashes.sizes[0], tx.encode_2718_len());
1117        assert_eq!(tx_hashes.hashes[0], *tx.tx_hash());
1118        assert_eq!(tx_hashes.types[1], tx.ty());
1119        assert_eq!(tx_hashes.sizes[1], tx.encode_2718_len());
1120        assert_eq!(tx_hashes.hashes[1], *tx.tx_hash());
1121    }
1122
1123    #[test]
1124    fn test_pooled_tx_hashes_68_with_transaction() {
1125        let tx = signed_transaction();
1126        let tx_hashes =
1127            NewPooledTransactionHashes68 { types: vec![], sizes: vec![], hashes: vec![] }
1128                .with_transaction(&tx);
1129        assert_eq!(tx_hashes.types.len(), 1);
1130        assert_eq!(tx_hashes.sizes.len(), 1);
1131        assert_eq!(tx_hashes.hashes.len(), 1);
1132        assert_eq!(tx_hashes.types[0], tx.ty());
1133        assert_eq!(tx_hashes.sizes[0], tx.encode_2718_len());
1134        assert_eq!(tx_hashes.hashes[0], *tx.tx_hash());
1135    }
1136
1137    #[test]
1138    fn test_pooled_tx_hashes_68_with_transactions() {
1139        let tx = signed_transaction();
1140        let txs = vec![tx.clone(), tx.clone()];
1141        let tx_hashes =
1142            NewPooledTransactionHashes68 { types: vec![], sizes: vec![], hashes: vec![] }
1143                .with_transactions(&txs);
1144        assert_eq!(tx_hashes.types.len(), 2);
1145        assert_eq!(tx_hashes.sizes.len(), 2);
1146        assert_eq!(tx_hashes.hashes.len(), 2);
1147        assert_eq!(tx_hashes.types[0], tx.ty());
1148        assert_eq!(tx_hashes.sizes[0], tx.encode_2718_len());
1149        assert_eq!(tx_hashes.hashes[0], *tx.tx_hash());
1150        assert_eq!(tx_hashes.types[1], tx.ty());
1151        assert_eq!(tx_hashes.sizes[1], tx.encode_2718_len());
1152        assert_eq!(tx_hashes.hashes[1], *tx.tx_hash());
1153    }
1154}