reth_transaction_pool/pool/
pending.rs

1//! Pending transactions
2
3use crate::{
4    identifier::{SenderId, TransactionId},
5    pool::{
6        best::{BestTransactions, BestTransactionsWithFees},
7        size::SizeTracker,
8    },
9    Priority, SubPoolLimit, TransactionOrdering, ValidPoolTransaction,
10};
11use rustc_hash::{FxHashMap, FxHashSet};
12use std::{
13    cmp::Ordering,
14    collections::{hash_map::Entry, BTreeMap},
15    ops::Bound::Unbounded,
16    sync::Arc,
17};
18use tokio::sync::broadcast;
19
/// A pool of validated and gapless transactions that are ready to be executed on the current state
/// and are waiting to be included in a block.
///
/// This pool distinguishes between `independent` transactions and pending transactions. A
/// transaction is `independent` if it is in the pending pool and it has the current on-chain
/// nonce of the sender. This means `independent` transactions can be executed right away, while
/// all other pending transactions depend on at least one `independent` transaction.
///
/// Once an `independent` transaction has been executed it *unlocks* the next nonce. If the
/// transaction with that nonce is also pending, it is moved to the `independent` queue.
#[derive(Debug, Clone)]
pub struct PendingPool<T: TransactionOrdering> {
    /// How to order transactions.
    ordering: T,
    /// Keeps track of transactions inserted in the pool.
    ///
    /// This is a counter that is handed out to each inserted transaction, so that we can
    /// determine the order in which transactions were submitted to the pool.
    submission_id: u64,
    /// _All_ Transactions that are currently inside the pool grouped by their identifier.
    by_id: BTreeMap<TransactionId, PendingTransaction<T>>,
    /// The highest nonce transactions for each sender - like the `independent` set, but the
    /// highest instead of lowest nonce.
    highest_nonces: FxHashMap<SenderId, PendingTransaction<T>>,
    /// Independent transactions that can be included directly and don't require other
    /// transactions. This holds the lowest nonce transaction for each sender.
    independent_transactions: FxHashMap<SenderId, PendingTransaction<T>>,
    /// Keeps track of the size of this pool.
    ///
    /// See also [`reth_primitives_traits::InMemorySize::size`].
    size_of: SizeTracker,
    /// Used to broadcast new transactions that have been added to the `PendingPool` to existing
    /// subscribers, such as the `BestTransactions` iterators created from this pool.
    new_transaction_notifier: broadcast::Sender<PendingTransaction<T>>,
}
54
55// === impl PendingPool ===
56
57impl<T: TransactionOrdering> PendingPool<T> {
58    /// Create a new pending pool instance.
59    pub fn new(ordering: T) -> Self {
60        Self::with_buffer(ordering, 200)
61    }
62
63    /// Create a new pool instance with the given buffer capacity.
64    pub fn with_buffer(ordering: T, buffer_capacity: usize) -> Self {
65        let (new_transaction_notifier, _) = broadcast::channel(buffer_capacity);
66        Self {
67            ordering,
68            submission_id: 0,
69            by_id: Default::default(),
70            independent_transactions: Default::default(),
71            highest_nonces: Default::default(),
72            size_of: Default::default(),
73            new_transaction_notifier,
74        }
75    }
76
77    /// Clear all transactions from the pool without resetting other values.
78    /// Used for atomic reordering during basefee update.
79    ///
80    /// # Returns
81    ///
82    /// Returns all transactions by id.
83    fn clear_transactions(&mut self) -> BTreeMap<TransactionId, PendingTransaction<T>> {
84        self.independent_transactions.clear();
85        self.highest_nonces.clear();
86        self.size_of.reset();
87        std::mem::take(&mut self.by_id)
88    }
89
90    /// Returns an iterator over all transactions that are _currently_ ready.
91    ///
92    /// 1. The iterator _always_ returns transactions in order: it never returns a transaction with
93    ///    an unsatisfied dependency and only returns them if dependency transaction were yielded
94    ///    previously. In other words: the nonces of transactions with the same sender will _always_
95    ///    increase by exactly 1.
96    ///
97    /// The order of transactions which satisfy (1.) is determined by their computed priority: a
98    /// transaction with a higher priority is returned before a transaction with a lower priority.
99    ///
100    /// If two transactions have the same priority score, then the transactions which spent more
101    /// time in pool (were added earlier) are returned first.
102    ///
103    /// NOTE: while this iterator returns transaction that pool considers valid at this point, they
104    /// could potentially become invalid at point of execution. Therefore, this iterator
105    /// provides a way to mark transactions that the consumer of this iterator considers invalid. In
106    /// which case the transaction's subgraph is also automatically marked invalid, See (1.).
107    /// Invalid transactions are skipped.
108    pub fn best(&self) -> BestTransactions<T> {
109        BestTransactions {
110            all: self.by_id.clone(),
111            independent: self.independent_transactions.values().cloned().collect(),
112            invalid: Default::default(),
113            new_transaction_receiver: Some(self.new_transaction_notifier.subscribe()),
114            last_priority: None,
115            skip_blobs: false,
116        }
117    }
118
119    /// Same as `best` but only returns transactions that satisfy the given basefee and blobfee.
120    pub(crate) fn best_with_basefee_and_blobfee(
121        &self,
122        base_fee: u64,
123        base_fee_per_blob_gas: u64,
124    ) -> BestTransactionsWithFees<T> {
125        BestTransactionsWithFees { best: self.best(), base_fee, base_fee_per_blob_gas }
126    }
127
128    /// Same as `best` but also includes the given unlocked transactions.
129    ///
130    /// This mimics the [`Self::add_transaction`] method, but does not insert the transactions into
131    /// pool but only into the returned iterator.
132    ///
133    /// Note: this does not insert the unlocked transactions into the pool.
134    ///
135    /// # Panics
136    ///
137    /// if the transaction is already included
138    pub(crate) fn best_with_unlocked_and_attributes(
139        &self,
140        unlocked: Vec<Arc<ValidPoolTransaction<T::Transaction>>>,
141        base_fee: u64,
142        base_fee_per_blob_gas: u64,
143    ) -> BestTransactionsWithFees<T> {
144        let mut best = self.best();
145        let mut submission_id = self.submission_id;
146        for tx in unlocked {
147            submission_id += 1;
148            debug_assert!(!best.all.contains_key(tx.id()), "transaction already included");
149            let priority = self.ordering.priority(&tx.transaction, base_fee);
150            let tx_id = *tx.id();
151            let transaction = PendingTransaction { submission_id, transaction: tx, priority };
152            if best.ancestor(&tx_id).is_none() {
153                best.independent.insert(transaction.clone());
154            }
155            best.all.insert(tx_id, transaction);
156        }
157
158        BestTransactionsWithFees { best, base_fee, base_fee_per_blob_gas }
159    }
160
161    /// Returns an iterator over all transactions in the pool
162    pub(crate) fn all(
163        &self,
164    ) -> impl ExactSizeIterator<Item = Arc<ValidPoolTransaction<T::Transaction>>> + '_ {
165        self.by_id.values().map(|tx| tx.transaction.clone())
166    }
167
168    /// Updates the pool with the new blob fee. Removes
169    /// from the subpool all transactions and their dependents that no longer satisfy the given
170    /// blob fee (`tx.max_blob_fee < blob_fee`).
171    ///
172    /// Note: the transactions are not returned in a particular order.
173    ///
174    /// # Returns
175    ///
176    /// Removed transactions that no longer satisfy the blob fee.
177    pub(crate) fn update_blob_fee(
178        &mut self,
179        blob_fee: u128,
180    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
181        // Create a collection for removed transactions.
182        let mut removed = Vec::new();
183
184        // Drain and iterate over all transactions.
185        let mut transactions_iter = self.clear_transactions().into_iter().peekable();
186        while let Some((id, tx)) = transactions_iter.next() {
187            if tx.transaction.is_eip4844() && tx.transaction.max_fee_per_blob_gas() < Some(blob_fee)
188            {
189                // Add this tx to the removed collection since it no longer satisfies the blob fee
190                // condition. Decrease the total pool size.
191                removed.push(Arc::clone(&tx.transaction));
192
193                // Remove all dependent transactions.
194                'this: while let Some((next_id, next_tx)) = transactions_iter.peek() {
195                    if next_id.sender != id.sender {
196                        break 'this
197                    }
198                    removed.push(Arc::clone(&next_tx.transaction));
199                    transactions_iter.next();
200                }
201            } else {
202                self.size_of += tx.transaction.size();
203                self.update_independents_and_highest_nonces(&tx);
204                self.by_id.insert(id, tx);
205            }
206        }
207
208        removed
209    }
210
211    /// Updates the pool with the new base fee. Reorders transactions by new priorities. Removes
212    /// from the subpool all transactions and their dependents that no longer satisfy the given
213    /// base fee (`tx.fee < base_fee`).
214    ///
215    /// Note: the transactions are not returned in a particular order.
216    ///
217    /// # Returns
218    ///
219    /// Removed transactions that no longer satisfy the base fee.
220    pub(crate) fn update_base_fee(
221        &mut self,
222        base_fee: u64,
223    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
224        // Create a collection for removed transactions.
225        let mut removed = Vec::new();
226
227        // Drain and iterate over all transactions.
228        let mut transactions_iter = self.clear_transactions().into_iter().peekable();
229        while let Some((id, mut tx)) = transactions_iter.next() {
230            if tx.transaction.max_fee_per_gas() < base_fee as u128 {
231                // Add this tx to the removed collection since it no longer satisfies the base fee
232                // condition. Decrease the total pool size.
233                removed.push(Arc::clone(&tx.transaction));
234
235                // Remove all dependent transactions.
236                'this: while let Some((next_id, next_tx)) = transactions_iter.peek() {
237                    if next_id.sender != id.sender {
238                        break 'this
239                    }
240                    removed.push(Arc::clone(&next_tx.transaction));
241                    transactions_iter.next();
242                }
243            } else {
244                // Re-insert the transaction with new priority.
245                tx.priority = self.ordering.priority(&tx.transaction.transaction, base_fee);
246
247                self.size_of += tx.transaction.size();
248                self.update_independents_and_highest_nonces(&tx);
249                self.by_id.insert(id, tx);
250            }
251        }
252
253        removed
254    }
255
256    /// Updates the independent transaction and highest nonces set, assuming the given transaction
257    /// is being _added_ to the pool.
258    fn update_independents_and_highest_nonces(&mut self, tx: &PendingTransaction<T>) {
259        match self.highest_nonces.entry(tx.transaction.sender_id()) {
260            Entry::Occupied(mut entry) => {
261                if entry.get().transaction.nonce() < tx.transaction.nonce() {
262                    *entry.get_mut() = tx.clone();
263                }
264            }
265            Entry::Vacant(entry) => {
266                entry.insert(tx.clone());
267            }
268        }
269        match self.independent_transactions.entry(tx.transaction.sender_id()) {
270            Entry::Occupied(mut entry) => {
271                if entry.get().transaction.nonce() > tx.transaction.nonce() {
272                    *entry.get_mut() = tx.clone();
273                }
274            }
275            Entry::Vacant(entry) => {
276                entry.insert(tx.clone());
277            }
278        }
279    }
280
281    /// Adds a new transactions to the pending queue.
282    ///
283    /// # Panics
284    ///
285    /// if the transaction is already included
286    pub fn add_transaction(
287        &mut self,
288        tx: Arc<ValidPoolTransaction<T::Transaction>>,
289        base_fee: u64,
290    ) {
291        debug_assert!(
292            !self.contains(tx.id()),
293            "transaction already included {:?}",
294            self.get(tx.id()).unwrap().transaction
295        );
296
297        // keep track of size
298        self.size_of += tx.size();
299
300        let tx_id = *tx.id();
301
302        let submission_id = self.next_id();
303        let priority = self.ordering.priority(&tx.transaction, base_fee);
304        let tx = PendingTransaction { submission_id, transaction: tx, priority };
305
306        self.update_independents_and_highest_nonces(&tx);
307
308        // send the new transaction to any existing pendingpool static file iterators
309        if self.new_transaction_notifier.receiver_count() > 0 {
310            let _ = self.new_transaction_notifier.send(tx.clone());
311        }
312
313        self.by_id.insert(tx_id, tx);
314    }
315
316    /// Removes the transaction from the pool.
317    ///
318    /// Note: If the transaction has a descendant transaction
319    /// it will advance it to the best queue.
320    pub(crate) fn remove_transaction(
321        &mut self,
322        id: &TransactionId,
323    ) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
324        if let Some(lowest) = self.independent_transactions.get(&id.sender) &&
325            lowest.transaction.nonce() == id.nonce
326        {
327            self.independent_transactions.remove(&id.sender);
328            // mark the next as independent if it exists
329            if let Some(unlocked) = self.get(&id.descendant()) {
330                self.independent_transactions.insert(id.sender, unlocked.clone());
331            }
332        }
333
334        let tx = self.by_id.remove(id)?;
335        self.size_of -= tx.transaction.size();
336
337        match self.highest_nonces.entry(id.sender) {
338            Entry::Occupied(mut entry) => {
339                if entry.get().transaction.nonce() == id.nonce {
340                    // we just removed the tx with the highest nonce for this sender, find the
341                    // highest remaining tx from that sender
342                    if let Some((_, new_highest)) = self
343                        .by_id
344                        .range((
345                            id.sender.start_bound(),
346                            std::ops::Bound::Included(TransactionId::new(id.sender, u64::MAX)),
347                        ))
348                        .last()
349                    {
350                        // insert the new highest nonce for this sender
351                        entry.insert(new_highest.clone());
352                    } else {
353                        entry.remove();
354                    }
355                }
356            }
357            Entry::Vacant(_) => {
358                debug_assert!(
359                    false,
360                    "removed transaction without a tracked highest nonce {:?}",
361                    id
362                );
363            }
364        }
365
366        Some(tx.transaction)
367    }
368
    /// Returns the current submission id and advances the internal counter by one.
    ///
    /// The counter wraps around on overflow instead of panicking.
    const fn next_id(&mut self) -> u64 {
        let id = self.submission_id;
        self.submission_id = self.submission_id.wrapping_add(1);
        id
    }
