reth_transaction_pool/pool/
pending.rs

1use crate::{
2    identifier::{SenderId, TransactionId},
3    pool::{
4        best::{BestTransactions, BestTransactionsWithFees},
5        size::SizeTracker,
6    },
7    Priority, SubPoolLimit, TransactionOrdering, ValidPoolTransaction,
8};
9use rustc_hash::{FxHashMap, FxHashSet};
10use std::{
11    cmp::Ordering,
12    collections::{hash_map::Entry, BTreeMap},
13    ops::Bound::Unbounded,
14    sync::Arc,
15};
16use tokio::sync::broadcast;
17
18/// A pool of validated and gapless transactions that are ready to be executed on the current state
19/// and are waiting to be included in a block.
20///
21/// This pool distinguishes between `independent` transactions and pending transactions. A
22/// transaction is `independent`, if it is in the pending pool, and it has the current on chain
23/// nonce of the sender. Meaning `independent` transactions can be executed right away, other
24/// pending transactions depend on at least one `independent` transaction.
25///
26/// Once an `independent` transaction was executed it *unlocks* the next nonce, if this transaction
27/// is also pending, then this will be moved to the `independent` queue.
28#[derive(Debug, Clone)]
29pub struct PendingPool<T: TransactionOrdering> {
30    /// How to order transactions.
31    ordering: T,
32    /// Keeps track of transactions inserted in the pool.
33    ///
34    /// This way we can determine when transactions were submitted to the pool.
35    submission_id: u64,
36    /// _All_ Transactions that are currently inside the pool grouped by their identifier.
37    by_id: BTreeMap<TransactionId, PendingTransaction<T>>,
38    /// The highest nonce transactions for each sender - like the `independent` set, but the
39    /// highest instead of lowest nonce.
40    highest_nonces: FxHashMap<SenderId, PendingTransaction<T>>,
41    /// Independent transactions that can be included directly and don't require other
42    /// transactions.
43    independent_transactions: FxHashMap<SenderId, PendingTransaction<T>>,
44    /// Keeps track of the size of this pool.
45    ///
46    /// See also [`reth_primitives_traits::InMemorySize::size`].
47    size_of: SizeTracker,
48    /// Used to broadcast new transactions that have been added to the `PendingPool` to existing
49    /// `static_files` of this pool.
50    new_transaction_notifier: broadcast::Sender<PendingTransaction<T>>,
51}
52
53// === impl PendingPool ===
54
55impl<T: TransactionOrdering> PendingPool<T> {
56    /// Create a new pool instance.
57    pub fn new(ordering: T) -> Self {
58        let (new_transaction_notifier, _) = broadcast::channel(200);
59        Self {
60            ordering,
61            submission_id: 0,
62            by_id: Default::default(),
63            independent_transactions: Default::default(),
64            highest_nonces: Default::default(),
65            size_of: Default::default(),
66            new_transaction_notifier,
67        }
68    }
69
70    /// Clear all transactions from the pool without resetting other values.
71    /// Used for atomic reordering during basefee update.
72    ///
73    /// # Returns
74    ///
75    /// Returns all transactions by id.
76    fn clear_transactions(&mut self) -> BTreeMap<TransactionId, PendingTransaction<T>> {
77        self.independent_transactions.clear();
78        self.highest_nonces.clear();
79        self.size_of.reset();
80        std::mem::take(&mut self.by_id)
81    }
82
83    /// Returns an iterator over all transactions that are _currently_ ready.
84    ///
85    /// 1. The iterator _always_ returns transactions in order: it never returns a transaction with
86    ///    an unsatisfied dependency and only returns them if dependency transaction were yielded
87    ///    previously. In other words: the nonces of transactions with the same sender will _always_
88    ///    increase by exactly 1.
89    ///
90    /// The order of transactions which satisfy (1.) is determined by their computed priority: a
91    /// transaction with a higher priority is returned before a transaction with a lower priority.
92    ///
93    /// If two transactions have the same priority score, then the transactions which spent more
94    /// time in pool (were added earlier) are returned first.
95    ///
96    /// NOTE: while this iterator returns transaction that pool considers valid at this point, they
97    /// could potentially be become invalid at point of execution. Therefore, this iterator
98    /// provides a way to mark transactions that the consumer of this iterator considers invalid. In
99    /// which case the transaction's subgraph is also automatically marked invalid, See (1.).
100    /// Invalid transactions are skipped.
101    pub fn best(&self) -> BestTransactions<T> {
102        BestTransactions {
103            all: self.by_id.clone(),
104            independent: self.independent_transactions.values().cloned().collect(),
105            invalid: Default::default(),
106            new_transaction_receiver: Some(self.new_transaction_notifier.subscribe()),
107            skip_blobs: false,
108        }
109    }
110
111    /// Same as `best` but only returns transactions that satisfy the given basefee and blobfee.
112    pub(crate) fn best_with_basefee_and_blobfee(
113        &self,
114        base_fee: u64,
115        base_fee_per_blob_gas: u64,
116    ) -> BestTransactionsWithFees<T> {
117        BestTransactionsWithFees { best: self.best(), base_fee, base_fee_per_blob_gas }
118    }
119
120    /// Same as `best` but also includes the given unlocked transactions.
121    ///
122    /// This mimics the [`Self::add_transaction`] method, but does not insert the transactions into
123    /// pool but only into the returned iterator.
124    ///
125    /// Note: this does not insert the unlocked transactions into the pool.
126    ///
127    /// # Panics
128    ///
129    /// if the transaction is already included
130    pub(crate) fn best_with_unlocked(
131        &self,
132        unlocked: Vec<Arc<ValidPoolTransaction<T::Transaction>>>,
133        base_fee: u64,
134    ) -> BestTransactions<T> {
135        let mut best = self.best();
136        let mut submission_id = self.submission_id;
137        for tx in unlocked {
138            submission_id += 1;
139            debug_assert!(!best.all.contains_key(tx.id()), "transaction already included");
140            let priority = self.ordering.priority(&tx.transaction, base_fee);
141            let tx_id = *tx.id();
142            let transaction = PendingTransaction { submission_id, transaction: tx, priority };
143            if best.ancestor(&tx_id).is_none() {
144                best.independent.insert(transaction.clone());
145            }
146            best.all.insert(tx_id, transaction);
147        }
148
149        best
150    }
151
152    /// Returns an iterator over all transactions in the pool
153    pub(crate) fn all(
154        &self,
155    ) -> impl Iterator<Item = Arc<ValidPoolTransaction<T::Transaction>>> + '_ {
156        self.by_id.values().map(|tx| tx.transaction.clone())
157    }
158
159    /// Updates the pool with the new blob fee. Removes
160    /// from the subpool all transactions and their dependents that no longer satisfy the given
161    /// blob fee (`tx.max_blob_fee < blob_fee`).
162    ///
163    /// Note: the transactions are not returned in a particular order.
164    ///
165    /// # Returns
166    ///
167    /// Removed transactions that no longer satisfy the blob fee.
168    pub(crate) fn update_blob_fee(
169        &mut self,
170        blob_fee: u128,
171    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
172        // Create a collection for removed transactions.
173        let mut removed = Vec::new();
174
175        // Drain and iterate over all transactions.
176        let mut transactions_iter = self.clear_transactions().into_iter().peekable();
177        while let Some((id, tx)) = transactions_iter.next() {
178            if tx.transaction.max_fee_per_blob_gas() < Some(blob_fee) {
179                // Add this tx to the removed collection since it no longer satisfies the blob fee
180                // condition. Decrease the total pool size.
181                removed.push(Arc::clone(&tx.transaction));
182
183                // Remove all dependent transactions.
184                'this: while let Some((next_id, next_tx)) = transactions_iter.peek() {
185                    if next_id.sender != id.sender {
186                        break 'this
187                    }
188                    removed.push(Arc::clone(&next_tx.transaction));
189                    transactions_iter.next();
190                }
191            } else {
192                self.size_of += tx.transaction.size();
193                self.update_independents_and_highest_nonces(&tx);
194                self.by_id.insert(id, tx);
195            }
196        }
197
198        removed
199    }
200
201    /// Updates the pool with the new base fee. Reorders transactions by new priorities. Removes
202    /// from the subpool all transactions and their dependents that no longer satisfy the given
203    /// base fee (`tx.fee < base_fee`).
204    ///
205    /// Note: the transactions are not returned in a particular order.
206    ///
207    /// # Returns
208    ///
209    /// Removed transactions that no longer satisfy the base fee.
210    pub(crate) fn update_base_fee(
211        &mut self,
212        base_fee: u64,
213    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
214        // Create a collection for removed transactions.
215        let mut removed = Vec::new();
216
217        // Drain and iterate over all transactions.
218        let mut transactions_iter = self.clear_transactions().into_iter().peekable();
219        while let Some((id, mut tx)) = transactions_iter.next() {
220            if tx.transaction.max_fee_per_gas() < base_fee as u128 {
221                // Add this tx to the removed collection since it no longer satisfies the base fee
222                // condition. Decrease the total pool size.
223                removed.push(Arc::clone(&tx.transaction));
224
225                // Remove all dependent transactions.
226                'this: while let Some((next_id, next_tx)) = transactions_iter.peek() {
227                    if next_id.sender != id.sender {
228                        break 'this
229                    }
230                    removed.push(Arc::clone(&next_tx.transaction));
231                    transactions_iter.next();
232                }
233            } else {
234                // Re-insert the transaction with new priority.
235                tx.priority = self.ordering.priority(&tx.transaction.transaction, base_fee);
236
237                self.size_of += tx.transaction.size();
238                self.update_independents_and_highest_nonces(&tx);
239                self.by_id.insert(id, tx);
240            }
241        }
242
243        removed
244    }
245
246    /// Updates the independent transaction and highest nonces set, assuming the given transaction
247    /// is being _added_ to the pool.
248    fn update_independents_and_highest_nonces(&mut self, tx: &PendingTransaction<T>) {
249        match self.highest_nonces.entry(tx.transaction.sender_id()) {
250            Entry::Occupied(mut entry) => {
251                if entry.get().transaction.nonce() < tx.transaction.nonce() {
252                    *entry.get_mut() = tx.clone();
253                }
254            }
255            Entry::Vacant(entry) => {
256                entry.insert(tx.clone());
257            }
258        }
259        match self.independent_transactions.entry(tx.transaction.sender_id()) {
260            Entry::Occupied(mut entry) => {
261                if entry.get().transaction.nonce() > tx.transaction.nonce() {
262                    *entry.get_mut() = tx.clone();
263                }
264            }
265            Entry::Vacant(entry) => {
266                entry.insert(tx.clone());
267            }
268        }
269    }
270
271    /// Returns the ancestor the given transaction, the transaction with `nonce - 1`.
272    ///
273    /// Note: for a transaction with nonce higher than the current on chain nonce this will always
274    /// return an ancestor since all transaction in this pool are gapless.
275    fn ancestor(&self, id: &TransactionId) -> Option<&PendingTransaction<T>> {
276        self.get(&id.unchecked_ancestor()?)
277    }
278
279    /// Adds a new transactions to the pending queue.
280    ///
281    /// # Panics
282    ///
283    /// if the transaction is already included
284    pub fn add_transaction(
285        &mut self,
286        tx: Arc<ValidPoolTransaction<T::Transaction>>,
287        base_fee: u64,
288    ) {
289        assert!(
290            !self.contains(tx.id()),
291            "transaction already included {:?}",
292            self.get(tx.id()).unwrap().transaction
293        );
294
295        // keep track of size
296        self.size_of += tx.size();
297
298        let tx_id = *tx.id();
299
300        let submission_id = self.next_id();
301        let priority = self.ordering.priority(&tx.transaction, base_fee);
302        let tx = PendingTransaction { submission_id, transaction: tx, priority };
303
304        self.update_independents_and_highest_nonces(&tx);
305
306        // send the new transaction to any existing pendingpool static file iterators
307        if self.new_transaction_notifier.receiver_count() > 0 {
308            let _ = self.new_transaction_notifier.send(tx.clone());
309        }
310
311        self.by_id.insert(tx_id, tx);
312    }
313
314    /// Removes the transaction from the pool.
315    ///
316    /// Note: If the transaction has a descendant transaction
317    /// it will advance it to the best queue.
318    pub(crate) fn remove_transaction(
319        &mut self,
320        id: &TransactionId,
321    ) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
322        if let Some(lowest) = self.independent_transactions.get(&id.sender) {
323            if lowest.transaction.nonce() == id.nonce {
324                self.independent_transactions.remove(&id.sender);
325                // mark the next as independent if it exists
326                if let Some(unlocked) = self.get(&id.descendant()) {
327                    self.independent_transactions.insert(id.sender, unlocked.clone());
328                }
329            }
330        }
331
332        let tx = self.by_id.remove(id)?;
333        self.size_of -= tx.transaction.size();
334
335        if let Some(highest) = self.highest_nonces.get(&id.sender) {
336            if highest.transaction.nonce() == id.nonce {
337                self.highest_nonces.remove(&id.sender);
338            }
339            if let Some(ancestor) = self.ancestor(id) {
340                self.highest_nonces.insert(id.sender, ancestor.clone());
341            }
342        }
343        Some(tx.transaction)
344    }
345
346    fn next_id(&mut self) -> u64 {
347        let id = self.submission_id;
348        self.submission_id = self.submission_id.wrapping_add(1);
349        id
350    }
351
352    /// Traverses the pool, starting at the highest nonce set, removing the transactions which
353    /// would put the pool under the specified limits.
354    ///
355    /// This attempts to remove transactions by roughly the same amount for each sender. This is
356    /// done by removing the highest-nonce transactions for each sender.
357    ///
358    /// If the `remove_locals` flag is unset, transactions will be removed per-sender until a
359    /// local transaction is the highest nonce transaction for that sender. If all senders have a
360    /// local highest-nonce transaction, the pool will not be truncated further.
361    ///
362    /// Otherwise, if the `remove_locals` flag is set, transactions will be removed per-sender
363    /// until the pool is under the given limits.
364    ///
365    /// Any removed transactions will be added to the `end_removed` vector.
366    pub fn remove_to_limit(
367        &mut self,
368        limit: &SubPoolLimit,
369        remove_locals: bool,
370        end_removed: &mut Vec<Arc<ValidPoolTransaction<T::Transaction>>>,
371    ) {
372        // This serves as a termination condition for the loop - it represents the number of
373        // _valid_ unique senders that might have descendants in the pool.
374        //
375        // If `remove_locals` is false, a value of zero means that there are no non-local txs in the
376        // pool that can be removed.
377        //
378        // If `remove_locals` is true, a value of zero means that there are no txs in the pool that
379        // can be removed.
380        let mut non_local_senders = self.highest_nonces.len();
381
382        // keeps track of unique senders from previous iterations, to understand how many unique
383        // senders were removed in the last iteration
384        let mut unique_senders = self.highest_nonces.len();
385
386        // keeps track of which senders we've marked as local
387        let mut local_senders = FxHashSet::default();
388
389        // keep track of transactions to remove and how many have been removed so far
390        let original_length = self.len();
391        let mut removed = Vec::new();
392        let mut total_removed = 0;
393
394        // track total `size` of transactions to remove
395        let original_size = self.size();
396        let mut total_size = 0;
397
398        loop {
399            // check how many unique senders were removed last iteration
400            let unique_removed = unique_senders - self.highest_nonces.len();
401
402            // the new number of unique senders
403            unique_senders = self.highest_nonces.len();
404            non_local_senders -= unique_removed;
405
406            // we can reuse the temp array
407            removed.clear();
408
409            // we prefer removing transactions with lower ordering
410            let mut worst_transactions = self.highest_nonces.values().collect::<Vec<_>>();
411            worst_transactions.sort();
412
413            // loop through the highest nonces set, removing transactions until we reach the limit
414            for tx in worst_transactions {
415                // return early if the pool is under limits
416                if !limit.is_exceeded(original_length - total_removed, original_size - total_size) ||
417                    non_local_senders == 0
418                {
419                    // need to remove remaining transactions before exiting
420                    for id in &removed {
421                        if let Some(tx) = self.remove_transaction(id) {
422                            end_removed.push(tx);
423                        }
424                    }
425
426                    return
427                }
428
429                if !remove_locals && tx.transaction.is_local() {
430                    let sender_id = tx.transaction.sender_id();
431                    if local_senders.insert(sender_id) {
432                        non_local_senders -= 1;
433                    }
434                    continue
435                }
436
437                total_size += tx.transaction.size();
438                total_removed += 1;
439                removed.push(*tx.transaction.id());
440            }
441
442            // remove the transactions from this iteration
443            for id in &removed {
444                if let Some(tx) = self.remove_transaction(id) {
445                    end_removed.push(tx);
446                }
447            }
448
449            // return if either the pool is under limits or there are no more _eligible_
450            // transactions to remove
451            if !self.exceeds(limit) || non_local_senders == 0 {
452                return
453            }
454        }
455    }
456
457    /// Truncates the pool to the given [`SubPoolLimit`], removing transactions until the subpool
458    /// limits are met.
459    ///
460    /// This attempts to remove transactions by roughly the same amount for each sender. For more
461    /// information on this exact process see docs for
462    /// [`remove_to_limit`](PendingPool::remove_to_limit).
463    ///
464    /// This first truncates all of the non-local transactions in the pool. If the subpool is still
465    /// not under the limit, this truncates the entire pool, including non-local transactions. The
466    /// removed transactions are returned.
467    pub fn truncate_pool(
468        &mut self,
469        limit: SubPoolLimit,
470    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
471        let mut removed = Vec::new();
472        // return early if the pool is already under the limits
473        if !self.exceeds(&limit) {
474            return removed
475        }
476
477        // first truncate only non-local transactions, returning if the pool end up under the limit
478        self.remove_to_limit(&limit, false, &mut removed);
479        if !self.exceeds(&limit) {
480            return removed
481        }
482
483        // now repeat for local transactions, since local transactions must be removed now for the
484        // pool to be under the limit
485        self.remove_to_limit(&limit, true, &mut removed);
486
487        removed
488    }
489
490    /// Returns true if the pool exceeds the given limit
491    #[inline]
492    pub(crate) fn exceeds(&self, limit: &SubPoolLimit) -> bool {
493        limit.is_exceeded(self.len(), self.size())
494    }
495
496    /// The reported size of all transactions in this pool.
497    pub(crate) fn size(&self) -> usize {
498        self.size_of.into()
499    }
500
501    /// Number of transactions in the entire pool
502    pub(crate) fn len(&self) -> usize {
503        self.by_id.len()
504    }
505
506    /// Whether the pool is empty
507    #[cfg(test)]
508    pub(crate) fn is_empty(&self) -> bool {
509        self.by_id.is_empty()
510    }
511
512    /// Returns `true` if the transaction with the given id is already included in this pool.
513    pub(crate) fn contains(&self, id: &TransactionId) -> bool {
514        self.by_id.contains_key(id)
515    }
516
517    /// Get transactions by sender
518    pub(crate) fn get_txs_by_sender(&self, sender: SenderId) -> Vec<TransactionId> {
519        self.by_id
520            .range((sender.start_bound(), Unbounded))
521            .take_while(move |(other, _)| sender == other.sender)
522            .map(|(tx_id, _)| *tx_id)
523            .collect()
524    }
525
526    /// Retrieves a transaction with the given ID from the pool, if it exists.
527    fn get(&self, id: &TransactionId) -> Option<&PendingTransaction<T>> {
528        self.by_id.get(id)
529    }
530
531    /// Returns a reference to the independent transactions in the pool
532    #[cfg(test)]
533    pub(crate) const fn independent(&self) -> &FxHashMap<SenderId, PendingTransaction<T>> {
534        &self.independent_transactions
535    }
536
537    /// Asserts that the bijection between `by_id` and `all` is valid.
538    #[cfg(any(test, feature = "test-utils"))]
539    pub(crate) fn assert_invariants(&self) {
540        assert!(
541            self.independent_transactions.len() <= self.by_id.len(),
542            "independent.len() > all.len()"
543        );
544        assert!(
545            self.highest_nonces.len() <= self.by_id.len(),
546            "independent_descendants.len() > all.len()"
547        );
548        assert_eq!(
549            self.highest_nonces.len(),
550            self.independent_transactions.len(),
551            "independent.len() = independent_descendants.len()"
552        );
553    }
554}
555
556/// A transaction that is ready to be included in a block.
557#[derive(Debug)]
558pub(crate) struct PendingTransaction<T: TransactionOrdering> {
559    /// Identifier that tags when transaction was submitted in the pool.
560    pub(crate) submission_id: u64,
561    /// Actual transaction.
562    pub(crate) transaction: Arc<ValidPoolTransaction<T::Transaction>>,
563    /// The priority value assigned by the used `Ordering` function.
564    pub(crate) priority: Priority<T::PriorityValue>,
565}
566
567impl<T: TransactionOrdering> PendingTransaction<T> {
568    /// The next transaction of the sender: `nonce + 1`
569    pub(crate) fn unlocks(&self) -> TransactionId {
570        self.transaction.transaction_id.descendant()
571    }
572}
573
574impl<T: TransactionOrdering> Clone for PendingTransaction<T> {
575    fn clone(&self) -> Self {
576        Self {
577            submission_id: self.submission_id,
578            transaction: Arc::clone(&self.transaction),
579            priority: self.priority.clone(),
580        }
581    }
582}
583
584impl<T: TransactionOrdering> Eq for PendingTransaction<T> {}
585
586impl<T: TransactionOrdering> PartialEq<Self> for PendingTransaction<T> {
587    fn eq(&self, other: &Self) -> bool {
588        self.cmp(other) == Ordering::Equal
589    }
590}
591
592impl<T: TransactionOrdering> PartialOrd<Self> for PendingTransaction<T> {
593    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
594        Some(self.cmp(other))
595    }
596}
597
598impl<T: TransactionOrdering> Ord for PendingTransaction<T> {
599    fn cmp(&self, other: &Self) -> Ordering {
600        // This compares by `priority` and only if two tx have the exact same priority this compares
601        // the unique `submission_id`. This ensures that transactions with same priority are not
602        // equal, so they're not replaced in the set
603        self.priority
604            .cmp(&other.priority)
605            .then_with(|| other.submission_id.cmp(&self.submission_id))
606    }
607}
608
609#[cfg(test)]
610mod tests {
611    use super::*;
612    use crate::{
613        test_utils::{MockOrdering, MockTransaction, MockTransactionFactory, MockTransactionSet},
614        PoolTransaction,
615    };
616    use alloy_consensus::{Transaction, TxType};
617    use alloy_primitives::address;
618    use std::collections::HashSet;
619
620    #[test]
621    fn test_enforce_basefee() {
622        let mut f = MockTransactionFactory::default();
623        let mut pool = PendingPool::new(MockOrdering::default());
624        let tx = f.validated_arc(MockTransaction::eip1559().inc_price());
625        pool.add_transaction(tx.clone(), 0);
626
627        assert!(pool.contains(tx.id()));
628        assert_eq!(pool.len(), 1);
629
630        let removed = pool.update_base_fee(0);
631        assert!(removed.is_empty());
632
633        let removed = pool.update_base_fee((tx.max_fee_per_gas() + 1) as u64);
634        assert_eq!(removed.len(), 1);
635        assert!(pool.is_empty());
636    }
637
638    #[test]
639    fn test_enforce_basefee_descendant() {
640        let mut f = MockTransactionFactory::default();
641        let mut pool = PendingPool::new(MockOrdering::default());
642        let t = MockTransaction::eip1559().inc_price_by(10);
643        let root_tx = f.validated_arc(t.clone());
644        pool.add_transaction(root_tx.clone(), 0);
645
646        let descendant_tx = f.validated_arc(t.inc_nonce().decr_price());
647        pool.add_transaction(descendant_tx.clone(), 0);
648
649        assert!(pool.contains(root_tx.id()));
650        assert!(pool.contains(descendant_tx.id()));
651        assert_eq!(pool.len(), 2);
652
653        assert_eq!(pool.independent_transactions.len(), 1);
654        assert_eq!(pool.highest_nonces.len(), 1);
655
656        let removed = pool.update_base_fee(0);
657        assert!(removed.is_empty());
658
659        // two dependent tx in the pool with decreasing fee
660
661        {
662            let mut pool2 = pool.clone();
663            let removed = pool2.update_base_fee((descendant_tx.max_fee_per_gas() + 1) as u64);
664            assert_eq!(removed.len(), 1);
665            assert_eq!(pool2.len(), 1);
666            // descendant got popped
667            assert!(pool2.contains(root_tx.id()));
668            assert!(!pool2.contains(descendant_tx.id()));
669        }
670
671        // remove root transaction via fee
672        let removed = pool.update_base_fee((root_tx.max_fee_per_gas() + 1) as u64);
673        assert_eq!(removed.len(), 2);
674        assert!(pool.is_empty());
675        pool.assert_invariants();
676    }
677
678    #[test]
679    fn evict_worst() {
680        let mut f = MockTransactionFactory::default();
681        let mut pool = PendingPool::new(MockOrdering::default());
682
683        let t = MockTransaction::eip1559();
684        pool.add_transaction(f.validated_arc(t.clone()), 0);
685
686        let t2 = MockTransaction::eip1559().inc_price_by(10);
687        pool.add_transaction(f.validated_arc(t2), 0);
688
689        // First transaction should be evicted.
690        assert_eq!(
691            pool.highest_nonces.values().min().map(|tx| *tx.transaction.hash()),
692            Some(*t.hash())
693        );
694
695        // truncate pool with max size = 1, ensure it's the same transaction
696        let removed = pool.truncate_pool(SubPoolLimit { max_txs: 1, max_size: usize::MAX });
697        assert_eq!(removed.len(), 1);
698        assert_eq!(removed[0].hash(), t.hash());
699    }
700
701    #[test]
702    fn correct_independent_descendants() {
703        // this test ensures that we set the right highest nonces set for each sender
704        let mut f = MockTransactionFactory::default();
705        let mut pool = PendingPool::new(MockOrdering::default());
706
707        let a_sender = address!("0x000000000000000000000000000000000000000a");
708        let b_sender = address!("0x000000000000000000000000000000000000000b");
709        let c_sender = address!("0x000000000000000000000000000000000000000c");
710        let d_sender = address!("0x000000000000000000000000000000000000000d");
711
712        // create a chain of transactions by sender A, B, C
713        let mut tx_set = MockTransactionSet::dependent(a_sender, 0, 4, TxType::Eip1559);
714        let a = tx_set.clone().into_vec();
715
716        let b = MockTransactionSet::dependent(b_sender, 0, 3, TxType::Eip1559).into_vec();
717        tx_set.extend(b.clone());
718
719        // C has the same number of txs as B
720        let c = MockTransactionSet::dependent(c_sender, 0, 3, TxType::Eip1559).into_vec();
721        tx_set.extend(c.clone());
722
723        let d = MockTransactionSet::dependent(d_sender, 0, 1, TxType::Eip1559).into_vec();
724        tx_set.extend(d.clone());
725
726        // add all the transactions to the pool
727        let all_txs = tx_set.into_vec();
728        for tx in all_txs {
729            pool.add_transaction(f.validated_arc(tx), 0);
730        }
731
732        pool.assert_invariants();
733
734        // the independent set is the roots of each of these tx chains, these are the highest
735        // nonces for each sender
736        let expected_highest_nonces = vec![d[0].clone(), c[2].clone(), b[2].clone(), a[3].clone()]
737            .iter()
738            .map(|tx| (tx.sender(), tx.nonce()))
739            .collect::<HashSet<_>>();
740        let actual_highest_nonces = pool
741            .highest_nonces
742            .values()
743            .map(|tx| (tx.transaction.sender(), tx.transaction.nonce()))
744            .collect::<HashSet<_>>();
745        assert_eq!(expected_highest_nonces, actual_highest_nonces);
746        pool.assert_invariants();
747    }
748
749    #[test]
750    fn truncate_by_sender() {
751        // This test ensures that transactions are removed from the pending pool by sender.
752        let mut f = MockTransactionFactory::default();
753        let mut pool = PendingPool::new(MockOrdering::default());
754
755        // Addresses for simulated senders A, B, C, and D.
756        let a = address!("0x000000000000000000000000000000000000000a");
757        let b = address!("0x000000000000000000000000000000000000000b");
758        let c = address!("0x000000000000000000000000000000000000000c");
759        let d = address!("0x000000000000000000000000000000000000000d");
760
761        // Create transaction chains for senders A, B, C, and D.
762        let a_txs = MockTransactionSet::sequential_transactions_by_sender(a, 4, TxType::Eip1559);
763        let b_txs = MockTransactionSet::sequential_transactions_by_sender(b, 3, TxType::Eip1559);
764        let c_txs = MockTransactionSet::sequential_transactions_by_sender(c, 3, TxType::Eip1559);
765        let d_txs = MockTransactionSet::sequential_transactions_by_sender(d, 1, TxType::Eip1559);
766
767        // Set up expected pending transactions.
768        let expected_pending = vec![
769            a_txs.transactions[0].clone(),
770            b_txs.transactions[0].clone(),
771            c_txs.transactions[0].clone(),
772            a_txs.transactions[1].clone(),
773        ]
774        .into_iter()
775        .map(|tx| (tx.sender(), tx.nonce()))
776        .collect::<HashSet<_>>();
777
778        // Set up expected removed transactions.
779        let expected_removed = vec![
780            d_txs.transactions[0].clone(),
781            c_txs.transactions[2].clone(),
782            b_txs.transactions[2].clone(),
783            a_txs.transactions[3].clone(),
784            c_txs.transactions[1].clone(),
785            b_txs.transactions[1].clone(),
786            a_txs.transactions[2].clone(),
787        ]
788        .into_iter()
789        .map(|tx| (tx.sender(), tx.nonce()))
790        .collect::<HashSet<_>>();
791
792        // Consolidate all transactions into a single vector.
793        let all_txs =
794            [a_txs.into_vec(), b_txs.into_vec(), c_txs.into_vec(), d_txs.into_vec()].concat();
795
796        // Add all the transactions to the pool.
797        for tx in all_txs {
798            pool.add_transaction(f.validated_arc(tx), 0);
799        }
800
801        // Sanity check, ensuring everything is consistent.
802        pool.assert_invariants();
803
804        // Define the maximum total transactions to be 4, removing transactions for each sender.
805        // Expected order of removal:
806        // * d1, c3, b3, a4
807        // * c2, b2, a3
808        //
809        // Remaining transactions:
810        // * a1, a2
811        // * b1
812        // * c1
813        let pool_limit = SubPoolLimit { max_txs: 4, max_size: usize::MAX };
814
815        // Truncate the pool based on the defined limit.
816        let removed = pool.truncate_pool(pool_limit);
817        pool.assert_invariants();
818        assert_eq!(removed.len(), expected_removed.len());
819
820        // Get the set of removed transactions and compare with the expected set.
821        let removed =
822            removed.into_iter().map(|tx| (tx.sender(), tx.nonce())).collect::<HashSet<_>>();
823        assert_eq!(removed, expected_removed);
824
825        // Retrieve the current pending transactions after truncation.
826        let pending = pool.all().collect::<Vec<_>>();
827        assert_eq!(pending.len(), expected_pending.len());
828
829        // Get the set of pending transactions and compare with the expected set.
830        let pending =
831            pending.into_iter().map(|tx| (tx.sender(), tx.nonce())).collect::<HashSet<_>>();
832        assert_eq!(pending, expected_pending);
833    }
834
835    // <https://github.com/paradigmxyz/reth/issues/12340>
836    #[test]
837    fn test_eligible_updates_promoted() {
838        let mut pool = PendingPool::new(MockOrdering::default());
839        let mut f = MockTransactionFactory::default();
840
841        let num_senders = 10;
842
843        let first_txs: Vec<_> = (0..num_senders) //
844            .map(|_| MockTransaction::eip1559())
845            .collect();
846        let second_txs: Vec<_> =
847            first_txs.iter().map(|tx| tx.clone().rng_hash().inc_nonce()).collect();
848
849        for tx in first_txs {
850            let valid_tx = f.validated(tx);
851            pool.add_transaction(Arc::new(valid_tx), 0);
852        }
853
854        let mut best = pool.best();
855
856        for _ in 0..num_senders {
857            if let Some(tx) = best.next() {
858                assert_eq!(tx.nonce(), 0);
859            } else {
860                panic!("cannot read one of first_txs");
861            }
862        }
863
864        for tx in second_txs {
865            let valid_tx = f.validated(tx);
866            pool.add_transaction(Arc::new(valid_tx), 0);
867        }
868
869        for _ in 0..num_senders {
870            if let Some(tx) = best.next() {
871                assert_eq!(tx.nonce(), 1);
872            } else {
873                panic!("cannot read one of second_txs");
874            }
875        }
876    }
877
878    #[test]
879    fn test_empty_pool_behavior() {
880        let mut pool = PendingPool::<MockOrdering>::new(MockOrdering::default());
881
882        // Ensure the pool is empty
883        assert!(pool.is_empty());
884        assert_eq!(pool.len(), 0);
885        assert_eq!(pool.size(), 0);
886
887        // Verify that attempting to truncate an empty pool does not panic and returns an empty vec
888        let removed = pool.truncate_pool(SubPoolLimit { max_txs: 10, max_size: 1000 });
889        assert!(removed.is_empty());
890
891        // Verify that retrieving transactions from an empty pool yields nothing
892        let all_txs: Vec<_> = pool.all().collect();
893        assert!(all_txs.is_empty());
894    }
895
896    #[test]
897    fn test_add_remove_transaction() {
898        let mut f = MockTransactionFactory::default();
899        let mut pool = PendingPool::new(MockOrdering::default());
900
901        // Add a transaction and check if it's in the pool
902        let tx = f.validated_arc(MockTransaction::eip1559());
903        pool.add_transaction(tx.clone(), 0);
904        assert!(pool.contains(tx.id()));
905        assert_eq!(pool.len(), 1);
906
907        // Remove the transaction and ensure it's no longer in the pool
908        let removed_tx = pool.remove_transaction(tx.id()).unwrap();
909        assert_eq!(removed_tx.id(), tx.id());
910        assert!(!pool.contains(tx.id()));
911        assert_eq!(pool.len(), 0);
912    }
913
914    #[test]
915    fn test_reorder_on_basefee_update() {
916        let mut f = MockTransactionFactory::default();
917        let mut pool = PendingPool::new(MockOrdering::default());
918
919        // Add two transactions with different fees
920        let tx1 = f.validated_arc(MockTransaction::eip1559().inc_price());
921        let tx2 = f.validated_arc(MockTransaction::eip1559().inc_price_by(20));
922        pool.add_transaction(tx1.clone(), 0);
923        pool.add_transaction(tx2.clone(), 0);
924
925        // Ensure the transactions are in the correct order
926        let mut best = pool.best();
927        assert_eq!(best.next().unwrap().hash(), tx2.hash());
928        assert_eq!(best.next().unwrap().hash(), tx1.hash());
929
930        // Update the base fee to a value higher than tx1's fee, causing it to be removed
931        let removed = pool.update_base_fee((tx1.max_fee_per_gas() + 1) as u64);
932        assert_eq!(removed.len(), 1);
933        assert_eq!(removed[0].hash(), tx1.hash());
934
935        // Verify that only tx2 remains in the pool
936        assert_eq!(pool.len(), 1);
937        assert!(pool.contains(tx2.id()));
938        assert!(!pool.contains(tx1.id()));
939    }
940
941    #[test]
942    #[should_panic(expected = "transaction already included")]
943    fn test_handle_duplicates() {
944        let mut f = MockTransactionFactory::default();
945        let mut pool = PendingPool::new(MockOrdering::default());
946
947        // Add the same transaction twice and ensure it only appears once
948        let tx = f.validated_arc(MockTransaction::eip1559());
949        pool.add_transaction(tx.clone(), 0);
950        assert!(pool.contains(tx.id()));
951        assert_eq!(pool.len(), 1);
952
953        // Attempt to add the same transaction again, which should be ignored
954        pool.add_transaction(tx, 0);
955    }
956
957    #[test]
958    fn test_update_blob_fee() {
959        let mut f = MockTransactionFactory::default();
960        let mut pool = PendingPool::new(MockOrdering::default());
961
962        // Add transactions with varying blob fees
963        let tx1 = f.validated_arc(MockTransaction::eip4844().set_blob_fee(50).clone());
964        let tx2 = f.validated_arc(MockTransaction::eip4844().set_blob_fee(150).clone());
965        pool.add_transaction(tx1.clone(), 0);
966        pool.add_transaction(tx2.clone(), 0);
967
968        // Update the blob fee to a value that causes tx1 to be removed
969        let removed = pool.update_blob_fee(100);
970        assert_eq!(removed.len(), 1);
971        assert_eq!(removed[0].hash(), tx1.hash());
972
973        // Verify that only tx2 remains in the pool
974        assert!(pool.contains(tx2.id()));
975        assert!(!pool.contains(tx1.id()));
976    }
977
978    #[test]
979    fn local_senders_tracking() {
980        let mut f = MockTransactionFactory::default();
981        let mut pool = PendingPool::new(MockOrdering::default());
982
983        // Addresses for simulated senders A, B, C
984        let a = address!("0x000000000000000000000000000000000000000a");
985        let b = address!("0x000000000000000000000000000000000000000b");
986        let c = address!("0x000000000000000000000000000000000000000c");
987
988        // sender A (local) - 11+ transactions (large enough to keep limit exceeded)
989        // sender B (external) - 2 transactions
990        // sender C (external) - 2 transactions
991
992        // Create transaction chains for senders A, B, C
993        let a_txs = MockTransactionSet::sequential_transactions_by_sender(a, 11, TxType::Eip1559);
994        let b_txs = MockTransactionSet::sequential_transactions_by_sender(b, 2, TxType::Eip1559);
995        let c_txs = MockTransactionSet::sequential_transactions_by_sender(c, 2, TxType::Eip1559);
996
997        // create local txs for sender A
998        for tx in a_txs.into_vec() {
999            let final_tx = Arc::new(f.validated_with_origin(crate::TransactionOrigin::Local, tx));
1000
1001            pool.add_transaction(final_tx, 0);
1002        }
1003
1004        // create external txs for senders B and C
1005        let remaining_txs = [b_txs.into_vec(), c_txs.into_vec()].concat();
1006        for tx in remaining_txs {
1007            let final_tx = f.validated_arc(tx);
1008
1009            pool.add_transaction(final_tx, 0);
1010        }
1011
1012        // Sanity check, ensuring everything is consistent.
1013        pool.assert_invariants();
1014
1015        let pool_limit = SubPoolLimit { max_txs: 10, max_size: usize::MAX };
1016        pool.truncate_pool(pool_limit);
1017    }
1018}