Skip to main content

reth_eth_wire_types/
broadcast.rs

1//! Types for broadcasting new data.
2
3use crate::{EthMessage, EthVersion, NetworkPrimitives};
4use alloc::{sync::Arc, vec::Vec};
5use alloy_primitives::{
6    map::{HashMap, HashSet},
7    Bytes, TxHash, B256, U128,
8};
9use alloy_rlp::{
10    Decodable, Encodable, RlpDecodable, RlpDecodableWrapper, RlpEncodable, RlpEncodableWrapper,
11};
12use core::{fmt::Debug, mem};
13use derive_more::{Constructor, Deref, DerefMut, From, IntoIterator};
14use reth_codecs_derive::{add_arbitrary_tests, generate_tests};
15use reth_ethereum_primitives::TransactionSigned;
16use reth_primitives_traits::{Block, SignedTransaction};
17
18/// Announces new blocks that have appeared on the network to peers.
19#[derive(Clone, Debug, PartialEq, Eq, RlpEncodableWrapper, RlpDecodableWrapper, Default)]
20#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
21#[cfg_attr(any(test, feature = "arbitrary"), derive(arbitrary::Arbitrary))]
22#[add_arbitrary_tests(rlp)]
23pub struct NewBlockHashes(
24    /// The hash and the block number of each announced block.
25    /// Clients should request blocks using a [`GetBlockBodies`](crate::GetBlockBodies) message.
26    pub Vec<BlockHashNumber>,
27);
28
29// === impl NewBlockHashes ===
30
31impl NewBlockHashes {
32    /// Returns the latest block in the list of blocks.
33    pub fn latest(&self) -> Option<&BlockHashNumber> {
34        self.0.iter().fold(None, |latest, block| {
35            if let Some(latest) = latest {
36                return if latest.number > block.number { Some(latest) } else { Some(block) }
37            }
38            Some(block)
39        })
40    }
41}
42
43/// A block hash _and_ the number of the block it identifies.
44#[derive(Clone, Debug, PartialEq, Eq, RlpEncodable, RlpDecodable, Default)]
45#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
46#[cfg_attr(any(test, feature = "arbitrary"), derive(arbitrary::Arbitrary))]
47#[add_arbitrary_tests(rlp)]
48pub struct BlockHashNumber {
49    /// The block hash.
50    pub hash: B256,
51    /// The block number.
52    pub number: u64,
53}
54
55impl From<Vec<BlockHashNumber>> for NewBlockHashes {
56    fn from(v: Vec<BlockHashNumber>) -> Self {
57        Self(v)
58    }
59}
60
61impl From<NewBlockHashes> for Vec<BlockHashNumber> {
62    fn from(v: NewBlockHashes) -> Self {
63        v.0
64    }
65}
66
67/// A block payload transmitted through p2p that can be RLP encoded and decoded.
68pub trait NewBlockPayload:
69    Encodable + Decodable + Clone + Eq + Debug + Send + Sync + Unpin + 'static
70{
71    /// The type of block carried by the payload.
72    type Block: Block;
73
74    /// Returns a reference to the block carried by the payload.
75    fn block(&self) -> &Self::Block;
76}
77
78/// A new block together with the current total difficulty, which includes the difficulty of
79/// that block.
80#[derive(Clone, Debug, PartialEq, Eq, RlpEncodable, RlpDecodable, Default)]
81#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
82#[cfg_attr(any(test, feature = "arbitrary"), derive(arbitrary::Arbitrary))]
83pub struct NewBlock<B = reth_ethereum_primitives::Block> {
84    /// The new block.
85    pub block: B,
86    /// The current total difficulty, including the difficulty of `block`.
87    pub td: U128,
88}
89
90impl<B: Block + 'static> NewBlockPayload for NewBlock<B> {
91    type Block = B;
92
93    fn block(&self) -> &Self::Block {
94        &self.block
95    }
96}
97
// NOTE(review): presumably generates RLP roundtrip tests for `NewBlock` in a module named
// `EthNewBlockTests`; the meaning of `25` is not visible here — confirm in `reth_codecs_derive`.
98generate_tests!(#[rlp, 25] NewBlock<reth_ethereum_primitives::Block>, EthNewBlockTests);
99
100/// Announces full transactions that have appeared on the network and are not yet included
101/// in a block.
102#[derive(Clone, Debug, PartialEq, Eq, RlpEncodableWrapper, RlpDecodableWrapper, Default)]
103#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
104#[cfg_attr(any(test, feature = "arbitrary"), derive(arbitrary::Arbitrary))]
105#[add_arbitrary_tests(rlp, 10)]
106pub struct Transactions<T = TransactionSigned>(
107    /// New transactions for the peer to include in its mempool.
108    pub Vec<T>,
109);
110
111impl<T: SignedTransaction> Transactions<T> {
112    /// Returns `true` if the list of transactions contains any blob transactions.
113    pub fn has_eip4844(&self) -> bool {
114        self.0.iter().any(|tx| tx.is_eip4844())
115    }
116}
117
118impl<T> From<Vec<T>> for Transactions<T> {
119    fn from(txs: Vec<T>) -> Self {
120        Self(txs)
121    }
122}
123
124impl<T> From<Transactions<T>> for Vec<T> {
125    fn from(txs: Transactions<T>) -> Self {
126        txs.0
127    }
128}
129
130/// Same as [`Transactions`], but intended as an egress message sent from local to _many_ peers.
131///
132/// The list of transactions is constructed on a per-peer basis, but the underlying transaction
133/// objects are shared.
134#[derive(Clone, Debug, PartialEq, Eq, RlpEncodableWrapper, RlpDecodableWrapper)]
135#[cfg_attr(any(test, feature = "arbitrary"), derive(arbitrary::Arbitrary))]
136#[add_arbitrary_tests(rlp, 20)]
137pub struct SharedTransactions<T = TransactionSigned>(
138    /// New transactions for the peer to include in its mempool.
139    pub Vec<Arc<T>>,
140);
141
142/// A wrapper type for all versions of the new pooled transaction hashes announcement.
143#[derive(Clone, Debug, PartialEq, Eq)]
144#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
145pub enum NewPooledTransactionHashes {
146    /// A list of transaction hashes, valid for versions in [66-68).
147    Eth66(NewPooledTransactionHashes66),
148    /// A list of transaction hashes with their types and sizes, valid for versions in [68..].
149    ///
150    /// Note: it is assumed that the payload is valid (all vectors have the same length).
151    Eth68(NewPooledTransactionHashes68),
152}
153
154// === impl NewPooledTransactionHashes ===
155
156impl NewPooledTransactionHashes {
157    /// Returns the message [`EthVersion`].
158    pub const fn version(&self) -> EthVersion {
159        match self {
160            Self::Eth66(_) => EthVersion::Eth66,
161            Self::Eth68(_) => EthVersion::Eth68,
162        }
163    }
164
165    /// Returns `true` if the payload is valid for the given version
166    pub const fn is_valid_for_version(&self, version: EthVersion) -> bool {
167        match self {
168            Self::Eth66(_) => {
169                matches!(version, EthVersion::Eth67 | EthVersion::Eth66)
170            }
171            Self::Eth68(_) => {
172                matches!(
173                    version,
174                    EthVersion::Eth68 | EthVersion::Eth69 | EthVersion::Eth70 | EthVersion::Eth71
175                )
176            }
177        }
178    }
179
180    /// Returns an iterator over all transaction hashes.
181    pub fn iter_hashes(&self) -> impl Iterator<Item = &B256> + '_ {
182        match self {
183            Self::Eth66(msg) => msg.0.iter(),
184            Self::Eth68(msg) => msg.hashes.iter(),
185        }
186    }
187
188    /// Returns an immutable reference to transaction hashes.
189    pub const fn hashes(&self) -> &Vec<B256> {
190        match self {
191            Self::Eth66(msg) => &msg.0,
192            Self::Eth68(msg) => &msg.hashes,
193        }
194    }
195
196    /// Returns a mutable reference to transaction hashes.
197    pub const fn hashes_mut(&mut self) -> &mut Vec<B256> {
198        match self {
199            Self::Eth66(msg) => &mut msg.0,
200            Self::Eth68(msg) => &mut msg.hashes,
201        }
202    }
203
204    /// Consumes the type and returns all hashes
205    pub fn into_hashes(self) -> Vec<B256> {
206        match self {
207            Self::Eth66(msg) => msg.0,
208            Self::Eth68(msg) => msg.hashes,
209        }
210    }
211
212    /// Returns an iterator over all transaction hashes.
213    pub fn into_iter_hashes(self) -> impl Iterator<Item = B256> {
214        match self {
215            Self::Eth66(msg) => msg.0.into_iter(),
216            Self::Eth68(msg) => msg.hashes.into_iter(),
217        }
218    }
219
220    /// Shortens the number of hashes in the message, keeping the first `len` hashes and dropping
221    /// the rest. If `len` is greater than the number of hashes, this has no effect.
222    pub fn truncate(&mut self, len: usize) {
223        match self {
224            Self::Eth66(msg) => msg.0.truncate(len),
225            Self::Eth68(msg) => {
226                msg.types.truncate(len);
227                msg.sizes.truncate(len);
228                msg.hashes.truncate(len);
229            }
230        }
231    }
232
233    /// Returns true if the message is empty
234    pub const fn is_empty(&self) -> bool {
235        match self {
236            Self::Eth66(msg) => msg.0.is_empty(),
237            Self::Eth68(msg) => msg.hashes.is_empty(),
238        }
239    }
240
241    /// Returns the number of hashes in the message
242    pub const fn len(&self) -> usize {
243        match self {
244            Self::Eth66(msg) => msg.0.len(),
245            Self::Eth68(msg) => msg.hashes.len(),
246        }
247    }
248
249    /// Returns an immutable reference to the inner type if this an eth68 announcement.
250    pub const fn as_eth68(&self) -> Option<&NewPooledTransactionHashes68> {
251        match self {
252            Self::Eth66(_) => None,
253            Self::Eth68(msg) => Some(msg),
254        }
255    }
256
257    /// Returns a mutable reference to the inner type if this an eth68 announcement.
258    pub const fn as_eth68_mut(&mut self) -> Option<&mut NewPooledTransactionHashes68> {
259        match self {
260            Self::Eth66(_) => None,
261            Self::Eth68(msg) => Some(msg),
262        }
263    }
264
265    /// Returns a mutable reference to the inner type if this an eth66 announcement.
266    pub const fn as_eth66_mut(&mut self) -> Option<&mut NewPooledTransactionHashes66> {
267        match self {
268            Self::Eth66(msg) => Some(msg),
269            Self::Eth68(_) => None,
270        }
271    }
272
273    /// Returns the inner type if this an eth68 announcement.
274    pub fn take_eth68(&mut self) -> Option<NewPooledTransactionHashes68> {
275        match self {
276            Self::Eth66(_) => None,
277            Self::Eth68(msg) => Some(mem::take(msg)),
278        }
279    }
280
281    /// Returns the inner type if this an eth66 announcement.
282    pub fn take_eth66(&mut self) -> Option<NewPooledTransactionHashes66> {
283        match self {
284            Self::Eth66(msg) => Some(mem::take(msg)),
285            Self::Eth68(_) => None,
286        }
287    }
288}
289
290impl<N: NetworkPrimitives> From<NewPooledTransactionHashes> for EthMessage<N> {
291    fn from(value: NewPooledTransactionHashes) -> Self {
292        match value {
293            NewPooledTransactionHashes::Eth66(msg) => Self::NewPooledTransactionHashes66(msg),
294            NewPooledTransactionHashes::Eth68(msg) => Self::NewPooledTransactionHashes68(msg),
295        }
296    }
297}
298
299impl From<NewPooledTransactionHashes66> for NewPooledTransactionHashes {
300    fn from(hashes: NewPooledTransactionHashes66) -> Self {
301        Self::Eth66(hashes)
302    }
303}
304
305impl From<NewPooledTransactionHashes68> for NewPooledTransactionHashes {
306    fn from(hashes: NewPooledTransactionHashes68) -> Self {
307        Self::Eth68(hashes)
308    }
309}
310
311/// Announces the hashes of transactions that have appeared on the network, but have not yet
312/// been included in a block.
313#[derive(Clone, Debug, PartialEq, Eq, RlpEncodableWrapper, RlpDecodableWrapper, Default)]
314#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
315#[cfg_attr(any(test, feature = "arbitrary"), derive(arbitrary::Arbitrary))]
316#[add_arbitrary_tests(rlp)]
317pub struct NewPooledTransactionHashes66(
318    /// Transaction hashes for new transactions that have appeared on the network.
319    /// Clients should request the transactions with the given hashes using a
320    /// [`GetPooledTransactions`](crate::GetPooledTransactions) message.
321    pub Vec<B256>,
322);
323
324impl From<Vec<B256>> for NewPooledTransactionHashes66 {
325    fn from(v: Vec<B256>) -> Self {
326        Self(v)
327    }
328}
329
330/// Same as [`NewPooledTransactionHashes66`], but in addition to the transaction hashes the node
331/// also sends the transaction types and their sizes (as defined in EIP-2718).
332#[derive(Clone, Debug, PartialEq, Eq, Default)]
333#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
334pub struct NewPooledTransactionHashes68 {
335    /// Transaction types for new transactions that have appeared on the network.
336    ///
337    /// ## Note on RLP encoding and decoding
338    ///
339    /// In the [eth/68 spec](https://eips.ethereum.org/EIPS/eip-5793#specification) this is defined
340    /// the following way:
341    ///  * `[type_0: B_1, type_1: B_1, ...]`
342    ///
343    /// This would make it seem like the [`Encodable`] and
344    /// [`Decodable`] implementations should directly use a `Vec<u8>` for
345    /// encoding and decoding, because it looks like this field should be encoded as a _list_ of
346    /// bytes.
347    ///
348    /// However, [this is implemented in geth as a `[]byte`
349    /// type](https://github.com/ethereum/go-ethereum/blob/82d934b1dd80cdd8190803ea9f73ed2c345e2576/eth/protocols/eth/protocol.go#L308-L313),
350    /// which [ends up being encoded as an RLP
351    /// string](https://github.com/ethereum/go-ethereum/blob/82d934b1dd80cdd8190803ea9f73ed2c345e2576/rlp/encode_test.go#L171-L176),
352    /// **not** an RLP list.
353    ///
354    /// Because of this, we do not directly use the `Vec<u8>` when encoding and decoding. Instead,
355    /// encoding goes through the [`Encodable`] implementation for `&[u8]` and decoding through the
356    /// [`Decodable`] implementation for [`Bytes`], so the field is written as an RLP string and
357    /// an RLP string is expected when decoding.
358    pub types: Vec<u8>,
359    /// Transaction sizes for new transactions that have appeared on the network.
360    pub sizes: Vec<usize>,
361    /// Transaction hashes for new transactions that have appeared on the network.
362    pub hashes: Vec<B256>,
363}
364
#[cfg(feature = "arbitrary")]
impl proptest::prelude::Arbitrary for NewPooledTransactionHashes68 {
    type Parameters = ();
    type Strategy = proptest::prelude::BoxedStrategy<Self>;