374
    /// Traverses the pool, starting at the highest nonce set, removing the transactions which
    /// would put the pool under the specified limits.
    ///
    /// This attempts to remove transactions by roughly the same amount for each sender. This is
    /// done by removing the highest-nonce transactions for each sender, one round at a time.
    /// Within a round, senders are visited in ascending order of their highest-nonce
    /// transaction, i.e. the lowest-priority transactions are removed first.
    ///
    /// If the `remove_locals` flag is unset, transactions will be removed per-sender until a
    /// local transaction is the highest nonce transaction for that sender. If all senders have a
    /// local highest-nonce transaction, the pool will not be truncated further.
    ///
    /// Otherwise, if the `remove_locals` flag is set, transactions will be removed per-sender
    /// until the pool is under the given limits.
    ///
    /// Any removed transactions will be added to the `end_removed` vector.
    pub fn remove_to_limit(
        &mut self,
        limit: &SubPoolLimit,
        remove_locals: bool,
        end_removed: &mut Vec<Arc<ValidPoolTransaction<T::Transaction>>>,
    ) {
        // This serves as a termination condition for the loop - it represents the number of
        // _valid_ unique senders that might have descendants in the pool.
        //
        // If `remove_locals` is false, a value of zero means that there are no non-local txs in the
        // pool that can be removed.
        //
        // If `remove_locals` is true, a value of zero means that there are no txs in the pool that
        // can be removed.
        let mut non_local_senders = self.highest_nonces.len();

        // keeps track of unique senders from previous iterations, to understand how many unique
        // senders were removed in the last iteration
        let mut unique_senders = self.highest_nonces.len();

        // keeps track of which senders we've marked as local
        let mut local_senders = FxHashSet::default();

        // keep track of transactions to remove and how many have been removed so far
        let original_length = self.len();
        let mut removed = Vec::new();
        let mut total_removed = 0;

        // track total `size` of transactions to remove
        let original_size = self.size();
        let mut total_size = 0;

        loop {
            // check how many unique senders were removed last iteration
            let unique_removed = unique_senders - self.highest_nonces.len();

            // the new number of unique senders
            unique_senders = self.highest_nonces.len();
            non_local_senders -= unique_removed;

            // we can reuse the temp array
            removed.clear();

            // we prefer removing transactions with lower ordering, so sort the current
            // highest-nonce transactions in ascending order
            let mut worst_transactions = self.highest_nonces.values().collect::<Vec<_>>();
            worst_transactions.sort();

            // loop through the highest nonces set, removing transactions until we reach the limit
            for tx in worst_transactions {
                // return early if the pool is under limits; the check uses the projected length
                // and size because the transactions collected in `removed` are still in the pool
                if !limit.is_exceeded(original_length - total_removed, original_size - total_size) ||
                    non_local_senders == 0
                {
                    // need to remove remaining transactions before exiting
                    for id in &removed {
                        if let Some(tx) = self.remove_transaction(id) {
                            end_removed.push(tx);
                        }
                    }

                    return
                }

                // local transactions are skipped unless `remove_locals` is set; each sender is
                // only counted once as local
                if !remove_locals && tx.transaction.is_local() {
                    let sender_id = tx.transaction.sender_id();
                    if local_senders.insert(sender_id) {
                        non_local_senders -= 1;
                    }
                    continue
                }

                total_size += tx.transaction.size();
                total_removed += 1;
                removed.push(*tx.transaction.id());
            }

            // remove the transactions from this iteration
            for id in &removed {
                if let Some(tx) = self.remove_transaction(id) {
                    end_removed.push(tx);
                }
            }

            // return if either the pool is under limits or there are no more _eligible_
            // transactions to remove
            if !self.exceeds(limit) || non_local_senders == 0 {
                return
            }
        }
    }
