Skip to main content

reth_eth_wire_types/
broadcast.rs

1//! Types for broadcasting new data.
2
3use crate::{EthMessage, EthVersion, NetworkPrimitives};
4use alloc::{sync::Arc, vec::Vec};
5use alloy_primitives::{
6    map::{HashMap, HashSet},
7    Bytes, TxHash, B256, U128,
8};
9use alloy_rlp::{
10    Decodable, Encodable, RlpDecodable, RlpDecodableWrapper, RlpEncodable, RlpEncodableWrapper,
11};
12use core::{fmt::Debug, mem};
13use derive_more::{Constructor, Deref, DerefMut, From, IntoIterator};
14use reth_codecs_derive::{add_arbitrary_tests, generate_tests};
15use reth_ethereum_primitives::TransactionSigned;
16use reth_primitives_traits::{Block, SignedTransaction};
17
18/// This informs peers of new blocks that have appeared on the network.
19#[derive(
20    Clone,
21    Debug,
22    PartialEq,
23    Eq,
24    RlpEncodableWrapper,
25    RlpDecodableWrapper,
26    Default,
27    Deref,
28    IntoIterator,
29)]
30#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
31#[cfg_attr(any(test, feature = "arbitrary"), derive(arbitrary::Arbitrary))]
32#[add_arbitrary_tests(rlp)]
33pub struct NewBlockHashes(
34    /// New block hashes and the block number for each blockhash.
35    /// Clients should request blocks using a [`GetBlockBodies`](crate::GetBlockBodies) message.
36    pub Vec<BlockHashNumber>,
37);
38
39// === impl NewBlockHashes ===
40
41impl NewBlockHashes {
42    /// Returns the latest block in the list of blocks.
43    pub fn latest(&self) -> Option<&BlockHashNumber> {
44        self.iter().max_by_key(|b| b.number)
45    }
46}
47
48/// A block hash _and_ a block number.
49#[derive(Clone, Debug, PartialEq, Eq, RlpEncodable, RlpDecodable, Default)]
50#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
51#[cfg_attr(any(test, feature = "arbitrary"), derive(arbitrary::Arbitrary))]
52#[add_arbitrary_tests(rlp)]
53pub struct BlockHashNumber {
54    /// The block hash
55    pub hash: B256,
56    /// The block number
57    pub number: u64,
58}
59
60impl From<Vec<BlockHashNumber>> for NewBlockHashes {
61    fn from(v: Vec<BlockHashNumber>) -> Self {
62        Self(v)
63    }
64}
65
66impl From<NewBlockHashes> for Vec<BlockHashNumber> {
67    fn from(v: NewBlockHashes) -> Self {
68        v.0
69    }
70}
71
72/// A trait for block payloads transmitted through p2p.
73pub trait NewBlockPayload:
74    Encodable + Decodable + Clone + Eq + Debug + Send + Sync + Unpin + 'static
75{
76    /// The block type.
77    type Block: Block;
78
79    /// Returns a reference to the block.
80    fn block(&self) -> &Self::Block;
81}
82
83/// A new block with the current total difficulty, which includes the difficulty of the returned
84/// block.
85#[derive(Clone, Debug, PartialEq, Eq, RlpEncodable, RlpDecodable, Default)]
86#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
87#[cfg_attr(any(test, feature = "arbitrary"), derive(arbitrary::Arbitrary))]
88pub struct NewBlock<B = reth_ethereum_primitives::Block> {
89    /// A new block.
90    pub block: B,
91    /// The current total difficulty.
92    pub td: U128,
93}
94
95impl<B: Block + 'static> NewBlockPayload for NewBlock<B> {
96    type Block = B;
97
98    fn block(&self) -> &Self::Block {
99        &self.block
100    }
101}
102
103generate_tests!(#[rlp, 25] NewBlock<reth_ethereum_primitives::Block>, EthNewBlockTests);
104
105/// This informs peers of transactions that have appeared on the network and are not yet included
106/// in a block.
107#[derive(
108    Clone,
109    Debug,
110    PartialEq,
111    Eq,
112    RlpEncodableWrapper,
113    RlpDecodableWrapper,
114    Default,
115    Deref,
116    IntoIterator,
117)]
118#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
119#[cfg_attr(any(test, feature = "arbitrary"), derive(arbitrary::Arbitrary))]
120#[add_arbitrary_tests(rlp, 10)]
121pub struct Transactions<T = TransactionSigned>(
122    /// New transactions for the peer to include in its mempool.
123    pub Vec<T>,
124);
125
126impl<T: SignedTransaction> Transactions<T> {
127    /// Returns `true` if the list of transactions contains any blob transactions.
128    pub fn has_eip4844(&self) -> bool {
129        self.iter().any(|tx| tx.is_eip4844())
130    }
131}
132
133impl<T> From<Vec<T>> for Transactions<T> {
134    fn from(txs: Vec<T>) -> Self {
135        Self(txs)
136    }
137}
138
139impl<T> From<Transactions<T>> for Vec<T> {
140    fn from(txs: Transactions<T>) -> Self {
141        txs.0
142    }
143}
144
145/// Same as [`Transactions`] but this is intended as egress message send from local to _many_ peers.
146///
147/// The list of transactions is constructed on per-peers basis, but the underlying transaction
148/// objects are shared.
149#[derive(
150    Clone, Debug, PartialEq, Eq, RlpEncodableWrapper, RlpDecodableWrapper, Deref, IntoIterator,
151)]
152#[cfg_attr(any(test, feature = "arbitrary"), derive(arbitrary::Arbitrary))]
153#[add_arbitrary_tests(rlp, 20)]
154pub struct SharedTransactions<T = TransactionSigned>(
155    /// New transactions for the peer to include in its mempool.
156    pub Vec<Arc<T>>,
157);
158
159/// A wrapper type for all different new pooled transaction types
160#[derive(Clone, Debug, PartialEq, Eq)]
161#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
162pub enum NewPooledTransactionHashes {
163    /// A list of transaction hashes valid for [66-68)
164    Eth66(NewPooledTransactionHashes66),
165    /// A list of transaction hashes valid from [68..]
166    ///
167    /// Note: it is assumed that the payload is valid (all vectors have the same length)
168    Eth68(NewPooledTransactionHashes68),
169}
170
171// === impl NewPooledTransactionHashes ===
172
173impl NewPooledTransactionHashes {
174    /// Returns the message [`EthVersion`].
175    pub const fn version(&self) -> EthVersion {
176        match self {
177            Self::Eth66(_) => EthVersion::Eth66,
178            Self::Eth68(_) => EthVersion::Eth68,
179        }
180    }
181
182    /// Returns `true` if the payload is valid for the given version
183    pub const fn is_valid_for_version(&self, version: EthVersion) -> bool {
184        match self {
185            Self::Eth66(_) => {
186                matches!(version, EthVersion::Eth67 | EthVersion::Eth66)
187            }
188            Self::Eth68(_) => {
189                matches!(
190                    version,
191                    EthVersion::Eth68 | EthVersion::Eth69 | EthVersion::Eth70 | EthVersion::Eth71
192                )
193            }
194        }
195    }
196
197    /// Returns an iterator over all transaction hashes.
198    pub fn iter_hashes(&self) -> impl Iterator<Item = &B256> + '_ {
199        match self {
200            Self::Eth66(msg) => msg.iter(),
201            Self::Eth68(msg) => msg.hashes.iter(),
202        }
203    }
204
205    /// Returns an immutable reference to transaction hashes.
206    pub const fn hashes(&self) -> &Vec<B256> {
207        match self {
208            Self::Eth66(msg) => &msg.0,
209            Self::Eth68(msg) => &msg.hashes,
210        }
211    }
212
213    /// Returns a mutable reference to transaction hashes.
214    pub const fn hashes_mut(&mut self) -> &mut Vec<B256> {
215        match self {
216            Self::Eth66(msg) => &mut msg.0,
217            Self::Eth68(msg) => &mut msg.hashes,
218        }
219    }
220
221    /// Consumes the type and returns all hashes
222    pub fn into_hashes(self) -> Vec<B256> {
223        match self {
224            Self::Eth66(msg) => msg.0,
225            Self::Eth68(msg) => msg.hashes,
226        }
227    }
228
229    /// Returns an iterator over all transaction hashes.
230    pub fn into_iter_hashes(self) -> impl Iterator<Item = B256> {
231        match self {
232            Self::Eth66(msg) => msg.into_iter(),
233            Self::Eth68(msg) => msg.hashes.into_iter(),
234        }
235    }
236
237    /// Shortens the number of hashes in the message, keeping the first `len` hashes and dropping
238    /// the rest. If `len` is greater than the number of hashes, this has no effect.
239    pub fn truncate(&mut self, len: usize) {
240        match self {
241            Self::Eth66(msg) => msg.0.truncate(len),
242            Self::Eth68(msg) => {
243                msg.types.truncate(len);
244                msg.sizes.truncate(len);
245                msg.hashes.truncate(len);
246            }
247        }
248    }
249
250    /// Returns true if the message is empty
251    pub const fn is_empty(&self) -> bool {
252        match self {
253            Self::Eth66(msg) => msg.0.is_empty(),
254            Self::Eth68(msg) => msg.hashes.is_empty(),
255        }
256    }
257
258    /// Returns the number of hashes in the message
259    pub const fn len(&self) -> usize {
260        match self {
261            Self::Eth66(msg) => msg.0.len(),
262            Self::Eth68(msg) => msg.hashes.len(),
263        }
264    }
265
266    /// Returns an immutable reference to the inner type if this an eth68 announcement.
267    pub const fn as_eth68(&self) -> Option<&NewPooledTransactionHashes68> {
268        match self {
269            Self::Eth66(_) => None,
270            Self::Eth68(msg) => Some(msg),
271        }
272    }
273
274    /// Returns a mutable reference to the inner type if this an eth68 announcement.
275    pub const fn as_eth68_mut(&mut self) -> Option<&mut NewPooledTransactionHashes68> {
276        match self {
277            Self::Eth66(_) => None,
278            Self::Eth68(msg) => Some(msg),
279        }
280    }
281
282    /// Returns a mutable reference to the inner type if this an eth66 announcement.
283    pub const fn as_eth66_mut(&mut self) -> Option<&mut NewPooledTransactionHashes66> {
284        match self {
285            Self::Eth66(msg) => Some(msg),
286            Self::Eth68(_) => None,
287        }
288    }
289
290    /// Returns the inner type if this an eth68 announcement.
291    pub fn take_eth68(&mut self) -> Option<NewPooledTransactionHashes68> {
292        match self {
293            Self::Eth66(_) => None,
294            Self::Eth68(msg) => Some(mem::take(msg)),
295        }
296    }
297
298    /// Returns the inner type if this an eth66 announcement.
299    pub fn take_eth66(&mut self) -> Option<NewPooledTransactionHashes66> {
300        match self {
301            Self::Eth66(msg) => Some(mem::take(msg)),
302            Self::Eth68(_) => None,
303        }
304    }
305}
306
307impl<N: NetworkPrimitives> From<NewPooledTransactionHashes> for EthMessage<N> {
308    fn from(value: NewPooledTransactionHashes) -> Self {
309        match value {
310            NewPooledTransactionHashes::Eth66(msg) => Self::NewPooledTransactionHashes66(msg),
311            NewPooledTransactionHashes::Eth68(msg) => Self::NewPooledTransactionHashes68(msg),
312        }
313    }
314}
315
316impl From<NewPooledTransactionHashes66> for NewPooledTransactionHashes {
317    fn from(hashes: NewPooledTransactionHashes66) -> Self {
318        Self::Eth66(hashes)
319    }
320}
321
322impl From<NewPooledTransactionHashes68> for NewPooledTransactionHashes {
323    fn from(hashes: NewPooledTransactionHashes68) -> Self {
324        Self::Eth68(hashes)
325    }
326}
327
328/// This informs peers of transaction hashes for transactions that have appeared on the network,
329/// but have not been included in a block.
330#[derive(
331    Clone,
332    Debug,
333    PartialEq,
334    Eq,
335    RlpEncodableWrapper,
336    RlpDecodableWrapper,
337    Default,
338    Deref,
339    IntoIterator,
340)]
341#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
342#[cfg_attr(any(test, feature = "arbitrary"), derive(arbitrary::Arbitrary))]
343#[add_arbitrary_tests(rlp)]
344pub struct NewPooledTransactionHashes66(
345    /// Transaction hashes for new transactions that have appeared on the network.
346    /// Clients should request the transactions with the given hashes using a
347    /// [`GetPooledTransactions`](crate::GetPooledTransactions) message.
348    pub Vec<B256>,
349);
350
351impl From<Vec<B256>> for NewPooledTransactionHashes66 {
352    fn from(v: Vec<B256>) -> Self {
353        Self(v)
354    }
355}
356
357/// Same as [`NewPooledTransactionHashes66`] but extends that beside the transaction hashes,
358/// the node sends the transaction types and their sizes (as defined in EIP-2718) as well.
359#[derive(Clone, Debug, PartialEq, Eq, Default)]
360#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
361pub struct NewPooledTransactionHashes68 {
362    /// Transaction types for new transactions that have appeared on the network.
363    ///
364    /// ## Note on RLP encoding and decoding
365    ///
366    /// In the [eth/68 spec](https://eips.ethereum.org/EIPS/eip-5793#specification) this is defined
367    /// the following way:
368    ///  * `[type_0: B_1, type_1: B_1, ...]`
369    ///
370    /// This would make it seem like the [`Encodable`] and
371    /// [`Decodable`] implementations should directly use a `Vec<u8>` for
372    /// encoding and decoding, because it looks like this field should be encoded as a _list_ of
373    /// bytes.
374    ///
375    /// However, [this is implemented in geth as a `[]byte`
376    /// type](https://github.com/ethereum/go-ethereum/blob/82d934b1dd80cdd8190803ea9f73ed2c345e2576/eth/protocols/eth/protocol.go#L308-L313),
377    /// which [ends up being encoded as a RLP
378    /// string](https://github.com/ethereum/go-ethereum/blob/82d934b1dd80cdd8190803ea9f73ed2c345e2576/rlp/encode_test.go#L171-L176),
379    /// **not** a RLP list.
380    ///
381    /// Because of this, we do not directly use the `Vec<u8>` when encoding and decoding, and
382    /// instead use the [`Encodable`] and [`Decodable`]
383    /// implementations for `&[u8]` instead, which encodes into a RLP string, and expects an RLP
384    /// string when decoding.
385    pub types: Vec<u8>,
386    /// Transaction sizes for new transactions that have appeared on the network.
387    pub sizes: Vec<usize>,
388    /// Transaction hashes for new transactions that have appeared on the network.
389    pub hashes: Vec<B256>,
390}
391
392#[cfg(feature = "arbitrary")]
393impl proptest::prelude::Arbitrary for NewPooledTransactionHashes68 {
394    type Parameters = ();
395    fn arbitrary_with(_args: ()) -> Self::Strategy {
396        use proptest::{collection::vec, prelude::*};
397        // Generate a single random length for all vectors
398        let vec_length = any::<usize>().prop_map(|x| x % 100 + 1); // Lengths between 1 and 100
399
400        vec_length
401            .prop_flat_map(|len| {
402                // Use the generated length to create vectors of TxType, usize, and B256
403                let types_vec = vec(
404                    proptest_arbitrary_interop::arb::<reth_ethereum_primitives::TxType>()
405                        .prop_map(|ty| ty as u8),
406                    len..=len,
407                );
408
409                // Map the usize values to the range 0..131072(0x20000)
410                let sizes_vec = vec(proptest::num::usize::ANY.prop_map(|x| x % 131072), len..=len);
411                let hashes_vec = vec(any::<B256>(), len..=len);
412
413                (types_vec, sizes_vec, hashes_vec)
414            })
415            .prop_map(|(types, sizes, hashes)| Self { types, sizes, hashes })
416            .boxed()
417    }
418
419    type Strategy = proptest::prelude::BoxedStrategy<Self>;
420}
421
422impl NewPooledTransactionHashes68 {
423    /// Returns an iterator over tx hashes zipped with corresponding metadata.
424    pub fn metadata_iter(&self) -> impl Iterator<Item = (&B256, (u8, usize))> {
425        self.hashes.iter().zip(self.types.iter().copied().zip(self.sizes.iter().copied()))
426    }
427
428    /// Appends a transaction
429    pub fn push<T: SignedTransaction>(&mut self, tx: &T) {
430        self.hashes.push(*tx.tx_hash());
431        self.sizes.push(tx.encode_2718_len());
432        self.types.push(tx.ty());
433    }
434
435    /// Appends the provided transactions
436    pub fn extend<'a, T: SignedTransaction>(&mut self, txs: impl IntoIterator<Item = &'a T>) {
437        for tx in txs {
438            self.push(tx);
439        }
440    }
441
442    /// Shrinks the capacity of the message vectors as much as possible.
443    pub fn shrink_to_fit(&mut self) {
444        self.hashes.shrink_to_fit();
445        self.sizes.shrink_to_fit();
446        self.types.shrink_to_fit()
447    }
448
449    /// Consumes and appends a transaction
450    pub fn with_transaction<T: SignedTransaction>(mut self, tx: &T) -> Self {
451        self.push(tx);
452        self
453    }
454
455    /// Consumes and appends the provided transactions
456    pub fn with_transactions<'a, T: SignedTransaction>(
457        mut self,
458        txs: impl IntoIterator<Item = &'a T>,
459    ) -> Self {
460        self.extend(txs);
461        self
462    }
463}
464
465impl Encodable for NewPooledTransactionHashes68 {
466    fn encode(&self, out: &mut dyn bytes::BufMut) {
467        #[derive(RlpEncodable)]
468        struct EncodableNewPooledTransactionHashes68<'a> {
469            types: &'a [u8],
470            sizes: &'a Vec<usize>,
471            hashes: &'a Vec<B256>,
472        }
473
474        let encodable = EncodableNewPooledTransactionHashes68 {
475            types: &self.types[..],
476            sizes: &self.sizes,
477            hashes: &self.hashes,
478        };
479
480        encodable.encode(out);
481    }
482    fn length(&self) -> usize {
483        #[derive(RlpEncodable)]
484        struct EncodableNewPooledTransactionHashes68<'a> {
485            types: &'a [u8],
486            sizes: &'a Vec<usize>,
487            hashes: &'a Vec<B256>,
488        }
489
490        let encodable = EncodableNewPooledTransactionHashes68 {
491            types: &self.types[..],
492            sizes: &self.sizes,
493            hashes: &self.hashes,
494        };
495
496        encodable.length()
497    }
498}
499
500impl Decodable for NewPooledTransactionHashes68 {
501    fn decode(buf: &mut &[u8]) -> alloy_rlp::Result<Self> {
502        #[derive(RlpDecodable)]
503        struct EncodableNewPooledTransactionHashes68 {
504            types: Bytes,
505            sizes: Vec<usize>,
506            hashes: Vec<B256>,
507        }
508
509        let encodable = EncodableNewPooledTransactionHashes68::decode(buf)?;
510        let msg = Self {
511            types: encodable.types.into(),
512            sizes: encodable.sizes,
513            hashes: encodable.hashes,
514        };
515
516        if msg.hashes.len() != msg.types.len() {
517            return Err(alloy_rlp::Error::ListLengthMismatch {
518                expected: msg.hashes.len(),
519                got: msg.types.len(),
520            })
521        }
522        if msg.hashes.len() != msg.sizes.len() {
523            return Err(alloy_rlp::Error::ListLengthMismatch {
524                expected: msg.hashes.len(),
525                got: msg.sizes.len(),
526            })
527        }
528
529        Ok(msg)
530    }
531}
532
533/// Validation pass that checks for unique transaction hashes.
534pub trait DedupPayload {
535    /// Value type in [`PartiallyValidData`] map.
536    type Value;
537
538    /// The payload contains no entries.
539    fn is_empty(&self) -> bool;
540
541    /// Returns the number of entries.
542    fn len(&self) -> usize;
543
544    /// Consumes self, returning an iterator over hashes in payload.
545    fn dedup(self) -> PartiallyValidData<Self::Value>;
546}
547
548/// Value in [`PartiallyValidData`] map obtained from an announcement.
549pub type Eth68TxMetadata = Option<(u8, usize)>;
550
551impl DedupPayload for NewPooledTransactionHashes {
552    type Value = Eth68TxMetadata;
553
554    fn is_empty(&self) -> bool {
555        self.is_empty()
556    }
557
558    fn len(&self) -> usize {
559        self.len()
560    }
561
562    fn dedup(self) -> PartiallyValidData<Self::Value> {
563        match self {
564            Self::Eth66(msg) => msg.dedup(),
565            Self::Eth68(msg) => msg.dedup(),
566        }
567    }
568}
569
570impl DedupPayload for NewPooledTransactionHashes68 {
571    type Value = Eth68TxMetadata;
572
573    fn is_empty(&self) -> bool {
574        self.hashes.is_empty()
575    }
576
577    fn len(&self) -> usize {
578        self.hashes.len()
579    }
580
581    fn dedup(self) -> PartiallyValidData<Self::Value> {
582        let Self { hashes, mut sizes, mut types } = self;
583
584        let mut deduped_data = HashMap::with_capacity_and_hasher(hashes.len(), Default::default());
585
586        for hash in hashes.into_iter().rev() {
587            if let (Some(ty), Some(size)) = (types.pop(), sizes.pop()) {
588                deduped_data.insert(hash, Some((ty, size)));
589            }
590        }
591
592        PartiallyValidData::from_raw_data_eth68(deduped_data)
593    }
594}
595
596impl DedupPayload for NewPooledTransactionHashes66 {
597    type Value = Eth68TxMetadata;
598
599    fn is_empty(&self) -> bool {
600        self.0.is_empty()
601    }
602
603    fn len(&self) -> usize {
604        self.0.len()
605    }
606
607    fn dedup(self) -> PartiallyValidData<Self::Value> {
608        let Self(hashes) = self;
609
610        let mut deduped_data = HashMap::with_capacity_and_hasher(hashes.len(), Default::default());
611
612        let noop_value: Eth68TxMetadata = None;
613
614        for hash in hashes.into_iter().rev() {
615            deduped_data.insert(hash, noop_value);
616        }
617
618        PartiallyValidData::from_raw_data_eth66(deduped_data)
619    }
620}
621
622/// Interface for handling mempool message data. Used in various filters in pipelines in
623/// `TransactionsManager` and in queries to `TransactionPool`.
624pub trait HandleMempoolData {
625    /// The announcement contains no entries.
626    fn is_empty(&self) -> bool;
627
628    /// Returns the number of entries.
629    fn len(&self) -> usize;
630
631    /// Retain only entries for which the hash in the entry satisfies a given predicate.
632    fn retain_by_hash(&mut self, f: impl FnMut(&TxHash) -> bool);
633}
634
635/// Extension of [`HandleMempoolData`] interface, for mempool messages that are versioned.
636pub trait HandleVersionedMempoolData {
637    /// Returns the announcement version, either [`Eth66`](EthVersion::Eth66) or
638    /// [`Eth68`](EthVersion::Eth68).
639    fn msg_version(&self) -> EthVersion;
640}
641
642impl<T: SignedTransaction> HandleMempoolData for Vec<T> {
643    fn is_empty(&self) -> bool {
644        self.is_empty()
645    }
646
647    fn len(&self) -> usize {
648        self.len()
649    }
650
651    fn retain_by_hash(&mut self, mut f: impl FnMut(&TxHash) -> bool) {
652        self.retain(|tx| f(tx.tx_hash()))
653    }
654}
655
656macro_rules! handle_mempool_data_map_impl {
657    ($data_ty:ty, $(<$generic:ident>)?) => {
658        impl$(<$generic>)? HandleMempoolData for $data_ty {
659            fn is_empty(&self) -> bool {
660                self.data.is_empty()
661            }
662
663            fn len(&self) -> usize {
664                self.data.len()
665            }
666
667            fn retain_by_hash(&mut self, mut f: impl FnMut(&TxHash) -> bool) {
668                self.data.retain(|hash, _| f(hash));
669            }
670        }
671    };
672}
673
674/// Data that has passed an initial validation pass that is not specific to any mempool message
675/// type.
676#[derive(Debug, Deref, DerefMut, IntoIterator)]
677pub struct PartiallyValidData<V> {
678    #[deref]
679    #[deref_mut]
680    #[into_iterator]
681    data: HashMap<TxHash, V>,
682    version: Option<EthVersion>,
683}
684
685handle_mempool_data_map_impl!(PartiallyValidData<V>, <V>);
686
687impl<V> PartiallyValidData<V> {
688    /// Wraps raw data.
689    pub const fn from_raw_data(data: HashMap<TxHash, V>, version: Option<EthVersion>) -> Self {
690        Self { data, version }
691    }
692
693    /// Wraps raw data with version [`EthVersion::Eth68`].
694    pub const fn from_raw_data_eth68(data: HashMap<TxHash, V>) -> Self {
695        Self::from_raw_data(data, Some(EthVersion::Eth68))
696    }
697
698    /// Wraps raw data with version [`EthVersion::Eth66`].
699    pub const fn from_raw_data_eth66(data: HashMap<TxHash, V>) -> Self {
700        Self::from_raw_data(data, Some(EthVersion::Eth66))
701    }
702
703    /// Returns a new [`PartiallyValidData`] with empty data from an [`Eth68`](EthVersion::Eth68)
704    /// announcement.
705    pub fn empty_eth68() -> Self {
706        Self::from_raw_data_eth68(HashMap::default())
707    }
708
709    /// Returns a new [`PartiallyValidData`] with empty data from an [`Eth66`](EthVersion::Eth66)
710    /// announcement.
711    pub fn empty_eth66() -> Self {
712        Self::from_raw_data_eth66(HashMap::default())
713    }
714
715    /// Returns the version of the message this data was received in if different versions of the
716    /// message exists, either [`Eth66`](EthVersion::Eth66) or [`Eth68`](EthVersion::Eth68).
717    pub const fn msg_version(&self) -> Option<EthVersion> {
718        self.version
719    }
720
721    /// Destructs returning the validated data.
722    pub fn into_data(self) -> HashMap<TxHash, V> {
723        self.data
724    }
725}
726
727/// Partially validated data from an announcement or a
728/// [`PooledTransactions`](crate::PooledTransactions) response.
729#[derive(Debug, Deref, DerefMut, IntoIterator, From)]
730pub struct ValidAnnouncementData {
731    #[deref]
732    #[deref_mut]
733    #[into_iterator]
734    data: HashMap<TxHash, Eth68TxMetadata>,
735    version: EthVersion,
736}
737
738handle_mempool_data_map_impl!(ValidAnnouncementData,);
739
740impl ValidAnnouncementData {
741    /// Destructs returning only the valid hashes and the announcement message version. Caution! If
742    /// this is [`Eth68`](EthVersion::Eth68) announcement data, this drops the metadata.
743    pub fn into_request_hashes(self) -> (RequestTxHashes, EthVersion) {
744        let hashes = self.data.into_keys().collect::<HashSet<_>>();
745
746        (RequestTxHashes::new(hashes), self.version)
747    }
748
749    /// Conversion from [`PartiallyValidData`] from an announcement. Note! [`PartiallyValidData`]
750    /// from an announcement, should have some [`EthVersion`]. Panics if [`PartiallyValidData`] has
751    /// version set to `None`.
752    pub fn from_partially_valid_data(data: PartiallyValidData<Eth68TxMetadata>) -> Self {
753        let PartiallyValidData { data, version } = data;
754
755        let version = version.expect("should have eth version for conversion");
756
757        Self { data, version }
758    }
759
760    /// Destructs returning the validated data.
761    pub fn into_data(self) -> HashMap<TxHash, Eth68TxMetadata> {
762        self.data
763    }
764}
765
766impl HandleVersionedMempoolData for ValidAnnouncementData {
767    fn msg_version(&self) -> EthVersion {
768        self.version
769    }
770}
771
772/// Hashes to request from a peer.
773#[derive(Debug, Default, Deref, DerefMut, IntoIterator, Constructor)]
774pub struct RequestTxHashes {
775    #[deref]
776    #[deref_mut]
777    #[into_iterator(owned, ref)]
778    hashes: HashSet<TxHash>,
779}
780
781impl RequestTxHashes {
782    /// Returns a new [`RequestTxHashes`] with given capacity for hashes. Caution! Make sure to
783    /// call [`HashSet::shrink_to_fit`] on [`RequestTxHashes`] when full, especially where it will
784    /// be stored in its entirety like in the future waiting for a
785    /// [`GetPooledTransactions`](crate::GetPooledTransactions) request to resolve.
786    pub fn with_capacity(capacity: usize) -> Self {
787        Self::new(HashSet::with_capacity_and_hasher(capacity, Default::default()))
788    }
789
790    /// Returns a new empty instance.
791    fn empty() -> Self {
792        Self::new(HashSet::default())
793    }
794
795    /// Retains the given number of elements, returning an iterator over the rest.
796    pub fn retain_count(&mut self, count: usize) -> Self {
797        let rest_capacity = self.hashes.len().saturating_sub(count);
798        if rest_capacity == 0 {
799            return Self::empty()
800        }
801        let mut rest = Self::with_capacity(rest_capacity);
802
803        let mut i = 0;
804        self.hashes.retain(|hash| {
805            if i >= count {
806                rest.insert(*hash);
807                return false
808            }
809            i += 1;
810
811            true
812        });
813
814        rest
815    }
816}
817
818impl FromIterator<(TxHash, Eth68TxMetadata)> for RequestTxHashes {
819    fn from_iter<I: IntoIterator<Item = (TxHash, Eth68TxMetadata)>>(iter: I) -> Self {
820        Self::new(iter.into_iter().map(|(hash, _)| hash).collect())
821    }
822}
823
824/// The earliest block, the latest block and hash of the latest block which can be provided.
825/// See [BlockRangeUpdate](https://github.com/ethereum/devp2p/blob/master/caps/eth.md#blockrangeupdate-0x11).
826#[derive(Clone, Debug, PartialEq, Eq, Default, RlpEncodable, RlpDecodable)]
827#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
828#[cfg_attr(feature = "serde", serde(rename_all = "camelCase"))]
829pub struct BlockRangeUpdate {
830    /// The earliest block which is available.
831    pub earliest: u64,
832    /// The latest block which is available.
833    pub latest: u64,
834    /// Latest available block's hash.
835    pub latest_hash: B256,
836}
837
838#[cfg(test)]
839mod tests {
840    use super::*;
841    use alloy_consensus::{transaction::TxHashRef, Typed2718};
842    use alloy_eips::eip2718::Encodable2718;
843    use alloy_primitives::{b256, hex, Signature, U256};
844    use reth_ethereum_primitives::{Transaction, TransactionSigned};
845    use std::str::FromStr;
846
847    /// Takes as input a struct / encoded hex message pair, ensuring that we encode to the exact hex
848    /// message, and decode to the exact struct.
849    fn test_encoding_vector<T: Encodable + Decodable + PartialEq + core::fmt::Debug>(
850        input: (T, &[u8]),
851    ) {
852        let (expected_decoded, expected_encoded) = input;
853        let mut encoded = Vec::new();
854        expected_decoded.encode(&mut encoded);
855
856        assert_eq!(hex::encode(&encoded), hex::encode(expected_encoded));
857
858        let decoded = T::decode(&mut encoded.as_ref()).unwrap();
859        assert_eq!(expected_decoded, decoded);
860    }
861
862    #[test]
863    fn can_return_latest_block() {
864        let mut blocks = NewBlockHashes(vec![BlockHashNumber { hash: B256::random(), number: 0 }]);
865        let latest = blocks.latest().unwrap();
866        assert_eq!(latest.number, 0);
867
868        blocks.0.push(BlockHashNumber { hash: B256::random(), number: 100 });
869        blocks.0.push(BlockHashNumber { hash: B256::random(), number: 2 });
870        let latest = blocks.latest().unwrap();
871        assert_eq!(latest.number, 100);
872    }
873
874    #[test]
875    fn eth_68_tx_hash_roundtrip() {
876        let vectors = vec![
877            (
878                NewPooledTransactionHashes68 { types: vec![], sizes: vec![], hashes: vec![] },
879                &hex!("c380c0c0")[..],
880            ),
881            (
882                NewPooledTransactionHashes68 {
883                    types: vec![0x00],
884                    sizes: vec![0x00],
885                    hashes: vec![
886                        B256::from_str(
887                            "0x0000000000000000000000000000000000000000000000000000000000000000",
888                        )
889                        .unwrap(),
890                    ],
891                },
892                &hex!(
893                    "e500c180e1a00000000000000000000000000000000000000000000000000000000000000000"
894                )[..],
895            ),
896            (
897                NewPooledTransactionHashes68 {
898                    types: vec![0x00, 0x00],
899                    sizes: vec![0x00, 0x00],
900                    hashes: vec![
901                        B256::from_str(
902                            "0x0000000000000000000000000000000000000000000000000000000000000000",
903                        )
904                        .unwrap(),
905                        B256::from_str(
906                            "0x0000000000000000000000000000000000000000000000000000000000000000",
907                        )
908                        .unwrap(),
909                    ],
910                },
911                &hex!(
912                    "f84a820000c28080f842a00000000000000000000000000000000000000000000000000000000000000000a00000000000000000000000000000000000000000000000000000000000000000"
913                )[..],
914            ),
915            (
916                NewPooledTransactionHashes68 {
917                    types: vec![0x02],
918                    sizes: vec![0xb6],
919                    hashes: vec![
920                        B256::from_str(
921                            "0xfecbed04c7b88d8e7221a0a3f5dc33f220212347fc167459ea5cc9c3eb4c1124",
922                        )
923                        .unwrap(),
924                    ],
925                },
926                &hex!(
927                    "e602c281b6e1a0fecbed04c7b88d8e7221a0a3f5dc33f220212347fc167459ea5cc9c3eb4c1124"
928                )[..],
929            ),
930            (
931                NewPooledTransactionHashes68 {
932                    types: vec![0xff, 0xff],
933                    sizes: vec![0xffffffff, 0xffffffff],
934                    hashes: vec![
935                        B256::from_str(
936                            "0xffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff",
937                        )
938                        .unwrap(),
939                        B256::from_str(
940                            "0xffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff",
941                        )
942                        .unwrap(),
943                    ],
944                },
945                &hex!(
946                    "f85282ffffca84ffffffff84fffffffff842a0ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffa0ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff"
947                )[..],
948            ),
949            (
950                NewPooledTransactionHashes68 {
951                    types: vec![0xff, 0xff],
952                    sizes: vec![0xffffffff, 0xffffffff],
953                    hashes: vec![
954                        B256::from_str(
955                            "0xbeefcafebeefcafebeefcafebeefcafebeefcafebeefcafebeefcafebeefcafe",
956                        )
957                        .unwrap(),
958                        B256::from_str(
959                            "0xbeefcafebeefcafebeefcafebeefcafebeefcafebeefcafebeefcafebeefcafe",
960                        )
961                        .unwrap(),
962                    ],
963                },
964                &hex!(
965                    "f85282ffffca84ffffffff84fffffffff842a0beefcafebeefcafebeefcafebeefcafebeefcafebeefcafebeefcafebeefcafea0beefcafebeefcafebeefcafebeefcafebeefcafebeefcafebeefcafebeefcafe"
966                )[..],
967            ),
968            (
969                NewPooledTransactionHashes68 {
970                    types: vec![0x10, 0x10],
971                    sizes: vec![0xdeadc0de, 0xdeadc0de],
972                    hashes: vec![
973                        B256::from_str(
974                            "0x3b9aca00f0671c9a2a1b817a0a78d3fe0c0f776cccb2a8c3c1b412a4f4e4d4e2",
975                        )
976                        .unwrap(),
977                        B256::from_str(
978                            "0x3b9aca00f0671c9a2a1b817a0a78d3fe0c0f776cccb2a8c3c1b412a4f4e4d4e2",
979                        )
980                        .unwrap(),
981                    ],
982                },
983                &hex!(
984                    "f852821010ca84deadc0de84deadc0def842a03b9aca00f0671c9a2a1b817a0a78d3fe0c0f776cccb2a8c3c1b412a4f4e4d4e2a03b9aca00f0671c9a2a1b817a0a78d3fe0c0f776cccb2a8c3c1b412a4f4e4d4e2"
985                )[..],
986            ),
987            (
988                NewPooledTransactionHashes68 {
989                    types: vec![0x6f, 0x6f],
990                    sizes: vec![0x7fffffff, 0x7fffffff],
991                    hashes: vec![
992                        B256::from_str(
993                            "0x0000000000000000000000000000000000000000000000000000000000000002",
994                        )
995                        .unwrap(),
996                        B256::from_str(
997                            "0x0000000000000000000000000000000000000000000000000000000000000002",
998                        )
999                        .unwrap(),
1000                    ],
1001                },
1002                &hex!(
1003                    "f852826f6fca847fffffff847ffffffff842a00000000000000000000000000000000000000000000000000000000000000002a00000000000000000000000000000000000000000000000000000000000000002"
1004                )[..],
1005            ),
1006        ];
1007
1008        for vector in vectors {
1009            test_encoding_vector(vector);
1010        }
1011    }
1012
1013    #[test]
1014    fn request_hashes_retain_count_keep_subset() {
1015        let mut hashes = RequestTxHashes::new(
1016            [
1017                b256!("0x0000000000000000000000000000000000000000000000000000000000000001"),
1018                b256!("0x0000000000000000000000000000000000000000000000000000000000000002"),
1019                b256!("0x0000000000000000000000000000000000000000000000000000000000000003"),
1020                b256!("0x0000000000000000000000000000000000000000000000000000000000000004"),
1021                b256!("0x0000000000000000000000000000000000000000000000000000000000000005"),
1022            ]
1023            .into_iter()
1024            .collect::<HashSet<_>>(),
1025        );
1026
1027        let rest = hashes.retain_count(3);
1028
1029        assert_eq!(3, hashes.len());
1030        assert_eq!(2, rest.len());
1031    }
1032
1033    #[test]
1034    fn request_hashes_retain_count_keep_all() {
1035        let mut hashes = RequestTxHashes::new(
1036            [
1037                b256!("0x0000000000000000000000000000000000000000000000000000000000000001"),
1038                b256!("0x0000000000000000000000000000000000000000000000000000000000000002"),
1039                b256!("0x0000000000000000000000000000000000000000000000000000000000000003"),
1040                b256!("0x0000000000000000000000000000000000000000000000000000000000000004"),
1041                b256!("0x0000000000000000000000000000000000000000000000000000000000000005"),
1042            ]
1043            .into_iter()
1044            .collect::<HashSet<_>>(),
1045        );
1046
1047        let _ = hashes.retain_count(6);
1048
1049        assert_eq!(5, hashes.len());
1050    }
1051
1052    #[test]
1053    fn split_request_hashes_keep_none() {
1054        let mut hashes = RequestTxHashes::new(
1055            [
1056                b256!("0x0000000000000000000000000000000000000000000000000000000000000001"),
1057                b256!("0x0000000000000000000000000000000000000000000000000000000000000002"),
1058                b256!("0x0000000000000000000000000000000000000000000000000000000000000003"),
1059                b256!("0x0000000000000000000000000000000000000000000000000000000000000004"),
1060                b256!("0x0000000000000000000000000000000000000000000000000000000000000005"),
1061            ]
1062            .into_iter()
1063            .collect::<HashSet<_>>(),
1064        );
1065
1066        let rest = hashes.retain_count(0);
1067
1068        assert_eq!(0, hashes.len());
1069        assert_eq!(5, rest.len());
1070    }
1071
1072    fn signed_transaction() -> impl SignedTransaction {
1073        TransactionSigned::new_unhashed(
1074            Transaction::Legacy(Default::default()),
1075            Signature::new(
1076                U256::from_str(
1077                    "0x64b1702d9298fee62dfeccc57d322a463ad55ca201256d01f62b45b2e1c21c12",
1078                )
1079                .unwrap(),
1080                U256::from_str(
1081                    "0x64b1702d9298fee62dfeccc57d322a463ad55ca201256d01f62b45b2e1c21c10",
1082                )
1083                .unwrap(),
1084                false,
1085            ),
1086        )
1087    }
1088
1089    #[test]
1090    fn test_pooled_tx_hashes_68_push() {
1091        let tx = signed_transaction();
1092        let mut tx_hashes =
1093            NewPooledTransactionHashes68 { types: vec![], sizes: vec![], hashes: vec![] };
1094        tx_hashes.push(&tx);
1095        assert_eq!(tx_hashes.types.len(), 1);
1096        assert_eq!(tx_hashes.sizes.len(), 1);
1097        assert_eq!(tx_hashes.hashes.len(), 1);
1098        assert_eq!(tx_hashes.types[0], tx.ty());
1099        assert_eq!(tx_hashes.sizes[0], tx.encode_2718_len());
1100        assert_eq!(tx_hashes.hashes[0], *tx.tx_hash());
1101    }
1102
1103    #[test]
1104    fn test_pooled_tx_hashes_68_extend() {
1105        let tx = signed_transaction();
1106        let txs = vec![tx.clone(), tx.clone()];
1107        let mut tx_hashes =
1108            NewPooledTransactionHashes68 { types: vec![], sizes: vec![], hashes: vec![] };
1109        tx_hashes.extend(&txs);
1110        assert_eq!(tx_hashes.types.len(), 2);
1111        assert_eq!(tx_hashes.sizes.len(), 2);
1112        assert_eq!(tx_hashes.hashes.len(), 2);
1113        assert_eq!(tx_hashes.types[0], tx.ty());
1114        assert_eq!(tx_hashes.sizes[0], tx.encode_2718_len());
1115        assert_eq!(tx_hashes.hashes[0], *tx.tx_hash());
1116        assert_eq!(tx_hashes.types[1], tx.ty());
1117        assert_eq!(tx_hashes.sizes[1], tx.encode_2718_len());
1118        assert_eq!(tx_hashes.hashes[1], *tx.tx_hash());
1119    }
1120
1121    #[test]
1122    fn test_pooled_tx_hashes_68_with_transaction() {
1123        let tx = signed_transaction();
1124        let tx_hashes =
1125            NewPooledTransactionHashes68 { types: vec![], sizes: vec![], hashes: vec![] }
1126                .with_transaction(&tx);
1127        assert_eq!(tx_hashes.types.len(), 1);
1128        assert_eq!(tx_hashes.sizes.len(), 1);
1129        assert_eq!(tx_hashes.hashes.len(), 1);
1130        assert_eq!(tx_hashes.types[0], tx.ty());
1131        assert_eq!(tx_hashes.sizes[0], tx.encode_2718_len());
1132        assert_eq!(tx_hashes.hashes[0], *tx.tx_hash());
1133    }
1134
1135    #[test]
1136    fn test_pooled_tx_hashes_68_with_transactions() {
1137        let tx = signed_transaction();
1138        let txs = vec![tx.clone(), tx.clone()];
1139        let tx_hashes =
1140            NewPooledTransactionHashes68 { types: vec![], sizes: vec![], hashes: vec![] }
1141                .with_transactions(&txs);
1142        assert_eq!(tx_hashes.types.len(), 2);
1143        assert_eq!(tx_hashes.sizes.len(), 2);
1144        assert_eq!(tx_hashes.hashes.len(), 2);
1145        assert_eq!(tx_hashes.types[0], tx.ty());
1146        assert_eq!(tx_hashes.sizes[0], tx.encode_2718_len());
1147        assert_eq!(tx_hashes.hashes[0], *tx.tx_hash());
1148        assert_eq!(tx_hashes.types[1], tx.ty());
1149        assert_eq!(tx_hashes.sizes[1], tx.encode_2718_len());
1150        assert_eq!(tx_hashes.hashes[1], *tx.tx_hash());
1151    }
1152}