    /// Builds a strategy producing announcements whose three vectors all share one random length.
    fn arbitrary_with(_args: ()) -> Self::Strategy {
        use proptest::{collection::vec, prelude::*};

        // A single shared length between 1 and 100 keeps `types`, `sizes` and `hashes` aligned.
        let shared_len = any::<usize>().prop_map(|raw| raw % 100 + 1);

        shared_len
            .prop_flat_map(|n| {
                let hashes = vec(any::<B256>(), n..=n);

                // Sizes are mapped into the range 0..131072 (0x20000).
                let sizes = vec(proptest::num::usize::ANY.prop_map(|raw| raw % 131072), n..=n);

                // Type bytes are derived from arbitrary `TxType` values.
                let types = vec(
                    proptest_arbitrary_interop::arb::<reth_ethereum_primitives::TxType>()
                        .prop_map(|ty| ty as u8),
                    n..=n,
                );

                (types, sizes, hashes)
            })
            .prop_map(|(types, sizes, hashes)| Self { types, sizes, hashes })
            .boxed()
    }
}
394
395impl NewPooledTransactionHashes68 {
396    /// Returns an iterator over tx hashes zipped with corresponding metadata.
397    pub fn metadata_iter(&self) -> impl Iterator<Item = (&B256, (u8, usize))> {
398        self.hashes.iter().zip(self.types.iter().copied().zip(self.sizes.iter().copied()))
399    }
400
401    /// Appends a transaction
402    pub fn push<T: SignedTransaction>(&mut self, tx: &T) {
403        self.hashes.push(*tx.tx_hash());
404        self.sizes.push(tx.encode_2718_len());
405        self.types.push(tx.ty());
406    }
407
408    /// Appends the provided transactions
409    pub fn extend<'a, T: SignedTransaction>(&mut self, txs: impl IntoIterator<Item = &'a T>) {
410        for tx in txs {
411            self.push(tx);
412        }
413    }
414
415    /// Shrinks the capacity of the message vectors as much as possible.
416    pub fn shrink_to_fit(&mut self) {
417        self.hashes.shrink_to_fit();
418        self.sizes.shrink_to_fit();
419        self.types.shrink_to_fit()
420    }
421
422    /// Consumes and appends a transaction
423    pub fn with_transaction<T: SignedTransaction>(mut self, tx: &T) -> Self {
424        self.push(tx);
425        self
426    }
427
428    /// Consumes and appends the provided transactions
429    pub fn with_transactions<'a, T: SignedTransaction>(
430        mut self,
431        txs: impl IntoIterator<Item = &'a T>,
432    ) -> Self {
433        self.extend(txs);
434        self
435    }
436}
437
438impl Encodable for NewPooledTransactionHashes68 {
439    fn encode(&self, out: &mut dyn bytes::BufMut) {
440        #[derive(RlpEncodable)]
441        struct EncodableNewPooledTransactionHashes68<'a> {
442            types: &'a [u8],
443            sizes: &'a Vec<usize>,
444            hashes: &'a Vec<B256>,
445        }
446
447        let encodable = EncodableNewPooledTransactionHashes68 {
448            types: &self.types[..],
449            sizes: &self.sizes,
450            hashes: &self.hashes,
451        };
452
453        encodable.encode(out);
454    }
455    fn length(&self) -> usize {
456        #[derive(RlpEncodable)]
457        struct EncodableNewPooledTransactionHashes68<'a> {
458            types: &'a [u8],
459            sizes: &'a Vec<usize>,
460            hashes: &'a Vec<B256>,
461        }
462
463        let encodable = EncodableNewPooledTransactionHashes68 {
464            types: &self.types[..],
465            sizes: &self.sizes,
466            hashes: &self.hashes,
467        };
468
469        encodable.length()
470    }
471}
472
473impl Decodable for NewPooledTransactionHashes68 {
474    fn decode(buf: &mut &[u8]) -> alloy_rlp::Result<Self> {
475        #[derive(RlpDecodable)]
476        struct EncodableNewPooledTransactionHashes68 {
477            types: Bytes,
478            sizes: Vec<usize>,
479            hashes: Vec<B256>,
480        }
481
482        let encodable = EncodableNewPooledTransactionHashes68::decode(buf)?;
483        let msg = Self {
484            types: encodable.types.into(),
485            sizes: encodable.sizes,
486            hashes: encodable.hashes,
487        };
488
489        if msg.hashes.len() != msg.types.len() {
490            return Err(alloy_rlp::Error::ListLengthMismatch {
491                expected: msg.hashes.len(),
492                got: msg.types.len(),
493            })
494        }
495        if msg.hashes.len() != msg.sizes.len() {
496            return Err(alloy_rlp::Error::ListLengthMismatch {
497                expected: msg.hashes.len(),
498                got: msg.sizes.len(),
499            })
500        }
501
502        Ok(msg)
503    }
504}
505
506/// Validation pass that collapses duplicate transaction hashes in a payload.
507pub trait DedupPayload {
508    /// Value type in the [`PartiallyValidData`] map.
509    type Value;
510
511    /// Returns `true` if the payload contains no entries.
512    fn is_empty(&self) -> bool;
513
514    /// Returns the number of entries.
515    fn len(&self) -> usize;
516
517    /// Consumes self, returning the entries keyed by unique hash as [`PartiallyValidData`].
518    fn dedup(self) -> PartiallyValidData<Self::Value>;
519}
520
521/// Value in a [`PartiallyValidData`] map: `Some((ty, size))` from eth68, `None` from eth66.
522pub type Eth68TxMetadata = Option<(u8, usize)>;
523
524impl DedupPayload for NewPooledTransactionHashes {
525    type Value = Eth68TxMetadata;
526
527    fn is_empty(&self) -> bool {
528        self.is_empty()
529    }
530
531    fn len(&self) -> usize {
532        self.len()
533    }
534
535    fn dedup(self) -> PartiallyValidData<Self::Value> {
536        match self {
537            Self::Eth66(msg) => msg.dedup(),
538            Self::Eth68(msg) => msg.dedup(),
539        }
540    }
541}
542
543impl DedupPayload for NewPooledTransactionHashes68 {
544    type Value = Eth68TxMetadata;
545
546    fn is_empty(&self) -> bool {
547        self.hashes.is_empty()
548    }
549
550    fn len(&self) -> usize {
551        self.hashes.len()
552    }
553
554    fn dedup(self) -> PartiallyValidData<Self::Value> {
555        let Self { hashes, mut sizes, mut types } = self;
556
557        let mut deduped_data = HashMap::with_capacity_and_hasher(hashes.len(), Default::default());
558
559        for hash in hashes.into_iter().rev() {
560            if let (Some(ty), Some(size)) = (types.pop(), sizes.pop()) {
561                deduped_data.insert(hash, Some((ty, size)));
562            }
563        }
564
565        PartiallyValidData::from_raw_data_eth68(deduped_data)
566    }
567}
568
569impl DedupPayload for NewPooledTransactionHashes66 {
570    type Value = Eth68TxMetadata;
571
572    fn is_empty(&self) -> bool {
573        self.0.is_empty()
574    }
575
576    fn len(&self) -> usize {
577        self.0.len()
578    }
579
580    fn dedup(self) -> PartiallyValidData<Self::Value> {
581        let Self(hashes) = self;
582
583        let mut deduped_data = HashMap::with_capacity_and_hasher(hashes.len(), Default::default());
584
585        let noop_value: Eth68TxMetadata = None;
586
587        for hash in hashes.into_iter().rev() {
588            deduped_data.insert(hash, noop_value);
589        }
590
591        PartiallyValidData::from_raw_data_eth66(deduped_data)
592    }
593}
594
595/// Interface for handling mempool message data. Used in various filters in pipelines in
596/// `TransactionsManager` and in queries to `TransactionPool`.
597pub trait HandleMempoolData {
598    /// Returns `true` if the data contains no entries.
599    fn is_empty(&self) -> bool;
600
601    /// Returns the number of entries.
602    fn len(&self) -> usize;
603
604    /// Retains only the entries whose hash satisfies the given predicate.
605    fn retain_by_hash(&mut self, f: impl FnMut(&TxHash) -> bool);
606}
607
608/// Extension of the [`HandleMempoolData`] interface, for mempool messages that are versioned.
609pub trait HandleVersionedMempoolData {
610    /// Returns the announcement version, either [`Eth66`](EthVersion::Eth66) or
611    /// [`Eth68`](EthVersion::Eth68).
612    fn msg_version(&self) -> EthVersion;
613}
614
615impl<T: SignedTransaction> HandleMempoolData for Vec<T> {
616    fn is_empty(&self) -> bool {
617        self.is_empty()
618    }
619
620    fn len(&self) -> usize {
621        self.len()
622    }
623
624    fn retain_by_hash(&mut self, mut f: impl FnMut(&TxHash) -> bool) {
625        self.retain(|tx| f(tx.tx_hash()))
626    }
627}
628
/// Implements [`HandleMempoolData`] for a type that keeps its entries in a `data` map keyed by
/// transaction hash.
///
/// The first argument is the implementing type. The optional second argument, written `<G>`,
/// names a generic parameter to declare on the generated impl.
macro_rules! handle_mempool_data_map_impl {
    ($target:ty, $(<$param:ident>)?) => {
        impl$(<$param>)? HandleMempoolData for $target {
            fn is_empty(&self) -> bool {
                self.data.is_empty()
            }

            fn len(&self) -> usize {
                self.data.len()
            }

            fn retain_by_hash(&mut self, mut f: impl FnMut(&TxHash) -> bool) {
                self.data.retain(|tx_hash, _value| f(tx_hash));
            }
        }
    };
}
646
647/// Data that has passed an initial validation pass that is not specific to any mempool message
648/// type.
649#[derive(Debug, Deref, DerefMut, IntoIterator)]
650pub struct PartiallyValidData<V> {
    /// Entries keyed by transaction hash.
651    #[deref]
652    #[deref_mut]
653    #[into_iterator]
654    data: HashMap<TxHash, V>,
    /// Version of the message this data was received in, if the message is versioned.
655    version: Option<EthVersion>,
656}
657
// Implements `HandleMempoolData` for `PartiallyValidData<V>` on top of its `data` map.
658handle_mempool_data_map_impl!(PartiallyValidData<V>, <V>);
659
660impl<V> PartiallyValidData<V> {
661    /// Wraps raw data.
662    pub const fn from_raw_data(data: HashMap<TxHash, V>, version: Option<EthVersion>) -> Self {
663        Self { data, version }
664    }
665
666    /// Wraps raw data with version [`EthVersion::Eth68`].
667    pub const fn from_raw_data_eth68(data: HashMap<TxHash, V>) -> Self {
668        Self::from_raw_data(data, Some(EthVersion::Eth68))
669    }
670
671    /// Wraps raw data with version [`EthVersion::Eth66`].
672    pub const fn from_raw_data_eth66(data: HashMap<TxHash, V>) -> Self {
673        Self::from_raw_data(data, Some(EthVersion::Eth66))
674    }
675
676    /// Returns a new [`PartiallyValidData`] with empty data from an [`Eth68`](EthVersion::Eth68)
677    /// announcement.
678    pub fn empty_eth68() -> Self {
679        Self::from_raw_data_eth68(HashMap::default())
680    }
681
682    /// Returns a new [`PartiallyValidData`] with empty data from an [`Eth66`](EthVersion::Eth66)
683    /// announcement.
684    pub fn empty_eth66() -> Self {
685        Self::from_raw_data_eth66(HashMap::default())
686    }
687
688    /// Returns the version of the message this data was received in if different versions of the
689    /// message exists, either [`Eth66`](EthVersion::Eth66) or [`Eth68`](EthVersion::Eth68).
690    pub const fn msg_version(&self) -> Option<EthVersion> {
691        self.version
692    }
693
694    /// Destructs returning the validated data.
695    pub fn into_data(self) -> HashMap<TxHash, V> {
696        self.data
697    }
698}
699
700/// Partially validated data from an announcement or a
701/// [`PooledTransactions`](crate::PooledTransactions) response.
702#[derive(Debug, Deref, DerefMut, IntoIterator, From)]
703pub struct ValidAnnouncementData {
    /// Announced hashes mapped to their metadata, if the announcement carried any.
704    #[deref]
705    #[deref_mut]
706    #[into_iterator]
707    data: HashMap<TxHash, Eth68TxMetadata>,
    /// Version of the announcement message.
708    version: EthVersion,
709}
710
// Implements `HandleMempoolData` for `ValidAnnouncementData` on top of its `data` map.
711handle_mempool_data_map_impl!(ValidAnnouncementData,);
712
713impl ValidAnnouncementData {
714    /// Destructs returning only the valid hashes and the announcement message version. Caution! If
715    /// this is [`Eth68`](EthVersion::Eth68) announcement data, this drops the metadata.
716    pub fn into_request_hashes(self) -> (RequestTxHashes, EthVersion) {
717        let hashes = self.data.into_keys().collect::<HashSet<_>>();
718
719        (RequestTxHashes::new(hashes), self.version)
720    }
721
722    /// Conversion from [`PartiallyValidData`] from an announcement. Note! [`PartiallyValidData`]
723    /// from an announcement, should have some [`EthVersion`]. Panics if [`PartiallyValidData`] has
724    /// version set to `None`.
725    pub fn from_partially_valid_data(data: PartiallyValidData<Eth68TxMetadata>) -> Self {
726        let PartiallyValidData { data, version } = data;
727
728        let version = version.expect("should have eth version for conversion");
729
730        Self { data, version }
731    }
732
733    /// Destructs returning the validated data.
734    pub fn into_data(self) -> HashMap<TxHash, Eth68TxMetadata> {
735        self.data
736    }
737}
738
739impl HandleVersionedMempoolData for ValidAnnouncementData {
740    fn msg_version(&self) -> EthVersion {
741        self.version
742    }
743}
744
745/// A set of transaction hashes to request from a peer.
746#[derive(Debug, Default, Deref, DerefMut, IntoIterator, Constructor)]
747pub struct RequestTxHashes {
    /// The unique hashes to request.
748    #[deref]
749    #[deref_mut]
750    #[into_iterator(owned, ref)]
751    hashes: HashSet<TxHash>,
752}
753
754impl RequestTxHashes {
755    /// Returns a new [`RequestTxHashes`] with given capacity for hashes. Caution! Make sure to
756    /// call [`HashSet::shrink_to_fit`] on [`RequestTxHashes`] when full, especially where it will
757    /// be stored in its entirety like in the future waiting for a
758    /// [`GetPooledTransactions`](crate::GetPooledTransactions) request to resolve.
759    pub fn with_capacity(capacity: usize) -> Self {
760        Self::new(HashSet::with_capacity_and_hasher(capacity, Default::default()))
761    }
762
763    /// Returns a new empty instance.
764    fn empty() -> Self {
765        Self::new(HashSet::default())
766    }
767
768    /// Retains the given number of elements, returning an iterator over the rest.
769    pub fn retain_count(&mut self, count: usize) -> Self {
770        let rest_capacity = self.hashes.len().saturating_sub(count);
771        if rest_capacity == 0 {
772            return Self::empty()
773        }
774        let mut rest = Self::with_capacity(rest_capacity);
775
776        let mut i = 0;
777        self.hashes.retain(|hash| {
778            if i >= count {
779                rest.insert(*hash);
780                return false
781            }
782            i += 1;
783
784            true
785        });
786
787        rest
788    }
789}
790
791impl FromIterator<(TxHash, Eth68TxMetadata)> for RequestTxHashes {
792    fn from_iter<I: IntoIterator<Item = (TxHash, Eth68TxMetadata)>>(iter: I) -> Self {
793        Self::new(iter.into_iter().map(|(hash, _)| hash).collect())
794    }
795}
796
797/// The earliest block, the latest block and the hash of the latest block which can be provided.
798/// See [BlockRangeUpdate](https://github.com/ethereum/devp2p/blob/master/caps/eth.md#blockrangeupdate-0x11).
799#[derive(Clone, Debug, PartialEq, Eq, Default, RlpEncodable, RlpDecodable)]
800#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
801#[cfg_attr(feature = "serde", serde(rename_all = "camelCase"))]
802pub struct BlockRangeUpdate {
803    /// The number of the earliest block which is available.
804    pub earliest: u64,
805    /// The number of the latest block which is available.
806    pub latest: u64,
807    /// The hash of the latest available block.
808    pub latest_hash: B256,
809}
810
811#[cfg(test)]
812mod tests {
813    use super::*;
814    use alloy_consensus::{transaction::TxHashRef, Typed2718};
815    use alloy_eips::eip2718::Encodable2718;
816    use alloy_primitives::{b256, hex, Signature, U256};
817    use reth_ethereum_primitives::{Transaction, TransactionSigned};
818    use std::str::FromStr;
819
820    /// Takes a struct / encoded message pair and checks that the struct encodes to exactly those
821    /// bytes, and that the freshly encoded bytes decode back to exactly that struct.
822    fn test_encoding_vector<T: Encodable + Decodable + PartialEq + core::fmt::Debug>(
823        input: (T, &[u8]),
824    ) {
825        let (expected_decoded, expected_encoded) = input;
826        let mut encoded = Vec::new();
827        expected_decoded.encode(&mut encoded);
828
829        assert_eq!(hex::encode(&encoded), hex::encode(expected_encoded));
830
831        let decoded = T::decode(&mut encoded.as_ref()).unwrap();
832        assert_eq!(expected_decoded, decoded);
833    }
834
835    #[test]
836    fn can_return_latest_block() {
        // With a single entry, that entry is the latest.
837        let mut blocks = NewBlockHashes(vec![BlockHashNumber { hash: B256::random(), number: 0 }]);
838        let latest = blocks.latest().unwrap();
839        assert_eq!(latest.number, 0);
840
        // The highest block number wins, even when it is not the last entry in the list.
841        blocks.0.push(BlockHashNumber { hash: B256::random(), number: 100 });
842        blocks.0.push(BlockHashNumber { hash: B256::random(), number: 2 });
843        let latest = blocks.latest().unwrap();
844        assert_eq!(latest.number, 100);
845    }
846
847    #[test]
848    fn eth_68_tx_hash_roundtrip() {
849        let vectors = vec![
850            (
851                NewPooledTransactionHashes68 { types: vec![], sizes: vec![], hashes: vec![] },
852                &hex!("c380c0c0")[..],
853            ),
854            (
855                NewPooledTransactionHashes68 {
856                    types: vec![0x00],
857                    sizes: vec![0x00],
858                    hashes: vec![
859                        B256::from_str(
860                            "0x0000000000000000000000000000000000000000000000000000000000000000",
861                        )
862                        .unwrap(),
863                    ],
864                },
865                &hex!(
866                    "e500c180e1a00000000000000000000000000000000000000000000000000000000000000000"
867                )[..],
868            ),
869            (
870                NewPooledTransactionHashes68 {
871                    types: vec![0x00, 0x00],
872                    sizes: vec![0x00, 0x00],
873                    hashes: vec![
874                        B256::from_str(
875                            "0x0000000000000000000000000000000000000000000000000000000000000000",
876                        )
877                        .unwrap(),
878                        B256::from_str(
879                            "0x0000000000000000000000000000000000000000000000000000000000000000",
880                        )
881                        .unwrap(),
882                    ],
883                },
884                &hex!(
885                    "f84a820000c28080f842a00000000000000000000000000000000000000000000000000000000000000000a00000000000000000000000000000000000000000000000000000000000000000"
886                )[..],
887            ),
888            (
889                NewPooledTransactionHashes68 {
890                    types: vec![0x02],
891                    sizes: vec![0xb6],
892                    hashes: vec![
893                        B256::from_str(
894                            "0xfecbed04c7b88d8e7221a0a3f5dc33f220212347fc167459ea5cc9c3eb4c1124",
895                        )
896                        .unwrap(),
897                    ],
898                },
899                &hex!(
900                    "e602c281b6e1a0fecbed04c7b88d8e7221a0a3f5dc33f220212347fc167459ea5cc9c3eb4c1124"
901                )[..],
902            ),
903            (
904                NewPooledTransactionHashes68 {
905                    types: vec![0xff, 0xff],
906                    sizes: vec![0xffffffff, 0xffffffff],
907                    hashes: vec![
908                        B256::from_str(
909                            "0xffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff",
910                        )
911                        .unwrap(),
912                        B256::from_str(
913                            "0xffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff",
914                        )
915                        .unwrap(),
916                    ],
917                },
918                &hex!(
919                    "f85282ffffca84ffffffff84fffffffff842a0ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffa0ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff"
920                )[..],
921            ),
922            (
923                NewPooledTransactionHashes68 {
924                    types: vec![0xff, 0xff],
925                    sizes: vec![0xffffffff, 0xffffffff],
926                    hashes: vec![
927                        B256::from_str(
928                            "0xbeefcafebeefcafebeefcafebeefcafebeefcafebeefcafebeefcafebeefcafe",
929                        )
930                        .unwrap(),
931                        B256::from_str(
932                            "0xbeefcafebeefcafebeefcafebeefcafebeefcafebeefcafebeefcafebeefcafe",
933                        )
934                        .unwrap(),
935                    ],
936                },
937                &hex!(
938                    "f85282ffffca84ffffffff84fffffffff842a0beefcafebeefcafebeefcafebeefcafebeefcafebeefcafebeefcafebeefcafea0beefcafebeefcafebeefcafebeefcafebeefcafebeefcafebeefcafebeefcafe"
939                )[..],
940            ),
941            (
942                NewPooledTransactionHashes68 {
943                    types: vec![0x10, 0x10],
944                    sizes: vec![0xdeadc0de, 0xdeadc0de],
945                    hashes: vec![
946                        B256::from_str(
947                            "0x3b9aca00f0671c9a2a1b817a0a78d3fe0c0f776cccb2a8c3c1b412a4f4e4d4e2",
948                        )
949                        .unwrap(),
950                        B256::from_str(
951                            "0x3b9aca00f0671c9a2a1b817a0a78d3fe0c0f776cccb2a8c3c1b412a4f4e4d4e2",
952                        )
953                        .unwrap(),
954                    ],
955                },
956                &hex!(
957                    "f852821010ca84deadc0de84deadc0def842a03b9aca00f0671c9a2a1b817a0a78d3fe0c0f776cccb2a8c3c1b412a4f4e4d4e2a03b9aca00f0671c9a2a1b817a0a78d3fe0c0f776cccb2a8c3c1b412a4f4e4d4e2"
958                )[..],
959            ),
960            (
961                NewPooledTransactionHashes68 {
962                    types: vec![0x6f, 0x6f],
963                    sizes: vec![0x7fffffff, 0x7fffffff],
964                    hashes: vec![
965                        B256::from_str(
966                            "0x0000000000000000000000000000000000000000000000000000000000000002",
967                        )
968                        .unwrap(),
969                        B256::from_str(
970                            "0x0000000000000000000000000000000000000000000000000000000000000002",
971                        )
972                        .unwrap(),
973                    ],
974                },
975                &hex!(
976                    "f852826f6fca847fffffff847ffffffff842a00000000000000000000000000000000000000000000000000000000000000002a00000000000000000000000000000000000000000000000000000000000000002"
977                )[..],
978            ),
979        ];
980
981        for vector in vectors {
982            test_encoding_vector(vector);
983        }
984    }
985
986    #[test]
987    fn request_hashes_retain_count_keep_subset() {
988        let mut hashes = RequestTxHashes::new(
989            [
990                b256!("0x0000000000000000000000000000000000000000000000000000000000000001"),
991                b256!("0x0000000000000000000000000000000000000000000000000000000000000002"),
992                b256!("0x0000000000000000000000000000000000000000000000000000000000000003"),
993                b256!("0x0000000000000000000000000000000000000000000000000000000000000004"),
994                b256!("0x0000000000000000000000000000000000000000000000000000000000000005"),
995            ]
996            .into_iter()
997            .collect::<HashSet<_>>(),
998        );
999
1000        let rest = hashes.retain_count(3);
1001
1002        assert_eq!(3, hashes.len());
1003        assert_eq!(2, rest.len());
1004    }
1005
1006    #[test]
1007    fn request_hashes_retain_count_keep_all() {
1008        let mut hashes = RequestTxHashes::new(
1009            [
1010                b256!("0x0000000000000000000000000000000000000000000000000000000000000001"),
1011                b256!("0x0000000000000000000000000000000000000000000000000000000000000002"),
1012                b256!("0x0000000000000000000000000000000000000000000000000000000000000003"),
1013                b256!("0x0000000000000000000000000000000000000000000000000000000000000004"),
1014                b256!("0x0000000000000000000000000000000000000000000000000000000000000005"),
1015            ]
1016            .into_iter()
1017            .collect::<HashSet<_>>(),
1018        );
1019
1020        let _ = hashes.retain_count(6);
1021
1022        assert_eq!(5, hashes.len());
1023    }
1024
1025    #[test]
1026    fn split_request_hashes_keep_none() {
1027        let mut hashes = RequestTxHashes::new(
1028            [
1029                b256!("0x0000000000000000000000000000000000000000000000000000000000000001"),
1030                b256!("0x0000000000000000000000000000000000000000000000000000000000000002"),
1031                b256!("0x0000000000000000000000000000000000000000000000000000000000000003"),
1032                b256!("0x0000000000000000000000000000000000000000000000000000000000000004"),
1033                b256!("0x0000000000000000000000000000000000000000000000000000000000000005"),
1034            ]
1035            .into_iter()
1036            .collect::<HashSet<_>>(),
1037        );
1038
1039        let rest = hashes.retain_count(0);
1040
1041        assert_eq!(0, hashes.len());
1042        assert_eq!(5, rest.len());
1043    }
1044
1045    fn signed_transaction() -> impl SignedTransaction {
1046        TransactionSigned::new_unhashed(
1047            Transaction::Legacy(Default::default()),
1048            Signature::new(
1049                U256::from_str(
1050                    "0x64b1702d9298fee62dfeccc57d322a463ad55ca201256d01f62b45b2e1c21c12",
1051                )
1052                .unwrap(),
1053                U256::from_str(
1054                    "0x64b1702d9298fee62dfeccc57d322a463ad55ca201256d01f62b45b2e1c21c10",
1055                )
1056                .unwrap(),
1057                false,
1058            ),
1059        )
1060    }
1061
1062    #[test]
1063    fn test_pooled_tx_hashes_68_push() {
1064        let tx = signed_transaction();
1065        let mut tx_hashes =
1066            NewPooledTransactionHashes68 { types: vec![], sizes: vec![], hashes: vec![] };
1067        tx_hashes.push(&tx);
1068        assert_eq!(tx_hashes.types.len(), 1);
1069        assert_eq!(tx_hashes.sizes.len(), 1);
1070        assert_eq!(tx_hashes.hashes.len(), 1);
1071        assert_eq!(tx_hashes.types[0], tx.ty());
1072        assert_eq!(tx_hashes.sizes[0], tx.encode_2718_len());
1073        assert_eq!(tx_hashes.hashes[0], *tx.tx_hash());
1074    }
1075
1076    #[test]
1077    fn test_pooled_tx_hashes_68_extend() {
1078        let tx = signed_transaction();
1079        let txs = vec![tx.clone(), tx.clone()];
1080        let mut tx_hashes =
1081            NewPooledTransactionHashes68 { types: vec![], sizes: vec![], hashes: vec![] };
1082        tx_hashes.extend(&txs);
1083        assert_eq!(tx_hashes.types.len(), 2);
1084        assert_eq!(tx_hashes.sizes.len(), 2);
1085        assert_eq!(tx_hashes.hashes.len(), 2);
1086        assert_eq!(tx_hashes.types[0], tx.ty());
1087        assert_eq!(tx_hashes.sizes[0], tx.encode_2718_len());
1088        assert_eq!(tx_hashes.hashes[0], *tx.tx_hash());
1089        assert_eq!(tx_hashes.types[1], tx.ty());
1090        assert_eq!(tx_hashes.sizes[1], tx.encode_2718_len());
1091        assert_eq!(tx_hashes.hashes[1], *tx.tx_hash());
1092    }
1093
1094    #[test]
1095    fn test_pooled_tx_hashes_68_with_transaction() {
1096        let tx = signed_transaction();
1097        let tx_hashes =
1098            NewPooledTransactionHashes68 { types: vec![], sizes: vec![], hashes: vec![] }
1099                .with_transaction(&tx);
1100        assert_eq!(tx_hashes.types.len(), 1);
1101        assert_eq!(tx_hashes.sizes.len(), 1);
1102        assert_eq!(tx_hashes.hashes.len(), 1);
1103        assert_eq!(tx_hashes.types[0], tx.ty());
1104        assert_eq!(tx_hashes.sizes[0], tx.encode_2718_len());
1105        assert_eq!(tx_hashes.hashes[0], *tx.tx_hash());
1106    }
1107
1108    #[test]
1109    fn test_pooled_tx_hashes_68_with_transactions() {
1110        let tx = signed_transaction();
1111        let txs = vec![tx.clone(), tx.clone()];
1112        let tx_hashes =
1113            NewPooledTransactionHashes68 { types: vec![], sizes: vec![], hashes: vec![] }
1114                .with_transactions(&txs);
1115        assert_eq!(tx_hashes.types.len(), 2);
1116        assert_eq!(tx_hashes.sizes.len(), 2);
1117        assert_eq!(tx_hashes.hashes.len(), 2);
1118        assert_eq!(tx_hashes.types[0], tx.ty());
1119        assert_eq!(tx_hashes.sizes[0], tx.encode_2718_len());
1120        assert_eq!(tx_hashes.hashes[0], *tx.tx_hash());
1121        assert_eq!(tx_hashes.types[1], tx.ty());
1122        assert_eq!(tx_hashes.sizes[1], tx.encode_2718_len());
1123        assert_eq!(tx_hashes.hashes[1], *tx.tx_hash());
1124    }
1125}