479
480    /// Truncates the pool to the given [`SubPoolLimit`], removing transactions until the subpool
481    /// limits are met.
482    ///
483    /// This attempts to remove transactions by roughly the same amount for each sender. For more
484    /// information on this exact process see docs for
485    /// [`remove_to_limit`](PendingPool::remove_to_limit).
486    ///
487    /// This first truncates all of the non-local transactions in the pool. If the subpool is still
488    /// not under the limit, this truncates the entire pool, including non-local transactions. The
489    /// removed transactions are returned.
490    pub fn truncate_pool(
491        &mut self,
492        limit: SubPoolLimit,
493    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
494        let mut removed = Vec::new();
495        // return early if the pool is already under the limits
496        if !self.exceeds(&limit) {
497            return removed
498        }
499
500        // first truncate only non-local transactions, returning if the pool end up under the limit
501        self.remove_to_limit(&limit, false, &mut removed);
502        if !self.exceeds(&limit) {
503            return removed
504        }
505
506        // now repeat for local transactions, since local transactions must be removed now for the
507        // pool to be under the limit
508        self.remove_to_limit(&limit, true, &mut removed);
509
510        removed
511    }
512
513    /// Returns true if the pool exceeds the given limit
514    #[inline]
515    pub(crate) fn exceeds(&self, limit: &SubPoolLimit) -> bool {
516        limit.is_exceeded(self.len(), self.size())
517    }
518
    /// The reported size of all transactions in this pool, as accumulated by the pool's
    /// `SizeTracker`.
    pub(crate) fn size(&self) -> usize {
        self.size_of.into()
    }
