Skip to main content

reth_transaction_pool/pool/
parked.rs

1use crate::{
2    identifier::{SenderId, TransactionId},
3    pool::size::SizeTracker,
4    PoolTransaction, SubPoolLimit, ValidPoolTransaction, TXPOOL_MAX_ACCOUNT_SLOTS_PER_SENDER,
5};
6use rustc_hash::FxHashMap;
7use smallvec::SmallVec;
8use std::{
9    cmp::Ordering,
10    collections::{hash_map::Entry, BTreeMap, BTreeSet},
11    ops::{Bound::Unbounded, Deref},
12    sync::Arc,
13};
14
15/// A pool of transactions that are currently parked and are waiting for external changes (e.g.
16/// basefee, ancestor transactions, balance) that eventually move the transaction into the pending
17/// pool.
18///
19/// Note: This type is generic over [`ParkedPool`] which enforces that the underlying transaction
20/// type is [`ValidPoolTransaction`] wrapped in an [Arc].
21#[derive(Debug, Clone)]
22pub struct ParkedPool<T: ParkedOrd> {
23    /// Keeps track of transactions inserted in the pool.
24    ///
25    /// This way we can determine when transactions were submitted to the pool.
26    submission_id: u64,
27    /// _All_ Transactions that are currently inside the pool grouped by their identifier.
28    by_id: BTreeMap<TransactionId, ParkedPoolTransaction<T>>,
29    /// Keeps track of last submission id for each sender.
30    ///
31    /// This are sorted in reverse order, so the last (highest) submission id is first, and the
32    /// lowest (oldest) is the last.
33    last_sender_submission: BTreeSet<SubmissionSenderId>,
34    /// Keeps track of the number of transactions in the pool by the sender and the last submission
35    /// id.
36    sender_transaction_count: FxHashMap<SenderId, SenderTransactionCount>,
37    /// Keeps track of the size of this pool.
38    ///
39    /// See also [`reth_primitives_traits::InMemorySize::size`].
40    size_of: SizeTracker,
41}
42
43// === impl ParkedPool ===
44
45impl<T: ParkedOrd> ParkedPool<T> {
46    /// Adds a new transactions to the pending queue.
47    pub fn add_transaction(&mut self, tx: Arc<ValidPoolTransaction<T::Transaction>>) {
48        let id = *tx.id();
49        debug_assert!(
50            !self.contains(&id),
51            "transaction already included {:?}",
52            self.get(&id).unwrap().transaction.transaction
53        );
54        let submission_id = self.next_id();
55
56        // keep track of size
57        self.size_of += tx.size();
58
59        // update or create sender entry
60        self.add_sender_count(tx.sender_id(), submission_id);
61        let transaction = ParkedPoolTransaction { submission_id, transaction: tx.into() };
62
63        self.by_id.insert(id, transaction);
64    }
65
66    /// Increments the count of transactions for the given sender and updates the tracked submission
67    /// id.
68    fn add_sender_count(&mut self, sender: SenderId, submission_id: u64) {
69        match self.sender_transaction_count.entry(sender) {
70            Entry::Occupied(mut entry) => {
71                let value = entry.get_mut();
72                // remove the __currently__ tracked submission id
73                self.last_sender_submission
74                    .remove(&SubmissionSenderId::new(sender, value.last_submission_id));
75
76                value.count += 1;
77                value.last_submission_id = submission_id;
78            }
79            Entry::Vacant(entry) => {
80                entry
81                    .insert(SenderTransactionCount { count: 1, last_submission_id: submission_id });
82            }
83        }
84        // insert a new entry
85        self.last_sender_submission.insert(SubmissionSenderId::new(sender, submission_id));
86    }
87
88    /// Decrements the count of transactions for the given sender.
89    ///
90    /// If the count reaches zero, the sender is removed from the map.
91    ///
92    /// Note: this does not update the tracked submission id for the sender, because we're only
93    /// interested in the __last__ submission id when truncating the pool.
94    fn remove_sender_count(&mut self, sender_id: SenderId) {
95        let removed_sender = match self.sender_transaction_count.entry(sender_id) {
96            Entry::Occupied(mut entry) => {
97                let value = entry.get_mut();
98                value.count -= 1;
99                if value.count == 0 {
100                    entry.remove()
101                } else {
102                    return
103                }
104            }
105            Entry::Vacant(_) => {
106                // This should never happen because the bisection between the two maps
107                unreachable!("sender count not found {:?}", sender_id);
108            }
109        };
110
111        // all transactions for this sender have been removed
112        assert!(
113            self.last_sender_submission
114                .remove(&SubmissionSenderId::new(sender_id, removed_sender.last_submission_id)),
115            "last sender transaction not found {sender_id:?}"
116        );
117    }
118
119    /// Returns an iterator over all transactions in the pool
120    pub(crate) fn all(
121        &self,
122    ) -> impl ExactSizeIterator<Item = Arc<ValidPoolTransaction<T::Transaction>>> + '_ {
123        self.by_id.values().map(|tx| tx.transaction.clone().into())
124    }
125
126    /// Removes the transaction from the pool
127    pub(crate) fn remove_transaction(
128        &mut self,
129        id: &TransactionId,
130    ) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
131        // remove from queues
132        let tx = self.by_id.remove(id)?;
133        self.remove_sender_count(tx.transaction.sender_id());
134
135        // keep track of size
136        self.size_of -= tx.transaction.size();
137
138        Some(tx.transaction.into())
139    }
140
141    /// Retrieves transactions by sender, using `SmallVec` to efficiently handle up to
142    /// `TXPOOL_MAX_ACCOUNT_SLOTS_PER_SENDER` transactions.
143    pub(crate) fn get_txs_by_sender(
144        &self,
145        sender: SenderId,
146    ) -> SmallVec<[TransactionId; TXPOOL_MAX_ACCOUNT_SLOTS_PER_SENDER]> {
147        self.by_id
148            .range((sender.start_bound(), Unbounded))
149            .take_while(move |(other, _)| sender == other.sender)
150            .map(|(tx_id, _)| *tx_id)
151            .collect()
152    }
153
154    /// Returns all transactions for the given sender, using a `BTree` range query.
155    pub(crate) fn txs_by_sender(
156        &self,
157        sender: SenderId,
158    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
159        self.by_id
160            .range((sender.start_bound(), Unbounded))
161            .take_while(move |(other, _)| sender == other.sender)
162            .map(|(_, tx)| Arc::clone(&tx.transaction))
163            .collect()
164    }
165
166    #[cfg(test)]
167    pub(crate) fn get_senders_by_submission_id(
168        &self,
169    ) -> impl Iterator<Item = SubmissionSenderId> + '_ {
170        self.last_sender_submission.iter().copied()
171    }
172
173    /// Truncates the pool by removing transactions, until the given [`SubPoolLimit`] has been met.
174    ///
175    /// This is done by first ordering senders by the last time they have submitted a transaction
176    ///
177    /// Uses sender ids sorted by each sender's last submission id. Senders with older last
178    /// submission ids are first. Note that _last_ submission ids are the newest submission id for
179    /// that sender, so this sorts senders by the last time they submitted a transaction in
180    /// descending order. Senders that have least recently submitted a transaction are first.
181    ///
182    /// Then, for each sender, all transactions for that sender are removed, until the pool limits
183    /// have been met.
184    ///
185    /// Any removed transactions are returned.
186    pub fn truncate_pool(
187        &mut self,
188        limit: SubPoolLimit,
189    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
190        if !self.exceeds(&limit) {
191            // if we are below the limits, we don't need to drop anything
192            return Vec::new()
193        }
194
195        let mut removed = Vec::with_capacity(limit.tx_excess(self.len()).unwrap_or(1));
196
197        while !self.last_sender_submission.is_empty() && limit.is_exceeded(self.len(), self.size())
198        {
199            // NOTE: This will not panic due to `!last_sender_transaction.is_empty()`
200            let sender_id = self.last_sender_submission.last().unwrap().sender_id;
201
202            // Drop transactions from this sender until the pool is under limits
203            while let Some((tx_id, _)) = self.by_id.range(sender_id.range()).next_back() {
204                let tx_id = *tx_id;
205                if let Some(tx) = self.remove_transaction(&tx_id) {
206                    removed.push(tx);
207                }
208
209                if !self.exceeds(&limit) {
210                    break
211                }
212            }
213        }
214
215        removed
216    }
217
218    const fn next_id(&mut self) -> u64 {
219        let id = self.submission_id;
220        self.submission_id = self.submission_id.wrapping_add(1);
221        id
222    }
223
224    /// The reported size of all transactions in this pool.
225    pub(crate) fn size(&self) -> usize {
226        self.size_of.into()
227    }
228
229    /// Number of transactions in the entire pool
230    pub(crate) fn len(&self) -> usize {
231        self.by_id.len()
232    }
233
234    /// Returns true if the pool exceeds the given limit
235    #[inline]
236    pub(crate) fn exceeds(&self, limit: &SubPoolLimit) -> bool {
237        limit.is_exceeded(self.len(), self.size())
238    }
239
240    /// Returns whether the pool is empty
241    #[cfg(test)]
242    pub(crate) fn is_empty(&self) -> bool {
243        self.by_id.is_empty()
244    }
245
246    /// Returns `true` if the transaction with the given id is already included in this pool.
247    pub(crate) fn contains(&self, id: &TransactionId) -> bool {
248        self.by_id.contains_key(id)
249    }
250
251    /// Retrieves a transaction with the given ID from the pool, if it exists.
252    fn get(&self, id: &TransactionId) -> Option<&ParkedPoolTransaction<T>> {
253        self.by_id.get(id)
254    }
255
256    /// Asserts that all subpool invariants
257    #[cfg(any(test, feature = "test-utils"))]
258    pub(crate) fn assert_invariants(&self) {
259        assert_eq!(
260            self.last_sender_submission.len(),
261            self.sender_transaction_count.len(),
262            "last_sender_submission.len() != sender_transaction_count.len()"
263        );
264    }
265}
266
267impl<T: PoolTransaction> ParkedPool<BasefeeOrd<T>> {
268    /// Returns all transactions that satisfy the given basefee.
269    ///
270    /// Note: this does _not_ remove the transactions
271    pub(crate) fn satisfy_base_fee_transactions(
272        &self,
273        basefee: u64,
274    ) -> Vec<Arc<ValidPoolTransaction<T>>> {
275        let mut txs = Vec::new();
276        self.satisfy_base_fee_ids(basefee as u128, |tx| {
277            txs.push(tx.clone());
278        });
279        txs
280    }
281
282    /// Returns all transactions that satisfy the given basefee.
283    fn satisfy_base_fee_ids<F>(&self, basefee: u128, mut tx_handler: F)
284    where
285        F: FnMut(&Arc<ValidPoolTransaction<T>>),
286    {
287        let mut iter = self.by_id.iter().peekable();
288
289        while let Some((id, tx)) = iter.next() {
290            if tx.transaction.max_fee_per_gas() < basefee {
291                // still parked -> skip descendant transactions
292                'this: while let Some((peek, _)) = iter.peek() {
293                    if peek.sender != id.sender {
294                        break 'this
295                    }
296                    iter.next();
297                }
298            } else {
299                tx_handler(&tx.transaction);
300            }
301        }
302    }
303
304    /// Removes all transactions from this subpool that can afford the given basefee,
305    /// invoking the provided handler for each transaction as it is removed.
306    ///
307    /// This method enforces the basefee constraint by identifying transactions that now
308    /// satisfy the basefee requirement (typically after a basefee decrease) and processing
309    /// them via the provided transaction handler closure.
310    ///
311    /// Respects per-sender nonce ordering: if the lowest-nonce transaction for a sender
312    /// still cannot afford the basefee, higher-nonce transactions from that sender are skipped.
313    ///
314    /// Note: the transactions are not returned in a particular order.
315    pub(crate) fn enforce_basefee_with<F>(&mut self, basefee: u64, mut tx_handler: F)
316    where
317        F: FnMut(Arc<ValidPoolTransaction<T>>),
318    {
319        let mut to_remove = Vec::new();
320        self.satisfy_base_fee_ids(basefee as u128, |tx| {
321            to_remove.push(*tx.id());
322        });
323
324        for id in to_remove {
325            if let Some(tx) = self.remove_transaction(&id) {
326                tx_handler(tx);
327            }
328        }
329    }
330
331    /// Removes all transactions and their dependent transaction from the subpool that no longer
332    /// satisfy the given basefee.
333    ///
334    /// Legacy method maintained for compatibility with read-only queries.
335    /// For basefee enforcement, prefer `enforce_basefee_with` for better performance.
336    ///
337    /// Note: the transactions are not returned in a particular order.
338    #[cfg(test)]
339    pub(crate) fn enforce_basefee(&mut self, basefee: u64) -> Vec<Arc<ValidPoolTransaction<T>>> {
340        let mut removed = Vec::new();
341        self.enforce_basefee_with(basefee, |tx| {
342            removed.push(tx);
343        });
344        removed
345    }
346}
347
348impl<T: ParkedOrd> Default for ParkedPool<T> {
349    fn default() -> Self {
350        Self {
351            submission_id: 0,
352            by_id: Default::default(),
353            last_sender_submission: Default::default(),
354            sender_transaction_count: Default::default(),
355            size_of: Default::default(),
356        }
357    }
358}
359
360/// Keeps track of the number of transactions and the latest submission id for each sender.
361#[derive(Debug, Clone, Default, PartialEq, Eq)]
362struct SenderTransactionCount {
363    count: u64,
364    last_submission_id: u64,
365}
366
367/// Represents a transaction in this pool.
368#[derive(Debug)]
369struct ParkedPoolTransaction<T: ParkedOrd> {
370    /// Identifier that tags when transaction was submitted in the pool.
371    submission_id: u64,
372    /// Actual transaction.
373    transaction: T,
374}
375
376impl<T: ParkedOrd> Clone for ParkedPoolTransaction<T> {
377    fn clone(&self) -> Self {
378        Self { submission_id: self.submission_id, transaction: self.transaction.clone() }
379    }
380}
381
382impl<T: ParkedOrd> Eq for ParkedPoolTransaction<T> {}
383
384impl<T: ParkedOrd> PartialEq<Self> for ParkedPoolTransaction<T> {
385    fn eq(&self, other: &Self) -> bool {
386        self.cmp(other) == Ordering::Equal
387    }
388}
389
390impl<T: ParkedOrd> PartialOrd<Self> for ParkedPoolTransaction<T> {
391    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
392        Some(self.cmp(other))
393    }
394}
395
396impl<T: ParkedOrd> Ord for ParkedPoolTransaction<T> {
397    fn cmp(&self, other: &Self) -> Ordering {
398        // This compares by the transactions first, and only if two tx are equal this compares
399        // the unique `submission_id`.
400        // "better" transactions are Greater
401        self.transaction
402            .cmp(&other.transaction)
403            .then_with(|| other.submission_id.cmp(&self.submission_id))
404    }
405}
406
407/// Includes a [`SenderId`] and `submission_id`. This is used to sort senders by their last
408/// submission id.
409#[derive(Debug, PartialEq, Eq, Copy, Clone)]
410pub(crate) struct SubmissionSenderId {
411    /// The sender id
412    pub(crate) sender_id: SenderId,
413    /// The submission id
414    pub(crate) submission_id: u64,
415}
416
417impl SubmissionSenderId {
418    /// Creates a new [`SubmissionSenderId`] based on the [`SenderId`] and `submission_id`.
419    const fn new(sender_id: SenderId, submission_id: u64) -> Self {
420        Self { sender_id, submission_id }
421    }
422}
423
424impl Ord for SubmissionSenderId {
425    fn cmp(&self, other: &Self) -> Ordering {
426        // Reverse ordering for `submission_id`
427        other.submission_id.cmp(&self.submission_id)
428    }
429}
430
431impl PartialOrd for SubmissionSenderId {
432    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
433        Some(self.cmp(other))
434    }
435}
436
437/// Helper trait used for custom `Ord` wrappers around a transaction.
438///
439/// This is effectively a wrapper for `Arc<ValidPoolTransaction>` with custom `Ord` implementation.
440pub trait ParkedOrd:
441    Ord
442    + Clone
443    + From<Arc<ValidPoolTransaction<Self::Transaction>>>
444    + Into<Arc<ValidPoolTransaction<Self::Transaction>>>
445    + Deref<Target = Arc<ValidPoolTransaction<Self::Transaction>>>
446{
447    /// The wrapper transaction type.
448    type Transaction: PoolTransaction;
449}
450
451/// Helper macro to implement necessary conversions for `ParkedOrd` trait
452macro_rules! impl_ord_wrapper {
453    ($name:ident) => {
454        impl<T: PoolTransaction> Clone for $name<T> {
455            fn clone(&self) -> Self {
456                Self(self.0.clone())
457            }
458        }
459
460        impl<T: PoolTransaction> Eq for $name<T> {}
461
462        impl<T: PoolTransaction> PartialEq<Self> for $name<T> {
463            fn eq(&self, other: &Self) -> bool {
464                self.cmp(other) == Ordering::Equal
465            }
466        }
467
468        impl<T: PoolTransaction> PartialOrd<Self> for $name<T> {
469            fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
470                Some(self.cmp(other))
471            }
472        }
473        impl<T: PoolTransaction> Deref for $name<T> {
474            type Target = Arc<ValidPoolTransaction<T>>;
475
476            fn deref(&self) -> &Self::Target {
477                &self.0
478            }
479        }
480
481        impl<T: PoolTransaction> ParkedOrd for $name<T> {
482            type Transaction = T;
483        }
484
485        impl<T: PoolTransaction> From<Arc<ValidPoolTransaction<T>>> for $name<T> {
486            fn from(value: Arc<ValidPoolTransaction<T>>) -> Self {
487                Self(value)
488            }
489        }
490
491        impl<T: PoolTransaction> From<$name<T>> for Arc<ValidPoolTransaction<T>> {
492            fn from(value: $name<T>) -> Arc<ValidPoolTransaction<T>> {
493                value.0
494            }
495        }
496    };
497}
498
499/// A new type wrapper for [`ValidPoolTransaction`]
500///
501/// This sorts transactions by their base fee.
502///
503/// Caution: This assumes all transaction in the `BaseFee` sub-pool have a fee value.
504#[derive(Debug)]
505pub struct BasefeeOrd<T: PoolTransaction>(Arc<ValidPoolTransaction<T>>);
506
507impl_ord_wrapper!(BasefeeOrd);
508
509impl<T: PoolTransaction> Ord for BasefeeOrd<T> {
510    fn cmp(&self, other: &Self) -> Ordering {
511        self.0.transaction.max_fee_per_gas().cmp(&other.0.transaction.max_fee_per_gas())
512    }
513}
514
515/// A new type wrapper for [`ValidPoolTransaction`]
516///
517/// This sorts transactions by their distance.
518///
519/// `Queued` transactions are transactions that are currently blocked by other parked (basefee,
520/// queued) or missing transactions.
521///
522/// The primary order function always compares the transaction costs first. In case these
523/// are equal, it compares the timestamps when the transactions were created.
524#[derive(Debug)]
525pub struct QueuedOrd<T: PoolTransaction>(Arc<ValidPoolTransaction<T>>);
526
527impl_ord_wrapper!(QueuedOrd);
528
529impl<T: PoolTransaction> Ord for QueuedOrd<T> {
530    fn cmp(&self, other: &Self) -> Ordering {
531        // Higher fee is better
532        self.max_fee_per_gas().cmp(&other.max_fee_per_gas()).then_with(||
533            // Lower timestamp is better
534            other.timestamp.cmp(&self.timestamp))
535    }
536}
537
538#[cfg(test)]
539mod tests {
540    use super::*;
541    use crate::test_utils::{MockTransaction, MockTransactionFactory, MockTransactionSet};
542    use alloy_consensus::{Transaction, TxType};
543    use alloy_primitives::address;
544    use std::collections::HashSet;
545
546    #[test]
547    fn test_enforce_parked_basefee() {
548        let mut f = MockTransactionFactory::default();
549        let mut pool = ParkedPool::<BasefeeOrd<_>>::default();
550        let tx = f.validated_arc(MockTransaction::eip1559().inc_price());
551        pool.add_transaction(tx.clone());
552
553        assert!(pool.contains(tx.id()));
554        assert_eq!(pool.len(), 1);
555
556        let removed = pool.enforce_basefee(u64::MAX);
557        assert!(removed.is_empty());
558
559        let removed = pool.enforce_basefee((tx.max_fee_per_gas() - 1) as u64);
560        assert_eq!(removed.len(), 1);
561        assert!(pool.is_empty());
562    }
563
564    #[test]
565    fn test_enforce_parked_basefee_descendant() {
566        let mut f = MockTransactionFactory::default();
567        let mut pool = ParkedPool::<BasefeeOrd<_>>::default();
568        let t = MockTransaction::eip1559().inc_price_by(10);
569        let root_tx = f.validated_arc(t.clone());
570        pool.add_transaction(root_tx.clone());
571
572        let descendant_tx = f.validated_arc(t.inc_nonce().decr_price());
573        pool.add_transaction(descendant_tx.clone());
574
575        assert!(pool.contains(root_tx.id()));
576        assert!(pool.contains(descendant_tx.id()));
577        assert_eq!(pool.len(), 2);
578
579        let removed = pool.enforce_basefee(u64::MAX);
580        assert!(removed.is_empty());
581        assert_eq!(pool.len(), 2);
582        // two dependent tx in the pool with decreasing fee
583
584        {
585            // TODO: test change might not be intended, re review
586            let mut pool2 = pool.clone();
587            let removed = pool2.enforce_basefee(root_tx.max_fee_per_gas() as u64);
588            assert_eq!(removed.len(), 1);
589            assert_eq!(pool2.len(), 1);
590            // root got popped - descendant should be skipped
591            assert!(!pool2.contains(root_tx.id()));
592            assert!(pool2.contains(descendant_tx.id()));
593        }
594
595        // remove root transaction via descendant tx fee
596        let removed = pool.enforce_basefee(descendant_tx.max_fee_per_gas() as u64);
597        assert_eq!(removed.len(), 2);
598        assert!(pool.is_empty());
599    }
600
601    #[test]
602    fn truncate_parked_by_submission_id() {
603        // this test ensures that we evict from the pending pool by sender
604        let mut f = MockTransactionFactory::default();
605        let mut pool = ParkedPool::<BasefeeOrd<_>>::default();
606
607        let a_sender = address!("0x000000000000000000000000000000000000000a");
608        let b_sender = address!("0x000000000000000000000000000000000000000b");
609        let c_sender = address!("0x000000000000000000000000000000000000000c");
610        let d_sender = address!("0x000000000000000000000000000000000000000d");
611
612        // create a chain of transactions by sender A, B, C
613        let mut tx_set = MockTransactionSet::dependent(a_sender, 0, 4, TxType::Eip1559);
614        let a = tx_set.clone().into_vec();
615
616        let b = MockTransactionSet::dependent(b_sender, 0, 3, TxType::Eip1559).into_vec();
617        tx_set.extend(b.clone());
618
619        // C has the same number of txs as B
620        let c = MockTransactionSet::dependent(c_sender, 0, 3, TxType::Eip1559).into_vec();
621        tx_set.extend(c.clone());
622
623        let d = MockTransactionSet::dependent(d_sender, 0, 1, TxType::Eip1559).into_vec();
624        tx_set.extend(d.clone());
625
626        let all_txs = tx_set.into_vec();
627
628        // just construct a list of all txs to add
629        let expected_parked = vec![c[0].clone(), c[1].clone(), c[2].clone(), d[0].clone()]
630            .into_iter()
631            .map(|tx| (tx.sender(), tx.nonce()))
632            .collect::<HashSet<_>>();
633
634        // we expect the truncate operation to go through the senders with the most txs, removing
635        // txs based on when they were submitted, removing the oldest txs first, until the pool is
636        // not over the limit
637        let expected_removed = vec![
638            a[0].clone(),
639            a[1].clone(),
640            a[2].clone(),
641            a[3].clone(),
642            b[0].clone(),
643            b[1].clone(),
644            b[2].clone(),
645        ]
646        .into_iter()
647        .map(|tx| (tx.sender(), tx.nonce()))
648        .collect::<HashSet<_>>();
649
650        // add all the transactions to the pool
651        for tx in all_txs {
652            pool.add_transaction(f.validated_arc(tx));
653        }
654
655        // we should end up with the most recently submitted transactions
656        let pool_limit = SubPoolLimit { max_txs: 4, max_size: usize::MAX };
657
658        // truncate the pool
659        let removed = pool.truncate_pool(pool_limit);
660        assert_eq!(removed.len(), expected_removed.len());
661
662        // get the inner txs from the removed txs
663        let removed =
664            removed.into_iter().map(|tx| (tx.sender(), tx.nonce())).collect::<HashSet<_>>();
665        assert_eq!(removed, expected_removed);
666
667        // get the parked pool
668        let parked = pool.all().collect::<Vec<_>>();
669        assert_eq!(parked.len(), expected_parked.len());
670
671        // get the inner txs from the parked txs
672        let parked = parked.into_iter().map(|tx| (tx.sender(), tx.nonce())).collect::<HashSet<_>>();
673        assert_eq!(parked, expected_parked);
674    }
675
676    #[test]
677    fn test_truncate_parked_with_large_tx() {
678        let mut f = MockTransactionFactory::default();
679        let mut pool = ParkedPool::<BasefeeOrd<_>>::default();
680        let default_limits = SubPoolLimit::default();
681
682        // create a chain of transactions by sender A
683        // make sure they are all one over half the limit
684        let a_sender = address!("0x000000000000000000000000000000000000000a");
685
686        // 2 txs, that should put the pool over the size limit but not max txs
687        let a_txs = MockTransactionSet::dependent(a_sender, 0, 2, TxType::Eip1559)
688            .into_iter()
689            .map(|mut tx| {
690                tx.set_size(default_limits.max_size / 2 + 1);
691                tx
692            })
693            .collect::<Vec<_>>();
694
695        // add all the transactions to the pool
696        for tx in a_txs {
697            pool.add_transaction(f.validated_arc(tx));
698        }
699
700        // truncate the pool, it should remove at least one transaction
701        let removed = pool.truncate_pool(default_limits);
702        assert_eq!(removed.len(), 1);
703    }
704
705    #[test]
706    fn test_senders_by_submission_id() {
707        // this test ensures that we evict from the pending pool by sender
708        let mut f = MockTransactionFactory::default();
709        let mut pool = ParkedPool::<BasefeeOrd<_>>::default();
710
711        let a_sender = address!("0x000000000000000000000000000000000000000a");
712        let b_sender = address!("0x000000000000000000000000000000000000000b");
713        let c_sender = address!("0x000000000000000000000000000000000000000c");
714        let d_sender = address!("0x000000000000000000000000000000000000000d");
715
716        // create a chain of transactions by sender A, B, C
717        let mut tx_set = MockTransactionSet::dependent(a_sender, 0, 4, TxType::Eip1559);
718        let a = tx_set.clone().into_vec();
719
720        let b = MockTransactionSet::dependent(b_sender, 0, 3, TxType::Eip1559).into_vec();
721        tx_set.extend(b.clone());
722
723        // C has the same number of txs as B
724        let c = MockTransactionSet::dependent(c_sender, 0, 3, TxType::Eip1559).into_vec();
725        tx_set.extend(c.clone());
726
727        let d = MockTransactionSet::dependent(d_sender, 0, 1, TxType::Eip1559).into_vec();
728        tx_set.extend(d.clone());
729
730        let all_txs = tx_set.into_vec();
731
732        // add all the transactions to the pool
733        for tx in all_txs {
734            pool.add_transaction(f.validated_arc(tx));
735        }
736
737        // get senders by submission id - a4, b3, c3, d1, reversed
738        let senders = pool.get_senders_by_submission_id().map(|s| s.sender_id).collect::<Vec<_>>();
739        assert_eq!(senders.len(), 4);
740        let expected_senders = vec![d_sender, c_sender, b_sender, a_sender]
741            .into_iter()
742            .map(|s| f.ids.sender_id(&s).unwrap())
743            .collect::<Vec<_>>();
744        assert_eq!(senders, expected_senders);
745
746        // manually order the txs
747        let mut pool = ParkedPool::<BasefeeOrd<_>>::default();
748        let all_txs = vec![d[0].clone(), b[0].clone(), c[0].clone(), a[0].clone()];
749
750        // add all the transactions to the pool
751        for tx in all_txs {
752            pool.add_transaction(f.validated_arc(tx));
753        }
754
755        let senders = pool.get_senders_by_submission_id().map(|s| s.sender_id).collect::<Vec<_>>();
756        assert_eq!(senders.len(), 4);
757        let expected_senders = vec![a_sender, c_sender, b_sender, d_sender]
758            .into_iter()
759            .map(|s| f.ids.sender_id(&s).unwrap())
760            .collect::<Vec<_>>();
761        assert_eq!(senders, expected_senders);
762    }
763
764    #[test]
765    fn test_add_sender_count_new_sender() {
766        // Initialize a mock transaction factory
767        let mut f = MockTransactionFactory::default();
768        // Create an empty transaction pool
769        let mut pool = ParkedPool::<BasefeeOrd<_>>::default();
770        // Generate a validated transaction and add it to the pool
771        let tx = f.validated_arc(MockTransaction::eip1559().inc_price());
772        pool.add_transaction(tx);
773
774        // Define a new sender ID and submission ID
775        let sender: SenderId = 11.into();
776        let submission_id = 1;
777
778        // Add the sender count to the pool
779        pool.add_sender_count(sender, submission_id);
780
781        // Assert that the sender transaction count is updated correctly
782        assert_eq!(pool.sender_transaction_count.len(), 2);
783        let sender_info = pool.sender_transaction_count.get(&sender).unwrap();
784        assert_eq!(sender_info.count, 1);
785        assert_eq!(sender_info.last_submission_id, submission_id);
786
787        // Assert that the last sender submission is updated correctly
788        assert_eq!(pool.last_sender_submission.len(), 2);
789        let submission_info = pool.last_sender_submission.iter().next().unwrap();
790        assert_eq!(submission_info.sender_id, sender);
791        assert_eq!(submission_info.submission_id, submission_id);
792    }
793
794    #[test]
795    fn test_add_sender_count_existing_sender() {
796        // Initialize a mock transaction factory
797        let mut f = MockTransactionFactory::default();
798        // Create an empty transaction pool
799        let mut pool = ParkedPool::<BasefeeOrd<_>>::default();
800        // Generate a validated transaction and add it to the pool
801        let tx = f.validated_arc(MockTransaction::eip1559().inc_price());
802        pool.add_transaction(tx);
803
804        // Define a sender ID and initial submission ID
805        let sender: SenderId = 11.into();
806        let initial_submission_id = 1;
807
808        // Add the sender count to the pool with the initial submission ID
809        pool.add_sender_count(sender, initial_submission_id);
810
811        // Define a new submission ID
812        let new_submission_id = 2;
813        // Add the sender count to the pool with the new submission ID
814        pool.add_sender_count(sender, new_submission_id);
815
816        // Assert that the sender transaction count is updated correctly
817        assert_eq!(pool.sender_transaction_count.len(), 2);
818        let sender_info = pool.sender_transaction_count.get(&sender).unwrap();
819        assert_eq!(sender_info.count, 2);
820        assert_eq!(sender_info.last_submission_id, new_submission_id);
821
822        // Assert that the last sender submission is updated correctly
823        assert_eq!(pool.last_sender_submission.len(), 2);
824        let submission_info = pool.last_sender_submission.iter().next().unwrap();
825        assert_eq!(submission_info.sender_id, sender);
826        assert_eq!(submission_info.submission_id, new_submission_id);
827    }
828
829    #[test]
830    fn test_add_sender_count_multiple_senders() {
831        // Initialize a mock transaction factory
832        let mut f = MockTransactionFactory::default();
833        // Create an empty transaction pool
834        let mut pool = ParkedPool::<BasefeeOrd<_>>::default();
835        // Generate two validated transactions and add them to the pool
836        let tx1 = f.validated_arc(MockTransaction::eip1559().inc_price());
837        let tx2 = f.validated_arc(MockTransaction::eip1559().inc_price());
838        pool.add_transaction(tx1);
839        pool.add_transaction(tx2);
840
841        // Define two different sender IDs and their corresponding submission IDs
842        let sender1: SenderId = 11.into();
843        let sender2: SenderId = 22.into();
844
845        // Add the sender counts to the pool
846        pool.add_sender_count(sender1, 1);
847        pool.add_sender_count(sender2, 2);
848
849        // Assert that the sender transaction counts are updated correctly
850        assert_eq!(pool.sender_transaction_count.len(), 4);
851
852        let sender1_info = pool.sender_transaction_count.get(&sender1).unwrap();
853        assert_eq!(sender1_info.count, 1);
854        assert_eq!(sender1_info.last_submission_id, 1);
855
856        let sender2_info = pool.sender_transaction_count.get(&sender2).unwrap();
857        assert_eq!(sender2_info.count, 1);
858        assert_eq!(sender2_info.last_submission_id, 2);
859
860        // Assert that the last sender submission is updated correctly
861        assert_eq!(pool.last_sender_submission.len(), 3);
862
863        // Verify that sender 1 is not in the last sender submission
864        let submission_info1 =
865            pool.last_sender_submission.iter().find(|info| info.sender_id == sender1);
866        assert!(submission_info1.is_none());
867
868        // Verify that sender 2 is in the last sender submission
869        let submission_info2 =
870            pool.last_sender_submission.iter().find(|info| info.sender_id == sender2).unwrap();
871        assert_eq!(submission_info2.sender_id, sender2);
872        assert_eq!(submission_info2.submission_id, 2);
873    }
874
875    #[test]
876    fn test_remove_sender_count() {
877        // Initialize a mock transaction factory
878        let mut f = MockTransactionFactory::default();
879        // Create an empty transaction pool
880        let mut pool = ParkedPool::<BasefeeOrd<_>>::default();
881        // Generate two validated transactions and add them to the pool
882        let tx1 = f.validated_arc(MockTransaction::eip1559().inc_price());
883        let tx2 = f.validated_arc(MockTransaction::eip1559().inc_price());
884        pool.add_transaction(tx1);
885        pool.add_transaction(tx2);
886
887        // Define two different sender IDs and their corresponding submission IDs
888        let sender1: SenderId = 11.into();
889        let sender2: SenderId = 22.into();
890
891        // Add the sender counts to the pool
892        pool.add_sender_count(sender1, 1);
893
894        // We add sender 2 multiple times to test the removal of sender counts
895        pool.add_sender_count(sender2, 2);
896        pool.add_sender_count(sender2, 3);
897
898        // Before removing the sender count we should have 4 sender transaction counts
899        assert_eq!(pool.sender_transaction_count.len(), 4);
900        assert!(pool.sender_transaction_count.contains_key(&sender1));
901
902        // We should have 1 sender transaction count for sender 1 before removing the sender count
903        assert_eq!(pool.sender_transaction_count.get(&sender1).unwrap().count, 1);
904
905        // Remove the sender count for sender 1
906        pool.remove_sender_count(sender1);
907
908        // After removing the sender count we should have 3 sender transaction counts remaining
909        assert_eq!(pool.sender_transaction_count.len(), 3);
910        assert!(!pool.sender_transaction_count.contains_key(&sender1));
911
912        // Check the sender transaction count for sender 2 before removing the sender count
913        assert_eq!(
914            *pool.sender_transaction_count.get(&sender2).unwrap(),
915            SenderTransactionCount { count: 2, last_submission_id: 3 }
916        );
917
918        // Remove the sender count for sender 2
919        pool.remove_sender_count(sender2);
920
921        // After removing the sender count for sender 2, we still have 3 sender transaction counts
922        // remaining.
923        //
924        // This is because we added sender 2 multiple times and we only removed the last submission.
925        assert_eq!(pool.sender_transaction_count.len(), 3);
926        assert!(pool.sender_transaction_count.contains_key(&sender2));
927
928        // Sender transaction count for sender 2 should be updated correctly
929        assert_eq!(
930            *pool.sender_transaction_count.get(&sender2).unwrap(),
931            SenderTransactionCount { count: 1, last_submission_id: 3 }
932        );
933    }
934
935    #[test]
936    fn test_pool_size() {
937        let mut f = MockTransactionFactory::default();
938        let mut pool = ParkedPool::<BasefeeOrd<_>>::default();
939
940        // Create a transaction with a specific size and add it to the pool
941        let tx = f.validated_arc(MockTransaction::eip1559().set_size(1024).clone());
942        pool.add_transaction(tx);
943
944        // Assert that the reported size of the pool is correct
945        assert_eq!(pool.size(), 1024);
946    }
947
948    #[test]
949    fn test_pool_len() {
950        let mut f = MockTransactionFactory::default();
951        let mut pool = ParkedPool::<BasefeeOrd<_>>::default();
952
953        // Initially, the pool should have zero transactions
954        assert_eq!(pool.len(), 0);
955
956        // Add a transaction to the pool and check the length
957        let tx = f.validated_arc(MockTransaction::eip1559());
958        pool.add_transaction(tx);
959        assert_eq!(pool.len(), 1);
960    }
961
962    #[test]
963    fn test_pool_contains() {
964        let mut f = MockTransactionFactory::default();
965        let mut pool = ParkedPool::<BasefeeOrd<_>>::default();
966
967        // Create a transaction and get its ID
968        let tx = f.validated_arc(MockTransaction::eip1559());
969        let tx_id = *tx.id();
970
971        // Before adding, the transaction should not be in the pool
972        assert!(!pool.contains(&tx_id));
973
974        // After adding, the transaction should be present in the pool
975        pool.add_transaction(tx);
976        assert!(pool.contains(&tx_id));
977    }
978
979    #[test]
980    fn test_get_transaction() {
981        let mut f = MockTransactionFactory::default();
982        let mut pool = ParkedPool::<BasefeeOrd<_>>::default();
983
984        // Add a transaction to the pool and get its ID
985        let tx = f.validated_arc(MockTransaction::eip1559());
986        let tx_id = *tx.id();
987        pool.add_transaction(tx.clone());
988
989        // Retrieve the transaction using `get()` and assert it matches the added transaction
990        let retrieved = pool.get(&tx_id).expect("Transaction should exist in the pool");
991        assert_eq!(retrieved.transaction.id(), tx.id());
992    }
993
994    #[test]
995    fn test_all_transactions() {
996        let mut f = MockTransactionFactory::default();
997        let mut pool = ParkedPool::<BasefeeOrd<_>>::default();
998
999        // Add two transactions to the pool
1000        let tx1 = f.validated_arc(MockTransaction::eip1559());
1001        let tx2 = f.validated_arc(MockTransaction::eip1559());
1002        pool.add_transaction(tx1.clone());
1003        pool.add_transaction(tx2.clone());
1004
1005        // Collect all transaction IDs from the pool
1006        let all_txs: Vec<_> = pool.all().map(|tx| *tx.id()).collect();
1007        assert_eq!(all_txs.len(), 2);
1008
1009        // Check that the IDs of both transactions are present
1010        assert!(all_txs.contains(tx1.id()));
1011        assert!(all_txs.contains(tx2.id()));
1012    }
1013
1014    #[test]
1015    fn test_truncate_pool_edge_case() {
1016        let mut f = MockTransactionFactory::default();
1017        let mut pool = ParkedPool::<BasefeeOrd<_>>::default();
1018
1019        // Add two transactions to the pool
1020        let tx1 = f.validated_arc(MockTransaction::eip1559());
1021        let tx2 = f.validated_arc(MockTransaction::eip1559());
1022        pool.add_transaction(tx1);
1023        pool.add_transaction(tx2);
1024
1025        // Set a limit that matches the current number of transactions
1026        let limit = SubPoolLimit { max_txs: 2, max_size: usize::MAX };
1027        let removed = pool.truncate_pool(limit);
1028
1029        // No transactions should be removed
1030        assert!(removed.is_empty());
1031
1032        // Set a stricter limit that requires truncating one transaction
1033        let limit = SubPoolLimit { max_txs: 1, max_size: usize::MAX };
1034        let removed = pool.truncate_pool(limit);
1035
1036        // One transaction should be removed, and the pool should have one left
1037        assert_eq!(removed.len(), 1);
1038        assert_eq!(pool.len(), 1);
1039    }
1040
1041    #[test]
1042    fn test_satisfy_base_fee_transactions() {
1043        let mut f = MockTransactionFactory::default();
1044        let mut pool = ParkedPool::<BasefeeOrd<_>>::default();
1045
1046        // Add two transactions with different max fees
1047        let tx1 = f.validated_arc(MockTransaction::eip1559().set_max_fee(100).clone());
1048        let tx2 = f.validated_arc(MockTransaction::eip1559().set_max_fee(200).clone());
1049        pool.add_transaction(tx1);
1050        pool.add_transaction(tx2.clone());
1051
1052        // Check that only the second transaction satisfies the base fee requirement
1053        let satisfied = pool.satisfy_base_fee_transactions(150);
1054        assert_eq!(satisfied.len(), 1);
1055        assert_eq!(satisfied[0].id(), tx2.id())
1056    }
1057
1058    #[test]
1059    fn test_remove_transaction() {
1060        let mut f = MockTransactionFactory::default();
1061        let mut pool = ParkedPool::<BasefeeOrd<_>>::default();
1062
1063        // Add a transaction to the pool and get its ID
1064        let tx = f.validated_arc(MockTransaction::eip1559());
1065        let tx_id = *tx.id();
1066        pool.add_transaction(tx);
1067
1068        // Ensure the transaction is in the pool before removal
1069        assert!(pool.contains(&tx_id));
1070
1071        // Remove the transaction and check that it is no longer in the pool
1072        let removed = pool.remove_transaction(&tx_id);
1073        assert!(removed.is_some());
1074        assert!(!pool.contains(&tx_id));
1075    }
1076
1077    #[test]
1078    fn test_enforce_basefee_with_handler_zero_allocation() {
1079        let mut f = MockTransactionFactory::default();
1080        let mut pool = ParkedPool::<BasefeeOrd<_>>::default();
1081
1082        // Add multiple transactions across different fee ranges
1083        let sender_a = address!("0x000000000000000000000000000000000000000a");
1084        let sender_b = address!("0x000000000000000000000000000000000000000b");
1085
1086        // Add transactions where nonce ordering allows proper processing:
1087        // Sender A: both transactions can afford basefee (500 >= 400, 600 >= 400)
1088        // Sender B: transaction cannot afford basefee (300 < 400)
1089        let txs = vec![
1090            f.validated_arc(
1091                MockTransaction::eip1559()
1092                    .set_sender(sender_a)
1093                    .set_nonce(0)
1094                    .set_max_fee(500)
1095                    .clone(),
1096            ),
1097            f.validated_arc(
1098                MockTransaction::eip1559()
1099                    .set_sender(sender_a)
1100                    .set_nonce(1)
1101                    .set_max_fee(600)
1102                    .clone(),
1103            ),
1104            f.validated_arc(
1105                MockTransaction::eip1559()
1106                    .set_sender(sender_b)
1107                    .set_nonce(0)
1108                    .set_max_fee(300)
1109                    .clone(),
1110            ),
1111        ];
1112
1113        let expected_affordable = vec![txs[0].clone(), txs[1].clone()]; // Both sender A txs
1114        for tx in txs {
1115            pool.add_transaction(tx);
1116        }
1117
1118        // Test the handler approach with zero allocations
1119        let mut processed_txs = Vec::new();
1120        let mut handler_call_count = 0;
1121
1122        pool.enforce_basefee_with(400, |tx| {
1123            processed_txs.push(tx);
1124            handler_call_count += 1;
1125        });
1126
1127        // Verify correct number of transactions processed
1128        assert_eq!(handler_call_count, 2);
1129        assert_eq!(processed_txs.len(), 2);
1130
1131        // Verify the correct transactions were processed (those with fee >= 400)
1132        let processed_ids: Vec<_> = processed_txs.iter().map(|tx| *tx.id()).collect();
1133        for expected_tx in expected_affordable {
1134            assert!(processed_ids.contains(expected_tx.id()));
1135        }
1136
1137        // Verify transactions were removed from pool
1138        assert_eq!(pool.len(), 1); // Only the 300 fee tx should remain
1139    }
1140}