reth_transaction_pool/pool/
pending.rs

1use crate::{
2    identifier::{SenderId, TransactionId},
3    pool::{
4        best::{BestTransactions, BestTransactionsWithFees},
5        size::SizeTracker,
6    },
7    Priority, SubPoolLimit, TransactionOrdering, ValidPoolTransaction,
8};
9use rustc_hash::{FxHashMap, FxHashSet};
10use std::{
11    cmp::Ordering,
12    collections::{hash_map::Entry, BTreeMap},
13    ops::Bound::Unbounded,
14    sync::Arc,
15};
16use tokio::sync::broadcast;
17
18/// A pool of validated and gapless transactions that are ready to be executed on the current state
19/// and are waiting to be included in a block.
20///
21/// This pool distinguishes between `independent` transactions and pending transactions. A
22/// transaction is `independent`, if it is in the pending pool, and it has the current on chain
23/// nonce of the sender. Meaning `independent` transactions can be executed right away, other
24/// pending transactions depend on at least one `independent` transaction.
25///
26/// Once an `independent` transaction was executed it *unlocks* the next nonce, if this transaction
27/// is also pending, then this will be moved to the `independent` queue.
28#[derive(Debug, Clone)]
29pub struct PendingPool<T: TransactionOrdering> {
30    /// How to order transactions.
31    ordering: T,
32    /// Counter handed out to newly inserted transactions and then incremented (wrapping).
33    ///
34    /// This way we can determine the order in which transactions were submitted to the pool.
35    submission_id: u64,
36    /// _All_ transactions that are currently inside the pool, keyed by their identifier.
37    by_id: BTreeMap<TransactionId, PendingTransaction<T>>,
38    /// The highest nonce transaction for each sender - like the `independent` set, but the
39    /// highest instead of the lowest nonce.
40    highest_nonces: FxHashMap<SenderId, PendingTransaction<T>>,
41    /// The lowest nonce transaction for each sender: these can be included directly and don't
42    /// require other transactions of this pool.
43    independent_transactions: FxHashMap<SenderId, PendingTransaction<T>>,
44    /// Keeps track of the size of this pool.
45    ///
46    /// See also [`reth_primitives_traits::InMemorySize::size`].
47    size_of: SizeTracker,
48    /// Used to broadcast new transactions that have been added to the `PendingPool` to the
49    /// [`BestTransactions`] iterators that were created from this pool and are still subscribed.
50    new_transaction_notifier: broadcast::Sender<PendingTransaction<T>>,
51}
52
53// === impl PendingPool ===
54
55impl<T: TransactionOrdering> PendingPool<T> {
56    /// Create a new pending pool instance.
57    pub fn new(ordering: T) -> Self {
58        Self::with_buffer(ordering, 200)
59    }
60
61    /// Create a new pool instance with the given buffer capacity.
62    pub fn with_buffer(ordering: T, buffer_capacity: usize) -> Self {
63        let (new_transaction_notifier, _) = broadcast::channel(buffer_capacity);
64        Self {
65            ordering,
66            submission_id: 0,
67            by_id: Default::default(),
68            independent_transactions: Default::default(),
69            highest_nonces: Default::default(),
70            size_of: Default::default(),
71            new_transaction_notifier,
72        }
73    }
74
75    /// Clear all transactions from the pool without resetting other values.
76    /// Used for atomic reordering during basefee update.
77    ///
78    /// # Returns
79    ///
80    /// Returns all transactions by id.
81    fn clear_transactions(&mut self) -> BTreeMap<TransactionId, PendingTransaction<T>> {
82        self.independent_transactions.clear();
83        self.highest_nonces.clear();
84        self.size_of.reset();
85        std::mem::take(&mut self.by_id)
86    }
87
88    /// Returns an iterator over all transactions that are _currently_ ready.
89    ///
90    /// 1. The iterator _always_ returns transactions in order: it never returns a transaction with
91    ///    an unsatisfied dependency and only returns them if dependency transaction were yielded
92    ///    previously. In other words: the nonces of transactions with the same sender will _always_
93    ///    increase by exactly 1.
94    ///
95    /// The order of transactions which satisfy (1.) is determined by their computed priority: a
96    /// transaction with a higher priority is returned before a transaction with a lower priority.
97    ///
98    /// If two transactions have the same priority score, then the transactions which spent more
99    /// time in pool (were added earlier) are returned first.
100    ///
101    /// NOTE: while this iterator returns transaction that pool considers valid at this point, they
102    /// could potentially become invalid at point of execution. Therefore, this iterator
103    /// provides a way to mark transactions that the consumer of this iterator considers invalid. In
104    /// which case the transaction's subgraph is also automatically marked invalid, See (1.).
105    /// Invalid transactions are skipped.
106    pub fn best(&self) -> BestTransactions<T> {
107        BestTransactions {
108            all: self.by_id.clone(),
109            independent: self.independent_transactions.values().cloned().collect(),
110            invalid: Default::default(),
111            new_transaction_receiver: Some(self.new_transaction_notifier.subscribe()),
112            last_priority: None,
113            skip_blobs: false,
114        }
115    }
116
117    /// Same as `best` but only returns transactions that satisfy the given basefee and blobfee.
118    pub(crate) fn best_with_basefee_and_blobfee(
119        &self,
120        base_fee: u64,
121        base_fee_per_blob_gas: u64,
122    ) -> BestTransactionsWithFees<T> {
123        BestTransactionsWithFees { best: self.best(), base_fee, base_fee_per_blob_gas }
124    }
125
126    /// Same as `best` but also includes the given unlocked transactions.
127    ///
128    /// This mimics the [`Self::add_transaction`] method, but does not insert the transactions into
129    /// pool but only into the returned iterator.
130    ///
131    /// Note: this does not insert the unlocked transactions into the pool.
132    ///
133    /// # Panics
134    ///
135    /// if the transaction is already included
136    pub(crate) fn best_with_unlocked_and_attributes(
137        &self,
138        unlocked: Vec<Arc<ValidPoolTransaction<T::Transaction>>>,
139        base_fee: u64,
140        base_fee_per_blob_gas: u64,
141    ) -> BestTransactionsWithFees<T> {
142        let mut best = self.best();
143        let mut submission_id = self.submission_id;
144        for tx in unlocked {
145            submission_id += 1;
146            debug_assert!(!best.all.contains_key(tx.id()), "transaction already included");
147            let priority = self.ordering.priority(&tx.transaction, base_fee);
148            let tx_id = *tx.id();
149            let transaction = PendingTransaction { submission_id, transaction: tx, priority };
150            if best.ancestor(&tx_id).is_none() {
151                best.independent.insert(transaction.clone());
152            }
153            best.all.insert(tx_id, transaction);
154        }
155
156        BestTransactionsWithFees { best, base_fee, base_fee_per_blob_gas }
157    }
158
159    /// Returns an iterator over all transactions in the pool
160    pub(crate) fn all(
161        &self,
162    ) -> impl ExactSizeIterator<Item = Arc<ValidPoolTransaction<T::Transaction>>> + '_ {
163        self.by_id.values().map(|tx| tx.transaction.clone())
164    }
165
166    /// Updates the pool with the new blob fee. Removes
167    /// from the subpool all transactions and their dependents that no longer satisfy the given
168    /// blob fee (`tx.max_blob_fee < blob_fee`).
169    ///
170    /// Note: the transactions are not returned in a particular order.
171    ///
172    /// # Returns
173    ///
174    /// Removed transactions that no longer satisfy the blob fee.
175    pub(crate) fn update_blob_fee(
176        &mut self,
177        blob_fee: u128,
178    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
179        // Create a collection for removed transactions.
180        let mut removed = Vec::new();
181
182        // Drain and iterate over all transactions.
183        let mut transactions_iter = self.clear_transactions().into_iter().peekable();
184        while let Some((id, tx)) = transactions_iter.next() {
185            if tx.transaction.is_eip4844() && tx.transaction.max_fee_per_blob_gas() < Some(blob_fee)
186            {
187                // Add this tx to the removed collection since it no longer satisfies the blob fee
188                // condition. Decrease the total pool size.
189                removed.push(Arc::clone(&tx.transaction));
190
191                // Remove all dependent transactions.
192                'this: while let Some((next_id, next_tx)) = transactions_iter.peek() {
193                    if next_id.sender != id.sender {
194                        break 'this
195                    }
196                    removed.push(Arc::clone(&next_tx.transaction));
197                    transactions_iter.next();
198                }
199            } else {
200                self.size_of += tx.transaction.size();
201                self.update_independents_and_highest_nonces(&tx);
202                self.by_id.insert(id, tx);
203            }
204        }
205
206        removed
207    }
208
209    /// Updates the pool with the new base fee. Reorders transactions by new priorities. Removes
210    /// from the subpool all transactions and their dependents that no longer satisfy the given
211    /// base fee (`tx.fee < base_fee`).
212    ///
213    /// Note: the transactions are not returned in a particular order.
214    ///
215    /// # Returns
216    ///
217    /// Removed transactions that no longer satisfy the base fee.
218    pub(crate) fn update_base_fee(
219        &mut self,
220        base_fee: u64,
221    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
222        // Create a collection for removed transactions.
223        let mut removed = Vec::new();
224
225        // Drain and iterate over all transactions.
226        let mut transactions_iter = self.clear_transactions().into_iter().peekable();
227        while let Some((id, mut tx)) = transactions_iter.next() {
228            if tx.transaction.max_fee_per_gas() < base_fee as u128 {
229                // Add this tx to the removed collection since it no longer satisfies the base fee
230                // condition. Decrease the total pool size.
231                removed.push(Arc::clone(&tx.transaction));
232
233                // Remove all dependent transactions.
234                'this: while let Some((next_id, next_tx)) = transactions_iter.peek() {
235                    if next_id.sender != id.sender {
236                        break 'this
237                    }
238                    removed.push(Arc::clone(&next_tx.transaction));
239                    transactions_iter.next();
240                }
241            } else {
242                // Re-insert the transaction with new priority.
243                tx.priority = self.ordering.priority(&tx.transaction.transaction, base_fee);
244
245                self.size_of += tx.transaction.size();
246                self.update_independents_and_highest_nonces(&tx);
247                self.by_id.insert(id, tx);
248            }
249        }
250
251        removed
252    }
253
254    /// Updates the independent transaction and highest nonces set, assuming the given transaction
255    /// is being _added_ to the pool.
256    fn update_independents_and_highest_nonces(&mut self, tx: &PendingTransaction<T>) {
257        match self.highest_nonces.entry(tx.transaction.sender_id()) {
258            Entry::Occupied(mut entry) => {
259                if entry.get().transaction.nonce() < tx.transaction.nonce() {
260                    *entry.get_mut() = tx.clone();
261                }
262            }
263            Entry::Vacant(entry) => {
264                entry.insert(tx.clone());
265            }
266        }
267        match self.independent_transactions.entry(tx.transaction.sender_id()) {
268            Entry::Occupied(mut entry) => {
269                if entry.get().transaction.nonce() > tx.transaction.nonce() {
270                    *entry.get_mut() = tx.clone();
271                }
272            }
273            Entry::Vacant(entry) => {
274                entry.insert(tx.clone());
275            }
276        }
277    }
278
279    /// Returns the ancestor the given transaction, the transaction with `nonce - 1`.
280    ///
281    /// Note: for a transaction with nonce higher than the current on chain nonce this will always
282    /// return an ancestor since all transaction in this pool are gapless.
283    fn ancestor(&self, id: &TransactionId) -> Option<&PendingTransaction<T>> {
284        self.get(&id.unchecked_ancestor()?)
285    }
286
287    /// Adds a new transactions to the pending queue.
288    ///
289    /// # Panics
290    ///
291    /// if the transaction is already included
292    pub fn add_transaction(
293        &mut self,
294        tx: Arc<ValidPoolTransaction<T::Transaction>>,
295        base_fee: u64,
296    ) {
297        debug_assert!(
298            !self.contains(tx.id()),
299            "transaction already included {:?}",
300            self.get(tx.id()).unwrap().transaction
301        );
302
303        // keep track of size
304        self.size_of += tx.size();
305
306        let tx_id = *tx.id();
307
308        let submission_id = self.next_id();
309        let priority = self.ordering.priority(&tx.transaction, base_fee);
310        let tx = PendingTransaction { submission_id, transaction: tx, priority };
311
312        self.update_independents_and_highest_nonces(&tx);
313
314        // send the new transaction to any existing pendingpool static file iterators
315        if self.new_transaction_notifier.receiver_count() > 0 {
316            let _ = self.new_transaction_notifier.send(tx.clone());
317        }
318
319        self.by_id.insert(tx_id, tx);
320    }
321
322    /// Removes the transaction from the pool.
323    ///
324    /// Note: If the transaction has a descendant transaction
325    /// it will advance it to the best queue.
326    pub(crate) fn remove_transaction(
327        &mut self,
328        id: &TransactionId,
329    ) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
330        if let Some(lowest) = self.independent_transactions.get(&id.sender) {
331            if lowest.transaction.nonce() == id.nonce {
332                self.independent_transactions.remove(&id.sender);
333                // mark the next as independent if it exists
334                if let Some(unlocked) = self.get(&id.descendant()) {
335                    self.independent_transactions.insert(id.sender, unlocked.clone());
336                }
337            }
338        }
339
340        let tx = self.by_id.remove(id)?;
341        self.size_of -= tx.transaction.size();
342
343        if let Some(highest) = self.highest_nonces.get(&id.sender) {
344            if highest.transaction.nonce() == id.nonce {
345                self.highest_nonces.remove(&id.sender);
346            }
347            if let Some(ancestor) = self.ancestor(id) {
348                self.highest_nonces.insert(id.sender, ancestor.clone());
349            }
350        }
351        Some(tx.transaction)
352    }
353
354    const fn next_id(&mut self) -> u64 {
355        let id = self.submission_id;
356        self.submission_id = self.submission_id.wrapping_add(1);
357        id
358    }
359
360    /// Traverses the pool, starting at the highest nonce set, removing the transactions which
361    /// would put the pool under the specified limits.
362    ///
363    /// This attempts to remove transactions by roughly the same amount for each sender. This is
364    /// done by removing the highest-nonce transactions for each sender.
365    ///
366    /// If the `remove_locals` flag is unset, transactions will be removed per-sender until a
367    /// local transaction is the highest nonce transaction for that sender. If all senders have a
368    /// local highest-nonce transaction, the pool will not be truncated further.
369    ///
370    /// Otherwise, if the `remove_locals` flag is set, transactions will be removed per-sender
371    /// until the pool is under the given limits.
372    ///
373    /// Any removed transactions will be added to the `end_removed` vector.
374    pub fn remove_to_limit(
375        &mut self,
376        limit: &SubPoolLimit,
377        remove_locals: bool,
378        end_removed: &mut Vec<Arc<ValidPoolTransaction<T::Transaction>>>,
379    ) {
380        // This serves as a termination condition for the loop - it represents the number of
381        // _valid_ unique senders that might have descendants in the pool.
382        //
383        // If `remove_locals` is false, a value of zero means that there are no non-local txs in the
384        // pool that can be removed.
385        //
386        // If `remove_locals` is true, a value of zero means that there are no txs in the pool that
387        // can be removed.
388        let mut non_local_senders = self.highest_nonces.len();
389
390        // keeps track of unique senders from previous iterations, to understand how many unique
391        // senders were removed in the last iteration
392        let mut unique_senders = self.highest_nonces.len();
393
394        // keeps track of which senders we've marked as local
395        let mut local_senders = FxHashSet::default();
396
397        // keep track of transactions to remove and how many have been removed so far
398        let original_length = self.len();
399        let mut removed = Vec::new();
400        let mut total_removed = 0;
401
402        // track total `size` of transactions to remove
403        let original_size = self.size();
404        let mut total_size = 0;
405
406        loop {
407            // check how many unique senders were removed last iteration
408            let unique_removed = unique_senders - self.highest_nonces.len();
409
410            // the new number of unique senders
411            unique_senders = self.highest_nonces.len();
412            non_local_senders -= unique_removed;
413
414            // we can reuse the temp array
415            removed.clear();
416
417            // we prefer removing transactions with lower ordering
418            let mut worst_transactions = self.highest_nonces.values().collect::<Vec<_>>();
419            worst_transactions.sort();
420
421            // loop through the highest nonces set, removing transactions until we reach the limit
422            for tx in worst_transactions {
423                // return early if the pool is under limits
424                if !limit.is_exceeded(original_length - total_removed, original_size - total_size) ||
425                    non_local_senders == 0
426                {
427                    // need to remove remaining transactions before exiting
428                    for id in &removed {
429                        if let Some(tx) = self.remove_transaction(id) {
430                            end_removed.push(tx);
431                        }
432                    }
433
434                    return
435                }
436
437                if !remove_locals && tx.transaction.is_local() {
438                    let sender_id = tx.transaction.sender_id();
439                    if local_senders.insert(sender_id) {
440                        non_local_senders -= 1;
441                    }
442                    continue
443                }
444
445                total_size += tx.transaction.size();
446                total_removed += 1;
447                removed.push(*tx.transaction.id());
448            }
449
450            // remove the transactions from this iteration
451            for id in &removed {
452                if let Some(tx) = self.remove_transaction(id) {
453                    end_removed.push(tx);
454                }
455            }
456
457            // return if either the pool is under limits or there are no more _eligible_
458            // transactions to remove
459            if !self.exceeds(limit) || non_local_senders == 0 {
460                return
461            }
462        }
463    }
464
465    /// Truncates the pool to the given [`SubPoolLimit`], removing transactions until the subpool
466    /// limits are met.
467    ///
468    /// This attempts to remove transactions by roughly the same amount for each sender. For more
469    /// information on this exact process see docs for
470    /// [`remove_to_limit`](PendingPool::remove_to_limit).
471    ///
472    /// This first truncates all of the non-local transactions in the pool. If the subpool is still
473    /// not under the limit, this truncates the entire pool, including non-local transactions. The
474    /// removed transactions are returned.
475    pub fn truncate_pool(
476        &mut self,
477        limit: SubPoolLimit,
478    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
479        let mut removed = Vec::new();
480        // return early if the pool is already under the limits
481        if !self.exceeds(&limit) {
482            return removed
483        }
484
485        // first truncate only non-local transactions, returning if the pool end up under the limit
486        self.remove_to_limit(&limit, false, &mut removed);
487        if !self.exceeds(&limit) {
488            return removed
489        }
490
491        // now repeat for local transactions, since local transactions must be removed now for the
492        // pool to be under the limit
493        self.remove_to_limit(&limit, true, &mut removed);
494
495        removed
496    }
497
498    /// Returns true if the pool exceeds the given limit
499    #[inline]
500    pub(crate) fn exceeds(&self, limit: &SubPoolLimit) -> bool {
501        limit.is_exceeded(self.len(), self.size())
502    }
503
504    /// The reported size of all transactions in this pool.
505    pub(crate) fn size(&self) -> usize {
506        self.size_of.into()
507    }
508
509    /// Number of transactions in the entire pool
510    pub(crate) fn len(&self) -> usize {
511        self.by_id.len()
512    }
513
/// Returns `true` if the pool contains no transactions.
#[cfg(test)]
pub(crate) fn is_empty(&self) -> bool {
    BTreeMap::is_empty(&self.by_id)
}
519
520    /// Returns `true` if the transaction with the given id is already included in this pool.
521    pub(crate) fn contains(&self, id: &TransactionId) -> bool {
522        self.by_id.contains_key(id)
523    }
524
525    /// Get transactions by sender
526    pub(crate) fn get_txs_by_sender(&self, sender: SenderId) -> Vec<TransactionId> {
527        self.iter_txs_by_sender(sender).copied().collect()
528    }
529
530    /// Returns an iterator over all transaction with the sender id
531    pub(crate) fn iter_txs_by_sender(
532        &self,
533        sender: SenderId,
534    ) -> impl Iterator<Item = &TransactionId> + '_ {
535        self.by_id
536            .range((sender.start_bound(), Unbounded))
537            .take_while(move |(other, _)| sender == other.sender)
538            .map(|(tx_id, _)| tx_id)
539    }
540
541    /// Retrieves a transaction with the given ID from the pool, if it exists.
542    fn get(&self, id: &TransactionId) -> Option<&PendingTransaction<T>> {
543        self.by_id.get(id)
544    }
545
/// Returns the map of independent transactions, keyed by sender.
#[cfg(test)]
pub(crate) const fn independent(&self) -> &FxHashMap<SenderId, PendingTransaction<T>> {
    let Self { independent_transactions, .. } = self;
    independent_transactions
}
551
/// Asserts the size relations between the per-sender tracking maps and `by_id`.
///
/// Neither `independent_transactions` nor `highest_nonces` may hold more entries than
/// `by_id`, and both must hold the same number of entries.
///
/// # Panics
///
/// If any of these relations is violated.
#[cfg(any(test, feature = "test-utils"))]
pub(crate) fn assert_invariants(&self) {
    let all = self.by_id.len();
    let independent = self.independent_transactions.len();
    let highest = self.highest_nonces.len();

    assert!(independent <= all, "independent_transactions.len() > by_id.len()");
    assert!(highest <= all, "highest_nonces.len() > by_id.len()");
    assert_eq!(
        highest, independent,
        "highest_nonces.len() != independent_transactions.len()"
    );
}
569}
570
571/// A transaction that is ready to be included in a block.
572#[derive(Debug)]
573pub(crate) struct PendingTransaction<T: TransactionOrdering> {
574    /// Submission counter at insertion; among equal priorities the lower id orders higher.
575    pub(crate) submission_id: u64,
576    /// Shared handle to the actual, validated transaction.
577    pub(crate) transaction: Arc<ValidPoolTransaction<T::Transaction>>,
578    /// The priority value assigned by the used `Ordering` function.
579    pub(crate) priority: Priority<T::PriorityValue>,
580}
581
582impl<T: TransactionOrdering> PendingTransaction<T> {
583    /// The next transaction of the sender: `nonce + 1`
584    pub(crate) fn unlocks(&self) -> TransactionId {
585        self.transaction.transaction_id.descendant()
586    }
587}
588
589impl<T: TransactionOrdering> Clone for PendingTransaction<T> {
590    fn clone(&self) -> Self {
591        Self {
592            submission_id: self.submission_id,
593            transaction: Arc::clone(&self.transaction),
594            priority: self.priority.clone(),
595        }
596    }
597}
598
599impl<T: TransactionOrdering> Eq for PendingTransaction<T> {}
600
601impl<T: TransactionOrdering> PartialEq<Self> for PendingTransaction<T> {
602    fn eq(&self, other: &Self) -> bool {
603        self.cmp(other) == Ordering::Equal
604    }
605}
606
607impl<T: TransactionOrdering> PartialOrd<Self> for PendingTransaction<T> {
608    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
609        Some(self.cmp(other))
610    }
611}
612
613impl<T: TransactionOrdering> Ord for PendingTransaction<T> {
614    fn cmp(&self, other: &Self) -> Ordering {
615        // This compares by `priority` and only if two tx have the exact same priority this compares
616        // the unique `submission_id`. This ensures that transactions with same priority are not
617        // equal, so they're not replaced in the set
618        self.priority
619            .cmp(&other.priority)
620            .then_with(|| other.submission_id.cmp(&self.submission_id))
621    }
622}
623
624#[cfg(test)]
625mod tests {
626    use super::*;
627    use crate::{
628        test_utils::{MockOrdering, MockTransaction, MockTransactionFactory, MockTransactionSet},
629        PoolTransaction,
630    };
631    use alloy_consensus::{Transaction, TxType};
632    use alloy_primitives::address;
633    use std::collections::HashSet;
634
#[test]
fn test_enforce_basefee() {
    let mut factory = MockTransactionFactory::default();
    let mut pool = PendingPool::new(MockOrdering::default());

    let tx = factory.validated_arc(MockTransaction::eip1559().inc_price());
    pool.add_transaction(tx.clone(), 0);

    assert!(pool.contains(tx.id()));
    assert_eq!(pool.len(), 1);

    // a base fee of zero removes nothing
    let removed = pool.update_base_fee(0);
    assert!(removed.is_empty());

    // a base fee just above the transaction's max fee removes it
    let above_max_fee = (tx.max_fee_per_gas() + 1) as u64;
    let removed = pool.update_base_fee(above_max_fee);
    assert_eq!(removed.len(), 1);
    assert!(pool.is_empty());
}
652
#[test]
fn test_enforce_basefee_descendant() {
    let mut factory = MockTransactionFactory::default();
    let mut pool = PendingPool::new(MockOrdering::default());

    // root transaction, followed by a descendant of the same sender with a lower price
    let root = MockTransaction::eip1559().inc_price_by(10);
    let root_tx = factory.validated_arc(root.clone());
    pool.add_transaction(root_tx.clone(), 0);

    let descendant_tx = factory.validated_arc(root.inc_nonce().decr_price());
    pool.add_transaction(descendant_tx.clone(), 0);

    assert_eq!(pool.len(), 2);
    assert!(pool.contains(root_tx.id()));
    assert!(pool.contains(descendant_tx.id()));

    // one sender: exactly one independent and one highest nonce entry
    assert_eq!(pool.independent_transactions.len(), 1);
    assert_eq!(pool.highest_nonces.len(), 1);

    // a base fee of zero removes nothing
    assert!(pool.update_base_fee(0).is_empty());

    // on a copy of the pool: a base fee just above the descendant's max fee only removes the
    // descendant
    {
        let mut pool2 = pool.clone();
        let above_descendant = (descendant_tx.max_fee_per_gas() + 1) as u64;
        let removed = pool2.update_base_fee(above_descendant);
        assert_eq!(removed.len(), 1);
        assert_eq!(pool2.len(), 1);
        assert!(pool2.contains(root_tx.id()));
        assert!(!pool2.contains(descendant_tx.id()));
    }

    // a base fee just above the root's max fee removes the root and with it the descendant
    let above_root = (root_tx.max_fee_per_gas() + 1) as u64;
    let removed = pool.update_base_fee(above_root);
    assert_eq!(removed.len(), 2);
    assert!(pool.is_empty());
    pool.assert_invariants();
}
692
#[test]
fn evict_worst() {
    let mut factory = MockTransactionFactory::default();
    let mut pool = PendingPool::new(MockOrdering::default());

    let first = MockTransaction::eip1559();
    pool.add_transaction(factory.validated_arc(first.clone()), 0);

    let second = MockTransaction::eip1559().inc_price_by(10);
    pool.add_transaction(factory.validated_arc(second), 0);

    // the first transaction is the minimum of the highest nonce set, i.e. the one to evict
    let worst_hash = pool.highest_nonces.values().min().map(|tx| *tx.transaction.hash());
    assert_eq!(worst_hash, Some(*first.hash()));

    // truncating to a single transaction must remove exactly that transaction
    let limit = SubPoolLimit { max_txs: 1, max_size: usize::MAX };
    let removed = pool.truncate_pool(limit);
    assert_eq!(removed.len(), 1);
    assert_eq!(removed[0].hash(), first.hash());
}
715
#[test]
fn correct_independent_descendants() {
    // Ensures that the highest nonces set holds the right transaction for each sender.
    let mut factory = MockTransactionFactory::default();
    let mut pool = PendingPool::new(MockOrdering::default());

    let a_sender = address!("0x000000000000000000000000000000000000000a");
    let b_sender = address!("0x000000000000000000000000000000000000000b");
    let c_sender = address!("0x000000000000000000000000000000000000000c");
    let d_sender = address!("0x000000000000000000000000000000000000000d");

    // one chain of dependent transactions per sender: A has 4, B and C have 3 each, D has 1
    let mut tx_set = MockTransactionSet::dependent(a_sender, 0, 4, TxType::Eip1559);
    let a = tx_set.clone().into_vec();

    let b = MockTransactionSet::dependent(b_sender, 0, 3, TxType::Eip1559).into_vec();
    tx_set.extend(b.clone());

    let c = MockTransactionSet::dependent(c_sender, 0, 3, TxType::Eip1559).into_vec();
    tx_set.extend(c.clone());

    let d = MockTransactionSet::dependent(d_sender, 0, 1, TxType::Eip1559).into_vec();
    tx_set.extend(d.clone());

    // add all the transactions to the pool
    for tx in tx_set.into_vec() {
        pool.add_transaction(factory.validated_arc(tx), 0);
    }

    pool.assert_invariants();

    // the last transaction of every chain is the highest nonce of its sender
    let expected_highest_nonces = [&d[0], &c[2], &b[2], &a[3]]
        .into_iter()
        .map(|tx| (tx.sender(), tx.nonce()))
        .collect::<HashSet<_>>();
    let actual_highest_nonces = pool
        .highest_nonces
        .values()
        .map(|tx| (tx.transaction.sender(), tx.transaction.nonce()))
        .collect::<HashSet<_>>();

    assert_eq!(expected_highest_nonces, actual_highest_nonces);
    pool.assert_invariants();
}
763
764    #[test]
765    fn truncate_by_sender() {
766        // This test ensures that transactions are removed from the pending pool by sender.
767        let mut f = MockTransactionFactory::default();
768        let mut pool = PendingPool::new(MockOrdering::default());
769
770        // Addresses for simulated senders A, B, C, and D.
771        let a = address!("0x000000000000000000000000000000000000000a");
772        let b = address!("0x000000000000000000000000000000000000000b");
773        let c = address!("0x000000000000000000000000000000000000000c");
774        let d = address!("0x000000000000000000000000000000000000000d");
775
776        // Create transaction chains for senders A, B, C, and D.
777        let a_txs = MockTransactionSet::sequential_transactions_by_sender(a, 4, TxType::Eip1559);
778        let b_txs = MockTransactionSet::sequential_transactions_by_sender(b, 3, TxType::Eip1559);
779        let c_txs = MockTransactionSet::sequential_transactions_by_sender(c, 3, TxType::Eip1559);
780        let d_txs = MockTransactionSet::sequential_transactions_by_sender(d, 1, TxType::Eip1559);
781
782        // Set up expected pending transactions.
783        let expected_pending = vec![
784            a_txs.transactions[0].clone(),
785            b_txs.transactions[0].clone(),
786            c_txs.transactions[0].clone(),
787            a_txs.transactions[1].clone(),
788        ]
789        .into_iter()
790        .map(|tx| (tx.sender(), tx.nonce()))
791        .collect::<HashSet<_>>();
792
793        // Set up expected removed transactions.
794        let expected_removed = vec![
795            d_txs.transactions[0].clone(),
796            c_txs.transactions[2].clone(),
797            b_txs.transactions[2].clone(),
798            a_txs.transactions[3].clone(),
799            c_txs.transactions[1].clone(),
800            b_txs.transactions[1].clone(),
801            a_txs.transactions[2].clone(),
802        ]
803        .into_iter()
804        .map(|tx| (tx.sender(), tx.nonce()))
805        .collect::<HashSet<_>>();
806
807        // Consolidate all transactions into a single vector.
808        let all_txs =
809            [a_txs.into_vec(), b_txs.into_vec(), c_txs.into_vec(), d_txs.into_vec()].concat();
810
811        // Add all the transactions to the pool.
812        for tx in all_txs {
813            pool.add_transaction(f.validated_arc(tx), 0);
814        }
815
816        // Sanity check, ensuring everything is consistent.
817        pool.assert_invariants();
818
819        // Define the maximum total transactions to be 4, removing transactions for each sender.
820        // Expected order of removal:
821        // * d1, c3, b3, a4
822        // * c2, b2, a3
823        //
824        // Remaining transactions:
825        // * a1, a2
826        // * b1
827        // * c1
828        let pool_limit = SubPoolLimit { max_txs: 4, max_size: usize::MAX };
829
830        // Truncate the pool based on the defined limit.
831        let removed = pool.truncate_pool(pool_limit);
832        pool.assert_invariants();
833        assert_eq!(removed.len(), expected_removed.len());
834
835        // Get the set of removed transactions and compare with the expected set.
836        let removed =
837            removed.into_iter().map(|tx| (tx.sender(), tx.nonce())).collect::<HashSet<_>>();
838        assert_eq!(removed, expected_removed);
839
840        // Retrieve the current pending transactions after truncation.
841        let pending = pool.all().collect::<Vec<_>>();
842        assert_eq!(pending.len(), expected_pending.len());
843
844        // Get the set of pending transactions and compare with the expected set.
845        let pending =
846            pending.into_iter().map(|tx| (tx.sender(), tx.nonce())).collect::<HashSet<_>>();
847        assert_eq!(pending, expected_pending);
848    }
849
850    // <https://github.com/paradigmxyz/reth/issues/12340>
851    #[test]
852    fn test_eligible_updates_promoted() {
853        let mut pool = PendingPool::new(MockOrdering::default());
854        let mut f = MockTransactionFactory::default();
855
856        let num_senders = 10;
857
858        let first_txs: Vec<_> = (0..num_senders) //
859            .map(|_| MockTransaction::eip1559())
860            .collect();
861        let second_txs: Vec<_> =
862            first_txs.iter().map(|tx| tx.clone().rng_hash().inc_nonce()).collect();
863
864        for tx in first_txs {
865            let valid_tx = f.validated(tx);
866            pool.add_transaction(Arc::new(valid_tx), 0);
867        }
868
869        let mut best = pool.best();
870
871        for _ in 0..num_senders {
872            if let Some(tx) = best.next() {
873                assert_eq!(tx.nonce(), 0);
874            } else {
875                panic!("cannot read one of first_txs");
876            }
877        }
878
879        for tx in second_txs {
880            let valid_tx = f.validated(tx);
881            pool.add_transaction(Arc::new(valid_tx), 0);
882        }
883
884        for _ in 0..num_senders {
885            if let Some(tx) = best.next() {
886                assert_eq!(tx.nonce(), 1);
887            } else {
888                panic!("cannot read one of second_txs");
889            }
890        }
891    }
892
893    #[test]
894    fn test_empty_pool_behavior() {
895        let mut pool = PendingPool::<MockOrdering>::new(MockOrdering::default());
896
897        // Ensure the pool is empty
898        assert!(pool.is_empty());
899        assert_eq!(pool.len(), 0);
900        assert_eq!(pool.size(), 0);
901
902        // Verify that attempting to truncate an empty pool does not panic and returns an empty vec
903        let removed = pool.truncate_pool(SubPoolLimit { max_txs: 10, max_size: 1000 });
904        assert!(removed.is_empty());
905
906        // Verify that retrieving transactions from an empty pool yields nothing
907        let all_txs: Vec<_> = pool.all().collect();
908        assert!(all_txs.is_empty());
909    }
910
911    #[test]
912    fn test_add_remove_transaction() {
913        let mut f = MockTransactionFactory::default();
914        let mut pool = PendingPool::new(MockOrdering::default());
915
916        // Add a transaction and check if it's in the pool
917        let tx = f.validated_arc(MockTransaction::eip1559());
918        pool.add_transaction(tx.clone(), 0);
919        assert!(pool.contains(tx.id()));
920        assert_eq!(pool.len(), 1);
921
922        // Remove the transaction and ensure it's no longer in the pool
923        let removed_tx = pool.remove_transaction(tx.id()).unwrap();
924        assert_eq!(removed_tx.id(), tx.id());
925        assert!(!pool.contains(tx.id()));
926        assert_eq!(pool.len(), 0);
927    }
928
929    #[test]
930    fn test_reorder_on_basefee_update() {
931        let mut f = MockTransactionFactory::default();
932        let mut pool = PendingPool::new(MockOrdering::default());
933
934        // Add two transactions with different fees
935        let tx1 = f.validated_arc(MockTransaction::eip1559().inc_price());
936        let tx2 = f.validated_arc(MockTransaction::eip1559().inc_price_by(20));
937        pool.add_transaction(tx1.clone(), 0);
938        pool.add_transaction(tx2.clone(), 0);
939
940        // Ensure the transactions are in the correct order
941        let mut best = pool.best();
942        assert_eq!(best.next().unwrap().hash(), tx2.hash());
943        assert_eq!(best.next().unwrap().hash(), tx1.hash());
944
945        // Update the base fee to a value higher than tx1's fee, causing it to be removed
946        let removed = pool.update_base_fee((tx1.max_fee_per_gas() + 1) as u64);
947        assert_eq!(removed.len(), 1);
948        assert_eq!(removed[0].hash(), tx1.hash());
949
950        // Verify that only tx2 remains in the pool
951        assert_eq!(pool.len(), 1);
952        assert!(pool.contains(tx2.id()));
953        assert!(!pool.contains(tx1.id()));
954    }
955
956    #[test]
957    #[should_panic(expected = "transaction already included")]
958    fn test_handle_duplicates() {
959        let mut f = MockTransactionFactory::default();
960        let mut pool = PendingPool::new(MockOrdering::default());
961
962        // Add the same transaction twice and ensure it only appears once
963        let tx = f.validated_arc(MockTransaction::eip1559());
964        pool.add_transaction(tx.clone(), 0);
965        assert!(pool.contains(tx.id()));
966        assert_eq!(pool.len(), 1);
967
968        // Attempt to add the same transaction again, which should be ignored
969        pool.add_transaction(tx, 0);
970    }
971
972    #[test]
973    fn test_update_blob_fee() {
974        let mut f = MockTransactionFactory::default();
975        let mut pool = PendingPool::new(MockOrdering::default());
976
977        // Add transactions with varying blob fees
978        let tx1 = f.validated_arc(MockTransaction::eip4844().set_blob_fee(50).clone());
979        let tx2 = f.validated_arc(MockTransaction::eip4844().set_blob_fee(150).clone());
980        pool.add_transaction(tx1.clone(), 0);
981        pool.add_transaction(tx2.clone(), 0);
982
983        // Update the blob fee to a value that causes tx1 to be removed
984        let removed = pool.update_blob_fee(100);
985        assert_eq!(removed.len(), 1);
986        assert_eq!(removed[0].hash(), tx1.hash());
987
988        // Verify that only tx2 remains in the pool
989        assert!(pool.contains(tx2.id()));
990        assert!(!pool.contains(tx1.id()));
991    }
992
993    #[test]
994    fn local_senders_tracking() {
995        let mut f = MockTransactionFactory::default();
996        let mut pool = PendingPool::new(MockOrdering::default());
997
998        // Addresses for simulated senders A, B, C
999        let a = address!("0x000000000000000000000000000000000000000a");
1000        let b = address!("0x000000000000000000000000000000000000000b");
1001        let c = address!("0x000000000000000000000000000000000000000c");
1002
1003        // sender A (local) - 11+ transactions (large enough to keep limit exceeded)
1004        // sender B (external) - 2 transactions
1005        // sender C (external) - 2 transactions
1006
1007        // Create transaction chains for senders A, B, C
1008        let a_txs = MockTransactionSet::sequential_transactions_by_sender(a, 11, TxType::Eip1559);
1009        let b_txs = MockTransactionSet::sequential_transactions_by_sender(b, 2, TxType::Eip1559);
1010        let c_txs = MockTransactionSet::sequential_transactions_by_sender(c, 2, TxType::Eip1559);
1011
1012        // create local txs for sender A
1013        for tx in a_txs.into_vec() {
1014            let final_tx = Arc::new(f.validated_with_origin(crate::TransactionOrigin::Local, tx));
1015
1016            pool.add_transaction(final_tx, 0);
1017        }
1018
1019        // create external txs for senders B and C
1020        let remaining_txs = [b_txs.into_vec(), c_txs.into_vec()].concat();
1021        for tx in remaining_txs {
1022            let final_tx = f.validated_arc(tx);
1023
1024            pool.add_transaction(final_tx, 0);
1025        }
1026
1027        // Sanity check, ensuring everything is consistent.
1028        pool.assert_invariants();
1029
1030        let pool_limit = SubPoolLimit { max_txs: 10, max_size: usize::MAX };
1031        pool.truncate_pool(pool_limit);
1032
1033        let sender_a = f.ids.sender_id(&a).unwrap();
1034        let sender_b = f.ids.sender_id(&b).unwrap();
1035        let sender_c = f.ids.sender_id(&c).unwrap();
1036
1037        assert_eq!(pool.get_txs_by_sender(sender_a).len(), 10);
1038        assert!(pool.get_txs_by_sender(sender_b).is_empty());
1039        assert!(pool.get_txs_by_sender(sender_c).is_empty());
1040    }
1041}