523
    /// Number of transactions in the entire pool, i.e. the number of entries in `by_id`.
    pub(crate) fn len(&self) -> usize {
        self.by_id.len()
    }
528
    /// Returns a reference to all transactions in the pool, keyed and ordered by their
    /// [`TransactionId`].
    pub const fn by_id(&self) -> &BTreeMap<TransactionId, PendingTransaction<T>> {
        &self.by_id
    }
533
    /// Returns a reference to the independent transactions, i.e. the tracked lowest-nonce
    /// transaction for each sender.
    pub const fn independent_transactions(&self) -> &FxHashMap<SenderId, PendingTransaction<T>> {
        &self.independent_transactions
    }
538
    /// Subscribes to new transactions.
    ///
    /// The returned receiver gets the transactions that are broadcast by
    /// [`Self::add_transaction`] after this call.
    pub fn new_transaction_receiver(&self) -> broadcast::Receiver<PendingTransaction<T>> {
        self.new_transaction_notifier.subscribe()
    }
543
    /// Whether the pool is empty, i.e. it contains no transactions at all.
    #[cfg(test)]
    pub(crate) fn is_empty(&self) -> bool {
        self.by_id.is_empty()
    }
549
    /// Returns `true` if the transaction with the given id is already included in this pool.
    ///
    /// This is a lookup in `by_id` and does not consult the per-sender sets.
    pub(crate) fn contains(&self, id: &TransactionId) -> bool {
        self.by_id.contains_key(id)
    }
554
555    /// Get transactions by sender
556    pub(crate) fn get_txs_by_sender(&self, sender: SenderId) -> Vec<TransactionId> {
557        self.iter_txs_by_sender(sender).copied().collect()
558    }
559
560    /// Returns an iterator over all transaction with the sender id
561    pub(crate) fn iter_txs_by_sender(
562        &self,
563        sender: SenderId,
564    ) -> impl Iterator<Item = &TransactionId> + '_ {
565        self.by_id
566            .range((sender.start_bound(), Unbounded))
567            .take_while(move |(other, _)| sender == other.sender)
568            .map(|(tx_id, _)| tx_id)
569    }
570
    /// Retrieves the pending transaction with the given ID from the pool, or `None` if the pool
    /// does not contain it.
    fn get(&self, id: &TransactionId) -> Option<&PendingTransaction<T>> {
        self.by_id.get(id)
    }
575
    /// Returns a reference to the independent transactions in the pool.
    ///
    /// Test-only accessor that returns the same map as [`Self::independent_transactions`].
    #[cfg(test)]
    pub(crate) const fn independent(&self) -> &FxHashMap<SenderId, PendingTransaction<T>> {
        &self.independent_transactions
    }
581
582    /// Asserts that the bijection between `by_id` and `all` is valid.
583    #[cfg(any(test, feature = "test-utils"))]
584    pub(crate) fn assert_invariants(&self) {
585        assert!(
586            self.independent_transactions.len() <= self.by_id.len(),
587            "independent_transactions.len() > by_id.len()"
588        );
589        assert!(
590            self.highest_nonces.len() <= self.by_id.len(),
591            "highest_nonces.len() > by_id.len()"
592        );
593        assert_eq!(
594            self.highest_nonces.len(),
595            self.independent_transactions.len(),
596            "highest_nonces.len() != independent_transactions.len()"
597        );
598    }
599}
600
/// A transaction that is ready to be included in a block.
///
/// Cloning is cheap with respect to the transaction itself, because the transaction is shared
/// behind an [`Arc`].
#[derive(Debug)]
pub struct PendingTransaction<T: TransactionOrdering> {
    /// Identifier that tags when transaction was submitted in the pool.
    ///
    /// Used as a tie-breaker when two transactions have the same priority.
    pub submission_id: u64,
    /// Actual transaction.
    pub transaction: Arc<ValidPoolTransaction<T::Transaction>>,
    /// The priority value assigned by the used `Ordering` function.
    pub priority: Priority<T::PriorityValue>,
}
611
612impl<T: TransactionOrdering> PendingTransaction<T> {
613    /// The next transaction of the sender: `nonce + 1`
614    pub fn unlocks(&self) -> TransactionId {
615        self.transaction.transaction_id.descendant()
616    }
617}
618
619impl<T: TransactionOrdering> Clone for PendingTransaction<T> {
620    fn clone(&self) -> Self {
621        Self {
622            submission_id: self.submission_id,
623            transaction: Arc::clone(&self.transaction),
624            priority: self.priority.clone(),
625        }
626    }
627}
628
// Equality is defined through the `Ord` implementation (priority, then submission id), see the
// `PartialEq` implementation.
impl<T: TransactionOrdering> Eq for PendingTransaction<T> {}
630
631impl<T: TransactionOrdering> PartialEq<Self> for PendingTransaction<T> {
632    fn eq(&self, other: &Self) -> bool {
633        self.cmp(other) == Ordering::Equal
634    }
635}
636
637impl<T: TransactionOrdering> PartialOrd<Self> for PendingTransaction<T> {
638    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
639        Some(self.cmp(other))
640    }
641}
642
643impl<T: TransactionOrdering> Ord for PendingTransaction<T> {
644    fn cmp(&self, other: &Self) -> Ordering {
645        // This compares by `priority` and only if two tx have the exact same priority this compares
646        // the unique `submission_id`. This ensures that transactions with same priority are not
647        // equal, so they're not replaced in the set
648        self.priority
649            .cmp(&other.priority)
650            .then_with(|| other.submission_id.cmp(&self.submission_id))
651    }
652}
653
654#[cfg(test)]
655mod tests {
656    use super::*;
657    use crate::{
658        test_utils::{MockOrdering, MockTransaction, MockTransactionFactory, MockTransactionSet},
659        PoolTransaction,
660    };
661    use alloy_consensus::{Transaction, TxType};
662    use alloy_primitives::address;
663    use std::collections::HashSet;
664
    /// A single transaction stays in the pool while the base fee does not exceed its max fee and
    /// is removed once the base fee is raised above it.
    #[test]
    fn test_enforce_basefee() {
        let mut f = MockTransactionFactory::default();
        let mut pool = PendingPool::new(MockOrdering::default());
        let tx = f.validated_arc(MockTransaction::eip1559().inc_price());
        pool.add_transaction(tx.clone(), 0);

        assert!(pool.contains(tx.id()));
        assert_eq!(pool.len(), 1);

        // a base fee of zero is satisfied by the transaction, nothing is removed
        let removed = pool.update_base_fee(0);
        assert!(removed.is_empty());

        // a base fee above the transaction's max fee evicts it
        let removed = pool.update_base_fee((tx.max_fee_per_gas() + 1) as u64);
        assert_eq!(removed.len(), 1);
        assert!(pool.is_empty());
    }
682
    /// Raising the base fee above a descendant's max fee only removes the descendant, while
    /// raising it above the root's max fee removes the root together with its descendant.
    #[test]
    fn test_enforce_basefee_descendant() {
        let mut f = MockTransactionFactory::default();
        let mut pool = PendingPool::new(MockOrdering::default());
        let t = MockTransaction::eip1559().inc_price_by(10);
        let root_tx = f.validated_arc(t.clone());
        pool.add_transaction(root_tx.clone(), 0);

        // same sender, next nonce, lower price than the root
        let descendant_tx = f.validated_arc(t.inc_nonce().decr_price());
        pool.add_transaction(descendant_tx.clone(), 0);

        assert!(pool.contains(root_tx.id()));
        assert!(pool.contains(descendant_tx.id()));
        assert_eq!(pool.len(), 2);

        // both transactions belong to the same sender, so only one entry is tracked per set
        assert_eq!(pool.independent_transactions.len(), 1);
        assert_eq!(pool.highest_nonces.len(), 1);

        let removed = pool.update_base_fee(0);
        assert!(removed.is_empty());

        // two dependent tx in the pool with decreasing fee

        {
            // work on a copy so that the original pool still contains both transactions
            let mut pool2 = pool.clone();
            let removed = pool2.update_base_fee((descendant_tx.max_fee_per_gas() + 1) as u64);
            assert_eq!(removed.len(), 1);
            assert_eq!(pool2.len(), 1);
            // descendant got popped
            assert!(pool2.contains(root_tx.id()));
            assert!(!pool2.contains(descendant_tx.id()));
        }

        // remove root transaction via fee, this also removes the descendant
        let removed = pool.update_base_fee((root_tx.max_fee_per_gas() + 1) as u64);
        assert_eq!(removed.len(), 2);
        assert!(pool.is_empty());
        pool.assert_invariants();
    }
722
    /// Truncating the pool to a single transaction evicts the transaction that compares as the
    /// minimum of the highest-nonce set.
    #[test]
    fn evict_worst() {
        let mut f = MockTransactionFactory::default();
        let mut pool = PendingPool::new(MockOrdering::default());

        let t = MockTransaction::eip1559();
        pool.add_transaction(f.validated_arc(t.clone()), 0);

        // second transaction with a price that is higher by 10
        let t2 = MockTransaction::eip1559().inc_price_by(10);
        pool.add_transaction(f.validated_arc(t2), 0);

        // The first transaction is the minimum of the highest-nonce set, so it should be the
        // one that gets evicted.
        assert_eq!(
            pool.highest_nonces.values().min().map(|tx| *tx.transaction.hash()),
            Some(*t.hash())
        );

        // truncate the pool to at most one transaction (`max_txs = 1`), ensure it's the same
        // transaction that is removed
        let removed = pool.truncate_pool(SubPoolLimit { max_txs: 1, max_size: usize::MAX });
        assert_eq!(removed.len(), 1);
        assert_eq!(removed[0].hash(), t.hash());
    }
745
    #[test]
    fn correct_independent_descendants() {
        // this test ensures that we set the right highest nonces set for each sender
        let mut f = MockTransactionFactory::default();
        let mut pool = PendingPool::new(MockOrdering::default());

        let a_sender = address!("0x000000000000000000000000000000000000000a");
        let b_sender = address!("0x000000000000000000000000000000000000000b");
        let c_sender = address!("0x000000000000000000000000000000000000000c");
        let d_sender = address!("0x000000000000000000000000000000000000000d");

        // create a chain of transactions for each of the senders A, B, C and D
        let mut tx_set = MockTransactionSet::dependent(a_sender, 0, 4, TxType::Eip1559);
        let a = tx_set.clone().into_vec();

        let b = MockTransactionSet::dependent(b_sender, 0, 3, TxType::Eip1559).into_vec();
        tx_set.extend(b.clone());

        // C has the same number of txs as B
        let c = MockTransactionSet::dependent(c_sender, 0, 3, TxType::Eip1559).into_vec();
        tx_set.extend(c.clone());

        // D only has a single tx
        let d = MockTransactionSet::dependent(d_sender, 0, 1, TxType::Eip1559).into_vec();
        tx_set.extend(d.clone());

        // add all the transactions to the pool
        let all_txs = tx_set.into_vec();
        for tx in all_txs {
            pool.add_transaction(f.validated_arc(tx), 0);
        }

        pool.assert_invariants();

        // the highest nonce transaction of each sender is expected to be the last transaction
        // of that sender's chain
        let expected_highest_nonces = [d[0].clone(), c[2].clone(), b[2].clone(), a[3].clone()]
            .iter()
            .map(|tx| (tx.sender(), tx.nonce()))
            .collect::<HashSet<_>>();
        let actual_highest_nonces = pool
            .highest_nonces
            .values()
            .map(|tx| (tx.transaction.sender(), tx.transaction.nonce()))
            .collect::<HashSet<_>>();
        assert_eq!(expected_highest_nonces, actual_highest_nonces);
        pool.assert_invariants();
    }
793
794    #[test]
795    fn truncate_by_sender() {
796        // This test ensures that transactions are removed from the pending pool by sender.
797        let mut f = MockTransactionFactory::default();
798        let mut pool = PendingPool::new(MockOrdering::default());
799
800        // Addresses for simulated senders A, B, C, and D.
801        let a = address!("0x000000000000000000000000000000000000000a");
802        let b = address!("0x000000000000000000000000000000000000000b");
803        let c = address!("0x000000000000000000000000000000000000000c");
804        let d = address!("0x000000000000000000000000000000000000000d");
805
806        // Create transaction chains for senders A, B, C, and D.
807        let a_txs = MockTransactionSet::sequential_transactions_by_sender(a, 4, TxType::Eip1559);
808        let b_txs = MockTransactionSet::sequential_transactions_by_sender(b, 3, TxType::Eip1559);
809        let c_txs = MockTransactionSet::sequential_transactions_by_sender(c, 3, TxType::Eip1559);
810        let d_txs = MockTransactionSet::sequential_transactions_by_sender(d, 1, TxType::Eip1559);
811
812        // Set up expected pending transactions.
813        let expected_pending = vec![
814            a_txs.transactions[0].clone(),
815            b_txs.transactions[0].clone(),
816            c_txs.transactions[0].clone(),
817            a_txs.transactions[1].clone(),
818        ]
819        .into_iter()
820        .map(|tx| (tx.sender(), tx.nonce()))
821        .collect::<HashSet<_>>();
822
823        // Set up expected removed transactions.
824        let expected_removed = vec![
825            d_txs.transactions[0].clone(),
826            c_txs.transactions[2].clone(),
827            b_txs.transactions[2].clone(),
828            a_txs.transactions[3].clone(),
829            c_txs.transactions[1].clone(),
830            b_txs.transactions[1].clone(),
831            a_txs.transactions[2].clone(),
832        ]
833        .into_iter()
834        .map(|tx| (tx.sender(), tx.nonce()))
835        .collect::<HashSet<_>>();
836
837        // Consolidate all transactions into a single vector.
838        let all_txs =
839            [a_txs.into_vec(), b_txs.into_vec(), c_txs.into_vec(), d_txs.into_vec()].concat();
840
841        // Add all the transactions to the pool.
842        for tx in all_txs {
843            pool.add_transaction(f.validated_arc(tx), 0);
844        }
845
846        // Sanity check, ensuring everything is consistent.
847        pool.assert_invariants();
848
849        // Define the maximum total transactions to be 4, removing transactions for each sender.
850        // Expected order of removal:
851        // * d1, c3, b3, a4
852        // * c2, b2, a3
853        //
854        // Remaining transactions:
855        // * a1, a2
856        // * b1
857        // * c1
858        let pool_limit = SubPoolLimit { max_txs: 4, max_size: usize::MAX };
859
860        // Truncate the pool based on the defined limit.
861        let removed = pool.truncate_pool(pool_limit);
862        pool.assert_invariants();
863        assert_eq!(removed.len(), expected_removed.len());
864
865        // Get the set of removed transactions and compare with the expected set.
866        let removed =
867            removed.into_iter().map(|tx| (tx.sender(), tx.nonce())).collect::<HashSet<_>>();
868        assert_eq!(removed, expected_removed);
869
870        // Retrieve the current pending transactions after truncation.
871        let pending = pool.all().collect::<Vec<_>>();
872        assert_eq!(pending.len(), expected_pending.len());
873
874        // Get the set of pending transactions and compare with the expected set.
875        let pending =
876            pending.into_iter().map(|tx| (tx.sender(), tx.nonce())).collect::<HashSet<_>>();
877        assert_eq!(pending, expected_pending);
878    }
879
880    // <https://github.com/paradigmxyz/reth/issues/12340>
881    #[test]
882    fn test_eligible_updates_promoted() {
883        let mut pool = PendingPool::new(MockOrdering::default());
884        let mut f = MockTransactionFactory::default();
885
886        let num_senders = 10;
887
888        let first_txs: Vec<_> = (0..num_senders) //
889            .map(|_| MockTransaction::eip1559())
890            .collect();
891        let second_txs: Vec<_> =
892            first_txs.iter().map(|tx| tx.clone().rng_hash().inc_nonce()).collect();
893
894        for tx in first_txs {
895            let valid_tx = f.validated(tx);
896            pool.add_transaction(Arc::new(valid_tx), 0);
897        }
898
899        let mut best = pool.best();
900
901        for _ in 0..num_senders {
902            if let Some(tx) = best.next() {
903                assert_eq!(tx.nonce(), 0);
904            } else {
905                panic!("cannot read one of first_txs");
906            }
907        }
908
909        for tx in second_txs {
910            let valid_tx = f.validated(tx);
911            pool.add_transaction(Arc::new(valid_tx), 0);
912        }
913
914        for _ in 0..num_senders {
915            if let Some(tx) = best.next() {
916                assert_eq!(tx.nonce(), 1);
917            } else {
918                panic!("cannot read one of second_txs");
919            }
920        }
921    }
922
923    #[test]
924    fn test_empty_pool_behavior() {
925        let mut pool = PendingPool::<MockOrdering>::new(MockOrdering::default());
926
927        // Ensure the pool is empty
928        assert!(pool.is_empty());
929        assert_eq!(pool.len(), 0);
930        assert_eq!(pool.size(), 0);
931
932        // Verify that attempting to truncate an empty pool does not panic and returns an empty vec
933        let removed = pool.truncate_pool(SubPoolLimit { max_txs: 10, max_size: 1000 });
934        assert!(removed.is_empty());
935
936        // Verify that retrieving transactions from an empty pool yields nothing
937        assert!(pool.all().next().is_none());
938    }
939
940    #[test]
941    fn test_add_remove_transaction() {
942        let mut f = MockTransactionFactory::default();
943        let mut pool = PendingPool::new(MockOrdering::default());
944
945        // Add a transaction and check if it's in the pool
946        let tx = f.validated_arc(MockTransaction::eip1559());
947        pool.add_transaction(tx.clone(), 0);
948        assert!(pool.contains(tx.id()));
949        assert_eq!(pool.len(), 1);
950
951        // Remove the transaction and ensure it's no longer in the pool
952        let removed_tx = pool.remove_transaction(tx.id()).unwrap();
953        assert_eq!(removed_tx.id(), tx.id());
954        assert!(!pool.contains(tx.id()));
955        assert_eq!(pool.len(), 0);
956    }
957
958    #[test]
959    fn test_reorder_on_basefee_update() {
960        let mut f = MockTransactionFactory::default();
961        let mut pool = PendingPool::new(MockOrdering::default());
962
963        // Add two transactions with different fees
964        let tx1 = f.validated_arc(MockTransaction::eip1559().inc_price());
965        let tx2 = f.validated_arc(MockTransaction::eip1559().inc_price_by(20));
966        pool.add_transaction(tx1.clone(), 0);
967        pool.add_transaction(tx2.clone(), 0);
968
969        // Ensure the transactions are in the correct order
970        let mut best = pool.best();
971        assert_eq!(best.next().unwrap().hash(), tx2.hash());
972        assert_eq!(best.next().unwrap().hash(), tx1.hash());
973
974        // Update the base fee to a value higher than tx1's fee, causing it to be removed
975        let removed = pool.update_base_fee((tx1.max_fee_per_gas() + 1) as u64);
976        assert_eq!(removed.len(), 1);
977        assert_eq!(removed[0].hash(), tx1.hash());
978
979        // Verify that only tx2 remains in the pool
980        assert_eq!(pool.len(), 1);
981        assert!(pool.contains(tx2.id()));
982        assert!(!pool.contains(tx1.id()));
983    }
984
985    #[test]
986    #[should_panic(expected = "transaction already included")]
987    fn test_handle_duplicates() {
988        let mut f = MockTransactionFactory::default();
989        let mut pool = PendingPool::new(MockOrdering::default());
990
991        // Add the same transaction twice and ensure it only appears once
992        let tx = f.validated_arc(MockTransaction::eip1559());
993        pool.add_transaction(tx.clone(), 0);
994        assert!(pool.contains(tx.id()));
995        assert_eq!(pool.len(), 1);
996
997        // Attempt to add the same transaction again, which should be ignored
998        pool.add_transaction(tx, 0);
999    }
1000
1001    #[test]
1002    fn test_update_blob_fee() {
1003        let mut f = MockTransactionFactory::default();
1004        let mut pool = PendingPool::new(MockOrdering::default());
1005
1006        // Add transactions with varying blob fees
1007        let tx1 = f.validated_arc(MockTransaction::eip4844().set_blob_fee(50).clone());
1008        let tx2 = f.validated_arc(MockTransaction::eip4844().set_blob_fee(150).clone());
1009        pool.add_transaction(tx1.clone(), 0);
1010        pool.add_transaction(tx2.clone(), 0);
1011
1012        // Update the blob fee to a value that causes tx1 to be removed
1013        let removed = pool.update_blob_fee(100);
1014        assert_eq!(removed.len(), 1);
1015        assert_eq!(removed[0].hash(), tx1.hash());
1016
1017        // Verify that only tx2 remains in the pool
1018        assert!(pool.contains(tx2.id()));
1019        assert!(!pool.contains(tx1.id()));
1020    }
1021
1022    #[test]
1023    fn local_senders_tracking() {
1024        let mut f = MockTransactionFactory::default();
1025        let mut pool = PendingPool::new(MockOrdering::default());
1026
1027        // Addresses for simulated senders A, B, C
1028        let a = address!("0x000000000000000000000000000000000000000a");
1029        let b = address!("0x000000000000000000000000000000000000000b");
1030        let c = address!("0x000000000000000000000000000000000000000c");
1031
1032        // sender A (local) - 11+ transactions (large enough to keep limit exceeded)
1033        // sender B (external) - 2 transactions
1034        // sender C (external) - 2 transactions
1035
1036        // Create transaction chains for senders A, B, C
1037        let a_txs = MockTransactionSet::sequential_transactions_by_sender(a, 11, TxType::Eip1559);
1038        let b_txs = MockTransactionSet::sequential_transactions_by_sender(b, 2, TxType::Eip1559);
1039        let c_txs = MockTransactionSet::sequential_transactions_by_sender(c, 2, TxType::Eip1559);
1040
1041        // create local txs for sender A
1042        for tx in a_txs.into_vec() {
1043            let final_tx = Arc::new(f.validated_with_origin(crate::TransactionOrigin::Local, tx));
1044
1045            pool.add_transaction(final_tx, 0);
1046        }
1047
1048        // create external txs for senders B and C
1049        let remaining_txs = [b_txs.into_vec(), c_txs.into_vec()].concat();
1050        for tx in remaining_txs {
1051            let final_tx = f.validated_arc(tx);
1052
1053            pool.add_transaction(final_tx, 0);
1054        }
1055
1056        // Sanity check, ensuring everything is consistent.
1057        pool.assert_invariants();
1058
1059        let pool_limit = SubPoolLimit { max_txs: 10, max_size: usize::MAX };
1060        pool.truncate_pool(pool_limit);
1061
1062        let sender_a = f.ids.sender_id(&a).unwrap();
1063        let sender_b = f.ids.sender_id(&b).unwrap();
1064        let sender_c = f.ids.sender_id(&c).unwrap();
1065
1066        assert_eq!(pool.get_txs_by_sender(sender_a).len(), 10);
1067        assert!(pool.get_txs_by_sender(sender_b).is_empty());
1068        assert!(pool.get_txs_by_sender(sender_c).is_empty());
1069    }
1070
1071    #[test]
1072    fn test_remove_non_highest_keeps_highest() {
1073        let mut f = MockTransactionFactory::default();
1074        let mut pool = PendingPool::new(MockOrdering::default());
1075        let sender = address!("0x00000000000000000000000000000000000000aa");
1076        let txs = MockTransactionSet::dependent(sender, 0, 3, TxType::Eip1559).into_vec();
1077        for tx in txs {
1078            pool.add_transaction(f.validated_arc(tx), 0);
1079        }
1080        pool.assert_invariants();
1081        let sender_id = f.ids.sender_id(&sender).unwrap();
1082        let mid_id = TransactionId::new(sender_id, 1);
1083        let _ = pool.remove_transaction(&mid_id);
1084        let highest = pool.highest_nonces.get(&sender_id).unwrap();
1085        assert_eq!(highest.transaction.nonce(), 2);
1086        pool.assert_invariants();
1087    }
1088
1089    #[test]
1090    fn test_cascade_removal_recomputes_highest() {
1091        let mut f = MockTransactionFactory::default();
1092        let mut pool = PendingPool::new(MockOrdering::default());
1093        let sender = address!("0x00000000000000000000000000000000000000bb");
1094        let txs = MockTransactionSet::dependent(sender, 0, 4, TxType::Eip1559).into_vec();
1095        for tx in txs {
1096            pool.add_transaction(f.validated_arc(tx), 0);
1097        }
1098        pool.assert_invariants();
1099        let sender_id = f.ids.sender_id(&sender).unwrap();
1100        let id3 = TransactionId::new(sender_id, 3);
1101        let _ = pool.remove_transaction(&id3);
1102        let highest = pool.highest_nonces.get(&sender_id).unwrap();
1103        assert_eq!(highest.transaction.nonce(), 2);
1104        let id2 = TransactionId::new(sender_id, 2);
1105        let _ = pool.remove_transaction(&id2);
1106        let highest = pool.highest_nonces.get(&sender_id).unwrap();
1107        assert_eq!(highest.transaction.nonce(), 1);
1108        pool.assert_invariants();
1109    }
1110
1111    #[test]
1112    fn test_remove_only_tx_clears_highest() {
1113        let mut f = MockTransactionFactory::default();
1114        let mut pool = PendingPool::new(MockOrdering::default());
1115        let sender = address!("0x00000000000000000000000000000000000000cc");
1116        let txs = MockTransactionSet::dependent(sender, 0, 1, TxType::Eip1559).into_vec();
1117        for tx in txs {
1118            pool.add_transaction(f.validated_arc(tx), 0);
1119        }
1120        pool.assert_invariants();
1121        let sender_id = f.ids.sender_id(&sender).unwrap();
1122        let id0 = TransactionId::new(sender_id, 0);
1123        let _ = pool.remove_transaction(&id0);
1124        assert!(!pool.highest_nonces.contains_key(&sender_id));
1125        pool.assert_invariants();
1126    }
1127}