Skip to main content

reth_transaction_pool/pool/
pending.rs

1//! Pending transactions
2
3use crate::{
4    identifier::{SenderId, TransactionId},
5    pool::{
6        best::{BestTransactions, BestTransactionsWithFees},
7        size::SizeTracker,
8    },
9    Priority, SubPoolLimit, TransactionOrdering, ValidPoolTransaction,
10};
11use rustc_hash::{FxHashMap, FxHashSet};
12use std::{
13    cmp::Ordering,
14    collections::{hash_map::Entry, BTreeMap},
15    ops::Bound::Unbounded,
16    sync::Arc,
17};
18use tokio::sync::broadcast;
19
20/// A pool of validated and gapless transactions that are ready to be executed on the current state
21/// and are waiting to be included in a block.
22///
23/// This pool distinguishes between `independent` transactions and pending transactions. A
24/// transaction is `independent`, if it is in the pending pool, and it has the current on chain
25/// nonce of the sender. Meaning `independent` transactions can be executed right away, other
26/// pending transactions depend on at least one `independent` transaction.
27///
28/// Once an `independent` transaction was executed it *unlocks* the next nonce, if this transaction
29/// is also pending, then this will be moved to the `independent` queue.
30#[derive(Debug, Clone)]
31pub struct PendingPool<T: TransactionOrdering> {
32    /// How to order transactions.
33    ordering: T,
34    /// Keeps track of transactions inserted in the pool.
35    ///
36    /// This way we can determine when transactions were submitted to the pool.
37    submission_id: u64,
38    /// _All_ Transactions that are currently inside the pool grouped by their identifier.
39    by_id: BTreeMap<TransactionId, PendingTransaction<T>>,
40    /// The highest nonce transactions for each sender - like the `independent` set, but the
41    /// highest instead of lowest nonce.
42    highest_nonces: FxHashMap<SenderId, PendingTransaction<T>>,
43    /// Independent transactions that can be included directly and don't require other
44    /// transactions.
45    independent_transactions: FxHashMap<SenderId, PendingTransaction<T>>,
46    /// Keeps track of the size of this pool.
47    ///
48    /// See also [`reth_primitives_traits::InMemorySize::size`].
49    size_of: SizeTracker,
50    /// Used to broadcast new transactions that have been added to the `PendingPool` to existing
51    /// `static_files` of this pool.
52    new_transaction_notifier: broadcast::Sender<PendingTransaction<T>>,
53}
54
55// === impl PendingPool ===
56
57impl<T: TransactionOrdering> PendingPool<T> {
58    /// Create a new pending pool instance.
59    pub fn new(ordering: T) -> Self {
60        Self::with_buffer(ordering, 200)
61    }
62
63    /// Create a new pool instance with the given buffer capacity.
64    pub fn with_buffer(ordering: T, buffer_capacity: usize) -> Self {
65        let (new_transaction_notifier, _) = broadcast::channel(buffer_capacity);
66        Self {
67            ordering,
68            submission_id: 0,
69            by_id: Default::default(),
70            independent_transactions: Default::default(),
71            highest_nonces: Default::default(),
72            size_of: Default::default(),
73            new_transaction_notifier,
74        }
75    }
76
77    /// Clear all transactions from the pool without resetting other values.
78    /// Used for atomic reordering during basefee update.
79    ///
80    /// # Returns
81    ///
82    /// Returns all transactions by id.
83    fn clear_transactions(&mut self) -> BTreeMap<TransactionId, PendingTransaction<T>> {
84        self.independent_transactions.clear();
85        self.highest_nonces.clear();
86        self.size_of.reset();
87        std::mem::take(&mut self.by_id)
88    }
89
90    /// Returns an iterator over all transactions that are _currently_ ready.
91    ///
92    /// 1. The iterator _always_ returns transactions in order: it never returns a transaction with
93    ///    an unsatisfied dependency and only returns them if dependency transaction were yielded
94    ///    previously. In other words: the nonces of transactions with the same sender will _always_
95    ///    increase by exactly 1.
96    ///
97    /// The order of transactions which satisfy (1.) is determined by their computed priority: a
98    /// transaction with a higher priority is returned before a transaction with a lower priority.
99    ///
100    /// If two transactions have the same priority score, then the transactions which spent more
101    /// time in pool (were added earlier) are returned first.
102    ///
103    /// NOTE: while this iterator returns transaction that pool considers valid at this point, they
104    /// could potentially become invalid at point of execution. Therefore, this iterator
105    /// provides a way to mark transactions that the consumer of this iterator considers invalid. In
106    /// which case the transaction's subgraph is also automatically marked invalid, See (1.).
107    /// Invalid transactions are skipped.
108    pub fn best(&self) -> BestTransactions<T> {
109        BestTransactions {
110            all: self.by_id.clone(),
111            independent: self.independent_transactions.values().cloned().collect(),
112            invalid: Default::default(),
113            new_transaction_receiver: Some(self.new_transaction_notifier.subscribe()),
114            last_priority: None,
115            skip_blobs: false,
116        }
117    }
118
119    /// Same as `best` but only returns transactions that satisfy the given basefee and blobfee.
120    pub(crate) fn best_with_basefee_and_blobfee(
121        &self,
122        base_fee: u64,
123        base_fee_per_blob_gas: u64,
124    ) -> BestTransactionsWithFees<T> {
125        BestTransactionsWithFees { best: self.best(), base_fee, base_fee_per_blob_gas }
126    }
127
128    /// Same as `best` but also includes the given unlocked transactions.
129    ///
130    /// This mimics the [`Self::add_transaction`] method, but does not insert the transactions into
131    /// pool but only into the returned iterator.
132    ///
133    /// Note: this does not insert the unlocked transactions into the pool.
134    ///
135    /// # Panics
136    ///
137    /// if the transaction is already included
138    pub(crate) fn best_with_unlocked_and_attributes(
139        &self,
140        unlocked: Vec<Arc<ValidPoolTransaction<T::Transaction>>>,
141        base_fee: u64,
142        base_fee_per_blob_gas: u64,
143    ) -> BestTransactionsWithFees<T> {
144        let mut best = self.best();
145        for (submission_id, tx) in (self.submission_id + 1..).zip(unlocked) {
146            debug_assert!(!best.all.contains_key(tx.id()), "transaction already included");
147            let priority = self.ordering.priority(&tx.transaction, base_fee);
148            let tx_id = *tx.id();
149            let transaction = PendingTransaction { submission_id, transaction: tx, priority };
150            if best.ancestor(&tx_id).is_none() {
151                best.independent.insert(transaction.clone());
152            }
153            best.all.insert(tx_id, transaction);
154        }
155
156        BestTransactionsWithFees { best, base_fee, base_fee_per_blob_gas }
157    }
158
159    /// Returns an iterator over all transactions in the pool
160    pub(crate) fn all(
161        &self,
162    ) -> impl ExactSizeIterator<Item = Arc<ValidPoolTransaction<T::Transaction>>> + '_ {
163        self.by_id.values().map(|tx| tx.transaction.clone())
164    }
165
166    /// Updates the pool with the new blob fee. Removes
167    /// from the subpool all transactions and their dependents that no longer satisfy the given
168    /// blob fee (`tx.max_blob_fee < blob_fee`).
169    ///
170    /// Note: the transactions are not returned in a particular order.
171    ///
172    /// # Returns
173    ///
174    /// Removed transactions that no longer satisfy the blob fee.
175    pub(crate) fn update_blob_fee(
176        &mut self,
177        blob_fee: u128,
178    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
179        // Create a collection for removed transactions.
180        let mut removed = Vec::new();
181
182        // Drain and iterate over all transactions.
183        let mut transactions_iter = self.clear_transactions().into_iter().peekable();
184        while let Some((id, tx)) = transactions_iter.next() {
185            if tx.transaction.is_eip4844() && tx.transaction.max_fee_per_blob_gas() < Some(blob_fee)
186            {
187                // Add this tx to the removed collection since it no longer satisfies the blob fee
188                // condition. Decrease the total pool size.
189                removed.push(Arc::clone(&tx.transaction));
190
191                // Remove all dependent transactions.
192                'this: while let Some((next_id, next_tx)) = transactions_iter.peek() {
193                    if next_id.sender != id.sender {
194                        break 'this
195                    }
196                    removed.push(Arc::clone(&next_tx.transaction));
197                    transactions_iter.next();
198                }
199            } else {
200                self.size_of += tx.transaction.size();
201                self.update_independents_and_highest_nonces(&tx);
202                self.by_id.insert(id, tx);
203            }
204        }
205
206        removed
207    }
208
209    /// Updates the pool with the new base fee. Reorders transactions by new priorities. Removes
210    /// from the subpool all transactions and their dependents that no longer satisfy the given
211    /// base fee (`tx.fee < base_fee`).
212    ///
213    /// Note: the transactions are not returned in a particular order.
214    ///
215    /// # Returns
216    ///
217    /// Removed transactions that no longer satisfy the base fee.
218    pub(crate) fn update_base_fee(
219        &mut self,
220        base_fee: u64,
221    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
222        // Create a collection for removed transactions.
223        let mut removed = Vec::new();
224
225        // Drain and iterate over all transactions.
226        let mut transactions_iter = self.clear_transactions().into_iter().peekable();
227        while let Some((id, mut tx)) = transactions_iter.next() {
228            if tx.transaction.max_fee_per_gas() < base_fee as u128 {
229                // Add this tx to the removed collection since it no longer satisfies the base fee
230                // condition. Decrease the total pool size.
231                removed.push(Arc::clone(&tx.transaction));
232
233                // Remove all dependent transactions.
234                'this: while let Some((next_id, next_tx)) = transactions_iter.peek() {
235                    if next_id.sender != id.sender {
236                        break 'this
237                    }
238                    removed.push(Arc::clone(&next_tx.transaction));
239                    transactions_iter.next();
240                }
241            } else {
242                // Re-insert the transaction with new priority.
243                tx.priority = self.ordering.priority(&tx.transaction.transaction, base_fee);
244
245                self.size_of += tx.transaction.size();
246                self.update_independents_and_highest_nonces(&tx);
247                self.by_id.insert(id, tx);
248            }
249        }
250
251        removed
252    }
253
254    /// Updates the independent transaction and highest nonces set, assuming the given transaction
255    /// is being _added_ to the pool.
256    fn update_independents_and_highest_nonces(&mut self, tx: &PendingTransaction<T>) {
257        match self.highest_nonces.entry(tx.transaction.sender_id()) {
258            Entry::Occupied(mut entry) => {
259                if entry.get().transaction.nonce() < tx.transaction.nonce() {
260                    *entry.get_mut() = tx.clone();
261                }
262            }
263            Entry::Vacant(entry) => {
264                entry.insert(tx.clone());
265            }
266        }
267        match self.independent_transactions.entry(tx.transaction.sender_id()) {
268            Entry::Occupied(mut entry) => {
269                if entry.get().transaction.nonce() > tx.transaction.nonce() {
270                    *entry.get_mut() = tx.clone();
271                }
272            }
273            Entry::Vacant(entry) => {
274                entry.insert(tx.clone());
275            }
276        }
277    }
278
279    /// Adds a new transactions to the pending queue.
280    ///
281    /// # Panics
282    ///
283    /// if the transaction is already included
284    pub fn add_transaction(
285        &mut self,
286        tx: Arc<ValidPoolTransaction<T::Transaction>>,
287        base_fee: u64,
288    ) {
289        debug_assert!(
290            !self.contains(tx.id()),
291            "transaction already included {:?}",
292            self.get(tx.id()).unwrap().transaction
293        );
294
295        // keep track of size
296        self.size_of += tx.size();
297
298        let tx_id = *tx.id();
299
300        let submission_id = self.next_id();
301        let priority = self.ordering.priority(&tx.transaction, base_fee);
302        let tx = PendingTransaction { submission_id, transaction: tx, priority };
303
304        self.update_independents_and_highest_nonces(&tx);
305
306        // send the new transaction to any existing pendingpool static file iterators
307        if self.new_transaction_notifier.receiver_count() > 0 {
308            let _ = self.new_transaction_notifier.send(tx.clone());
309        }
310
311        self.by_id.insert(tx_id, tx);
312    }
313
314    /// Removes the transaction from the pool.
315    ///
316    /// Note: If the transaction has a descendant transaction
317    /// it will advance it to the best queue.
318    pub(crate) fn remove_transaction(
319        &mut self,
320        id: &TransactionId,
321    ) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
322        if let Some(lowest) = self.independent_transactions.get(&id.sender) &&
323            lowest.transaction.nonce() == id.nonce
324        {
325            self.independent_transactions.remove(&id.sender);
326            // mark the next as independent if it exists
327            if let Some(unlocked) = self.get(&id.descendant()) {
328                self.independent_transactions.insert(id.sender, unlocked.clone());
329            }
330        }
331
332        let tx = self.by_id.remove(id)?;
333        self.size_of -= tx.transaction.size();
334
335        match self.highest_nonces.entry(id.sender) {
336            Entry::Occupied(mut entry) => {
337                if entry.get().transaction.nonce() == id.nonce {
338                    // we just removed the tx with the highest nonce for this sender, find the
339                    // highest remaining tx from that sender
340                    if let Some((_, new_highest)) = self
341                        .by_id
342                        .range((
343                            id.sender.start_bound(),
344                            std::ops::Bound::Included(TransactionId::new(id.sender, u64::MAX)),
345                        ))
346                        .last()
347                    {
348                        // insert the new highest nonce for this sender
349                        entry.insert(new_highest.clone());
350                    } else {
351                        entry.remove();
352                    }
353                }
354            }
355            Entry::Vacant(_) => {
356                debug_assert!(
357                    false,
358                    "removed transaction without a tracked highest nonce {:?}",
359                    id
360                );
361            }
362        }
363
364        Some(tx.transaction)
365    }
366
367    const fn next_id(&mut self) -> u64 {
368        let id = self.submission_id;
369        self.submission_id = self.submission_id.wrapping_add(1);
370        id
371    }
372
373    /// Traverses the pool, starting at the highest nonce set, removing the transactions which
374    /// would put the pool under the specified limits.
375    ///
376    /// This attempts to remove transactions by roughly the same amount for each sender. This is
377    /// done by removing the highest-nonce transactions for each sender.
378    ///
379    /// If the `remove_locals` flag is unset, transactions will be removed per-sender until a
380    /// local transaction is the highest nonce transaction for that sender. If all senders have a
381    /// local highest-nonce transaction, the pool will not be truncated further.
382    ///
383    /// Otherwise, if the `remove_locals` flag is set, transactions will be removed per-sender
384    /// until the pool is under the given limits.
385    ///
386    /// Any removed transactions will be added to the `end_removed` vector.
387    pub fn remove_to_limit(
388        &mut self,
389        limit: &SubPoolLimit,
390        remove_locals: bool,
391        end_removed: &mut Vec<Arc<ValidPoolTransaction<T::Transaction>>>,
392    ) {
393        // This serves as a termination condition for the loop - it represents the number of
394        // _valid_ unique senders that might have descendants in the pool.
395        //
396        // If `remove_locals` is false, a value of zero means that there are no non-local txs in the
397        // pool that can be removed.
398        //
399        // If `remove_locals` is true, a value of zero means that there are no txs in the pool that
400        // can be removed.
401        let mut non_local_senders = self.highest_nonces.len();
402
403        // keeps track of unique senders from previous iterations, to understand how many unique
404        // senders were removed in the last iteration
405        let mut unique_senders = self.highest_nonces.len();
406
407        // keeps track of which senders we've marked as local
408        let mut local_senders = FxHashSet::default();
409
410        // keep track of transactions to remove and how many have been removed so far
411        let original_length = self.len();
412        let mut removed = Vec::new();
413        let mut total_removed = 0;
414
415        // track total `size` of transactions to remove
416        let original_size = self.size();
417        let mut total_size = 0;
418
419        loop {
420            // check how many unique senders were removed last iteration
421            let unique_removed = unique_senders - self.highest_nonces.len();
422
423            // the new number of unique senders
424            unique_senders = self.highest_nonces.len();
425            non_local_senders -= unique_removed;
426
427            // we can reuse the temp array
428            removed.clear();
429
430            // we prefer removing transactions with lower ordering
431            let mut worst_transactions = self.highest_nonces.values().collect::<Vec<_>>();
432            worst_transactions.sort_unstable();
433
434            // loop through the highest nonces set, removing transactions until we reach the limit
435            for tx in worst_transactions {
436                // return early if the pool is under limits
437                if !limit.is_exceeded(original_length - total_removed, original_size - total_size) ||
438                    non_local_senders == 0
439                {
440                    // need to remove remaining transactions before exiting
441                    for id in &removed {
442                        if let Some(tx) = self.remove_transaction(id) {
443                            end_removed.push(tx);
444                        }
445                    }
446
447                    return
448                }
449
450                if !remove_locals && tx.transaction.is_local() {
451                    let sender_id = tx.transaction.sender_id();
452                    if local_senders.insert(sender_id) {
453                        non_local_senders -= 1;
454                    }
455                    continue
456                }
457
458                total_size += tx.transaction.size();
459                total_removed += 1;
460                removed.push(*tx.transaction.id());
461            }
462
463            // remove the transactions from this iteration
464            for id in &removed {
465                if let Some(tx) = self.remove_transaction(id) {
466                    end_removed.push(tx);
467                }
468            }
469
470            // return if either the pool is under limits or there are no more _eligible_
471            // transactions to remove
472            if !self.exceeds(limit) || non_local_senders == 0 {
473                return
474            }
475        }
476    }
477
478    /// Truncates the pool to the given [`SubPoolLimit`], removing transactions until the subpool
479    /// limits are met.
480    ///
481    /// This attempts to remove transactions by roughly the same amount for each sender. For more
482    /// information on this exact process see docs for
483    /// [`remove_to_limit`](PendingPool::remove_to_limit).
484    ///
485    /// This first truncates all of the non-local transactions in the pool. If the subpool is still
486    /// not under the limit, this truncates the entire pool, including non-local transactions. The
487    /// removed transactions are returned.
488    pub fn truncate_pool(
489        &mut self,
490        limit: SubPoolLimit,
491    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
492        let mut removed = Vec::new();
493        // return early if the pool is already under the limits
494        if !self.exceeds(&limit) {
495            return removed
496        }
497
498        // first truncate only non-local transactions, returning if the pool end up under the limit
499        self.remove_to_limit(&limit, false, &mut removed);
500        if !self.exceeds(&limit) {
501            return removed
502        }
503
504        // now repeat for local transactions, since local transactions must be removed now for the
505        // pool to be under the limit
506        self.remove_to_limit(&limit, true, &mut removed);
507
508        removed
509    }
510
511    /// Returns true if the pool exceeds the given limit
512    #[inline]
513    pub(crate) fn exceeds(&self, limit: &SubPoolLimit) -> bool {
514        limit.is_exceeded(self.len(), self.size())
515    }
516
517    /// The reported size of all transactions in this pool.
518    pub(crate) fn size(&self) -> usize {
519        self.size_of.into()
520    }
521
522    /// Number of transactions in the entire pool
523    pub(crate) fn len(&self) -> usize {
524        self.by_id.len()
525    }
526
527    /// All transactions grouped by id
528    pub const fn by_id(&self) -> &BTreeMap<TransactionId, PendingTransaction<T>> {
529        &self.by_id
530    }
531
532    /// Independent transactions
533    pub const fn independent_transactions(&self) -> &FxHashMap<SenderId, PendingTransaction<T>> {
534        &self.independent_transactions
535    }
536
537    /// Subscribes to new transactions
538    pub fn new_transaction_receiver(&self) -> broadcast::Receiver<PendingTransaction<T>> {
539        self.new_transaction_notifier.subscribe()
540    }
541
542    /// Whether the pool is empty
543    #[cfg(test)]
544    pub(crate) fn is_empty(&self) -> bool {
545        self.by_id.is_empty()
546    }
547
548    /// Returns `true` if the transaction with the given id is already included in this pool.
549    pub(crate) fn contains(&self, id: &TransactionId) -> bool {
550        self.by_id.contains_key(id)
551    }
552
553    /// Get transactions by sender
554    pub(crate) fn get_txs_by_sender(&self, sender: SenderId) -> Vec<TransactionId> {
555        self.iter_txs_by_sender(sender).copied().collect()
556    }
557
558    /// Returns an iterator over all transaction with the sender id
559    pub(crate) fn iter_txs_by_sender(
560        &self,
561        sender: SenderId,
562    ) -> impl Iterator<Item = &TransactionId> + '_ {
563        self.by_id
564            .range((sender.start_bound(), Unbounded))
565            .take_while(move |(other, _)| sender == other.sender)
566            .map(|(tx_id, _)| tx_id)
567    }
568
569    /// Returns all transactions for the given sender, using a `BTree` range query.
570    pub(crate) fn txs_by_sender(
571        &self,
572        sender: SenderId,
573    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
574        self.by_id
575            .range((sender.start_bound(), Unbounded))
576            .take_while(move |(other, _)| sender == other.sender)
577            .map(|(_, tx)| tx.transaction.clone())
578            .collect()
579    }
580
581    /// Retrieves a transaction with the given ID from the pool, if it exists.
582    fn get(&self, id: &TransactionId) -> Option<&PendingTransaction<T>> {
583        self.by_id.get(id)
584    }
585
586    /// Returns a reference to the independent transactions in the pool
587    #[cfg(test)]
588    pub(crate) const fn independent(&self) -> &FxHashMap<SenderId, PendingTransaction<T>> {
589        &self.independent_transactions
590    }
591
592    /// Asserts that the bijection between `by_id` and `all` is valid.
593    #[cfg(any(test, feature = "test-utils"))]
594    pub(crate) fn assert_invariants(&self) {
595        assert!(
596            self.independent_transactions.len() <= self.by_id.len(),
597            "independent_transactions.len() > by_id.len()"
598        );
599        assert!(
600            self.highest_nonces.len() <= self.by_id.len(),
601            "highest_nonces.len() > by_id.len()"
602        );
603        assert_eq!(
604            self.highest_nonces.len(),
605            self.independent_transactions.len(),
606            "highest_nonces.len() != independent_transactions.len()"
607        );
608    }
609}
610
611/// A transaction that is ready to be included in a block.
612#[derive(Debug)]
613pub struct PendingTransaction<T: TransactionOrdering> {
614    /// Identifier that tags when transaction was submitted in the pool.
615    pub submission_id: u64,
616    /// Actual transaction.
617    pub transaction: Arc<ValidPoolTransaction<T::Transaction>>,
618    /// The priority value assigned by the used `Ordering` function.
619    pub priority: Priority<T::PriorityValue>,
620}
621
622impl<T: TransactionOrdering> PendingTransaction<T> {
623    /// The next transaction of the sender: `nonce + 1`
624    pub fn unlocks(&self) -> TransactionId {
625        self.transaction.transaction_id.descendant()
626    }
627}
628
629impl<T: TransactionOrdering> Clone for PendingTransaction<T> {
630    fn clone(&self) -> Self {
631        Self {
632            submission_id: self.submission_id,
633            transaction: Arc::clone(&self.transaction),
634            priority: self.priority.clone(),
635        }
636    }
637}
638
639impl<T: TransactionOrdering> Eq for PendingTransaction<T> {}
640
641impl<T: TransactionOrdering> PartialEq<Self> for PendingTransaction<T> {
642    fn eq(&self, other: &Self) -> bool {
643        self.cmp(other) == Ordering::Equal
644    }
645}
646
647impl<T: TransactionOrdering> PartialOrd<Self> for PendingTransaction<T> {
648    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
649        Some(self.cmp(other))
650    }
651}
652
653impl<T: TransactionOrdering> Ord for PendingTransaction<T> {
654    fn cmp(&self, other: &Self) -> Ordering {
655        // This compares by `priority` and only if two tx have the exact same priority this compares
656        // the unique `submission_id`. This ensures that transactions with same priority are not
657        // equal, so they're not replaced in the set
658        self.priority
659            .cmp(&other.priority)
660            .then_with(|| other.submission_id.cmp(&self.submission_id))
661    }
662}
663
664#[cfg(test)]
665mod tests {
666    use super::*;
667    use crate::{
668        test_utils::{MockOrdering, MockTransaction, MockTransactionFactory, MockTransactionSet},
669        PoolTransaction,
670    };
671    use alloy_consensus::{Transaction, TxType};
672    use alloy_primitives::address;
673    use std::collections::HashSet;
674
675    #[test]
676    fn test_enforce_basefee() {
677        let mut f = MockTransactionFactory::default();
678        let mut pool = PendingPool::new(MockOrdering::default());
679        let tx = f.validated_arc(MockTransaction::eip1559().inc_price());
680        pool.add_transaction(tx.clone(), 0);
681
682        assert!(pool.contains(tx.id()));
683        assert_eq!(pool.len(), 1);
684
685        let removed = pool.update_base_fee(0);
686        assert!(removed.is_empty());
687
688        let removed = pool.update_base_fee((tx.max_fee_per_gas() + 1) as u64);
689        assert_eq!(removed.len(), 1);
690        assert!(pool.is_empty());
691    }
692
693    #[test]
694    fn test_enforce_basefee_descendant() {
695        let mut f = MockTransactionFactory::default();
696        let mut pool = PendingPool::new(MockOrdering::default());
697        let t = MockTransaction::eip1559().inc_price_by(10);
698        let root_tx = f.validated_arc(t.clone());
699        pool.add_transaction(root_tx.clone(), 0);
700
701        let descendant_tx = f.validated_arc(t.inc_nonce().decr_price());
702        pool.add_transaction(descendant_tx.clone(), 0);
703
704        assert!(pool.contains(root_tx.id()));
705        assert!(pool.contains(descendant_tx.id()));
706        assert_eq!(pool.len(), 2);
707
708        assert_eq!(pool.independent_transactions.len(), 1);
709        assert_eq!(pool.highest_nonces.len(), 1);
710
711        let removed = pool.update_base_fee(0);
712        assert!(removed.is_empty());
713
714        // two dependent tx in the pool with decreasing fee
715
716        {
717            let mut pool2 = pool.clone();
718            let removed = pool2.update_base_fee((descendant_tx.max_fee_per_gas() + 1) as u64);
719            assert_eq!(removed.len(), 1);
720            assert_eq!(pool2.len(), 1);
721            // descendant got popped
722            assert!(pool2.contains(root_tx.id()));
723            assert!(!pool2.contains(descendant_tx.id()));
724        }
725
726        // remove root transaction via fee
727        let removed = pool.update_base_fee((root_tx.max_fee_per_gas() + 1) as u64);
728        assert_eq!(removed.len(), 2);
729        assert!(pool.is_empty());
730        pool.assert_invariants();
731    }
732
733    #[test]
734    fn evict_worst() {
735        let mut f = MockTransactionFactory::default();
736        let mut pool = PendingPool::new(MockOrdering::default());
737
738        let t = MockTransaction::eip1559();
739        pool.add_transaction(f.validated_arc(t.clone()), 0);
740
741        let t2 = MockTransaction::eip1559().inc_price_by(10);
742        pool.add_transaction(f.validated_arc(t2), 0);
743
744        // First transaction should be evicted.
745        assert_eq!(
746            pool.highest_nonces.values().min().map(|tx| *tx.transaction.hash()),
747            Some(*t.hash())
748        );
749
750        // truncate pool with max size = 1, ensure it's the same transaction
751        let removed = pool.truncate_pool(SubPoolLimit { max_txs: 1, max_size: usize::MAX });
752        assert_eq!(removed.len(), 1);
753        assert_eq!(removed[0].hash(), t.hash());
754    }
755
756    #[test]
757    fn correct_independent_descendants() {
758        // this test ensures that we set the right highest nonces set for each sender
759        let mut f = MockTransactionFactory::default();
760        let mut pool = PendingPool::new(MockOrdering::default());
761
762        let a_sender = address!("0x000000000000000000000000000000000000000a");
763        let b_sender = address!("0x000000000000000000000000000000000000000b");
764        let c_sender = address!("0x000000000000000000000000000000000000000c");
765        let d_sender = address!("0x000000000000000000000000000000000000000d");
766
767        // create a chain of transactions by sender A, B, C
768        let mut tx_set = MockTransactionSet::dependent(a_sender, 0, 4, TxType::Eip1559);
769        let a = tx_set.clone().into_vec();
770
771        let b = MockTransactionSet::dependent(b_sender, 0, 3, TxType::Eip1559).into_vec();
772        tx_set.extend(b.clone());
773
774        // C has the same number of txs as B
775        let c = MockTransactionSet::dependent(c_sender, 0, 3, TxType::Eip1559).into_vec();
776        tx_set.extend(c.clone());
777
778        let d = MockTransactionSet::dependent(d_sender, 0, 1, TxType::Eip1559).into_vec();
779        tx_set.extend(d.clone());
780
781        // add all the transactions to the pool
782        let all_txs = tx_set.into_vec();
783        for tx in all_txs {
784            pool.add_transaction(f.validated_arc(tx), 0);
785        }
786
787        pool.assert_invariants();
788
789        // the independent set is the roots of each of these tx chains, these are the highest
790        // nonces for each sender
791        let expected_highest_nonces = [d[0].clone(), c[2].clone(), b[2].clone(), a[3].clone()]
792            .iter()
793            .map(|tx| (tx.sender(), tx.nonce()))
794            .collect::<HashSet<_>>();
795        let actual_highest_nonces = pool
796            .highest_nonces
797            .values()
798            .map(|tx| (tx.transaction.sender(), tx.transaction.nonce()))
799            .collect::<HashSet<_>>();
800        assert_eq!(expected_highest_nonces, actual_highest_nonces);
801        pool.assert_invariants();
802    }
803
804    #[test]
805    fn truncate_by_sender() {
806        // This test ensures that transactions are removed from the pending pool by sender.
807        let mut f = MockTransactionFactory::default();
808        let mut pool = PendingPool::new(MockOrdering::default());
809
810        // Addresses for simulated senders A, B, C, and D.
811        let a = address!("0x000000000000000000000000000000000000000a");
812        let b = address!("0x000000000000000000000000000000000000000b");
813        let c = address!("0x000000000000000000000000000000000000000c");
814        let d = address!("0x000000000000000000000000000000000000000d");
815
816        // Create transaction chains for senders A, B, C, and D.
817        let a_txs = MockTransactionSet::sequential_transactions_by_sender(a, 4, TxType::Eip1559);
818        let b_txs = MockTransactionSet::sequential_transactions_by_sender(b, 3, TxType::Eip1559);
819        let c_txs = MockTransactionSet::sequential_transactions_by_sender(c, 3, TxType::Eip1559);
820        let d_txs = MockTransactionSet::sequential_transactions_by_sender(d, 1, TxType::Eip1559);
821
822        // Set up expected pending transactions.
823        let expected_pending = vec![
824            a_txs.transactions[0].clone(),
825            b_txs.transactions[0].clone(),
826            c_txs.transactions[0].clone(),
827            a_txs.transactions[1].clone(),
828        ]
829        .into_iter()
830        .map(|tx| (tx.sender(), tx.nonce()))
831        .collect::<HashSet<_>>();
832
833        // Set up expected removed transactions.
834        let expected_removed = vec![
835            d_txs.transactions[0].clone(),
836            c_txs.transactions[2].clone(),
837            b_txs.transactions[2].clone(),
838            a_txs.transactions[3].clone(),
839            c_txs.transactions[1].clone(),
840            b_txs.transactions[1].clone(),
841            a_txs.transactions[2].clone(),
842        ]
843        .into_iter()
844        .map(|tx| (tx.sender(), tx.nonce()))
845        .collect::<HashSet<_>>();
846
847        // Consolidate all transactions into a single vector.
848        let all_txs =
849            [a_txs.into_vec(), b_txs.into_vec(), c_txs.into_vec(), d_txs.into_vec()].concat();
850
851        // Add all the transactions to the pool.
852        for tx in all_txs {
853            pool.add_transaction(f.validated_arc(tx), 0);
854        }
855
856        // Sanity check, ensuring everything is consistent.
857        pool.assert_invariants();
858
859        // Define the maximum total transactions to be 4, removing transactions for each sender.
860        // Expected order of removal:
861        // * d1, c3, b3, a4
862        // * c2, b2, a3
863        //
864        // Remaining transactions:
865        // * a1, a2
866        // * b1
867        // * c1
868        let pool_limit = SubPoolLimit { max_txs: 4, max_size: usize::MAX };
869
870        // Truncate the pool based on the defined limit.
871        let removed = pool.truncate_pool(pool_limit);
872        pool.assert_invariants();
873        assert_eq!(removed.len(), expected_removed.len());
874
875        // Get the set of removed transactions and compare with the expected set.
876        let removed =
877            removed.into_iter().map(|tx| (tx.sender(), tx.nonce())).collect::<HashSet<_>>();
878        assert_eq!(removed, expected_removed);
879
880        // Retrieve the current pending transactions after truncation.
881        let pending = pool.all().collect::<Vec<_>>();
882        assert_eq!(pending.len(), expected_pending.len());
883
884        // Get the set of pending transactions and compare with the expected set.
885        let pending =
886            pending.into_iter().map(|tx| (tx.sender(), tx.nonce())).collect::<HashSet<_>>();
887        assert_eq!(pending, expected_pending);
888    }
889
890    // <https://github.com/paradigmxyz/reth/issues/12340>
891    #[test]
892    fn test_eligible_updates_promoted() {
893        let mut pool = PendingPool::new(MockOrdering::default());
894        let mut f = MockTransactionFactory::default();
895
896        let num_senders = 10;
897
898        let first_txs: Vec<_> = (0..num_senders) //
899            .map(|_| MockTransaction::eip1559())
900            .collect();
901        let second_txs: Vec<_> =
902            first_txs.iter().map(|tx| tx.clone().rng_hash().inc_nonce()).collect();
903
904        for tx in first_txs {
905            let valid_tx = f.validated(tx);
906            pool.add_transaction(Arc::new(valid_tx), 0);
907        }
908
909        let mut best = pool.best();
910
911        for _ in 0..num_senders {
912            if let Some(tx) = best.next() {
913                assert_eq!(tx.nonce(), 0);
914            } else {
915                panic!("cannot read one of first_txs");
916            }
917        }
918
919        for tx in second_txs {
920            let valid_tx = f.validated(tx);
921            pool.add_transaction(Arc::new(valid_tx), 0);
922        }
923
924        for _ in 0..num_senders {
925            if let Some(tx) = best.next() {
926                assert_eq!(tx.nonce(), 1);
927            } else {
928                panic!("cannot read one of second_txs");
929            }
930        }
931    }
932
933    #[test]
934    fn test_empty_pool_behavior() {
935        let mut pool = PendingPool::<MockOrdering>::new(MockOrdering::default());
936
937        // Ensure the pool is empty
938        assert!(pool.is_empty());
939        assert_eq!(pool.len(), 0);
940        assert_eq!(pool.size(), 0);
941
942        // Verify that attempting to truncate an empty pool does not panic and returns an empty vec
943        let removed = pool.truncate_pool(SubPoolLimit { max_txs: 10, max_size: 1000 });
944        assert!(removed.is_empty());
945
946        // Verify that retrieving transactions from an empty pool yields nothing
947        assert!(pool.all().next().is_none());
948    }
949
950    #[test]
951    fn test_add_remove_transaction() {
952        let mut f = MockTransactionFactory::default();
953        let mut pool = PendingPool::new(MockOrdering::default());
954
955        // Add a transaction and check if it's in the pool
956        let tx = f.validated_arc(MockTransaction::eip1559());
957        pool.add_transaction(tx.clone(), 0);
958        assert!(pool.contains(tx.id()));
959        assert_eq!(pool.len(), 1);
960
961        // Remove the transaction and ensure it's no longer in the pool
962        let removed_tx = pool.remove_transaction(tx.id()).unwrap();
963        assert_eq!(removed_tx.id(), tx.id());
964        assert!(!pool.contains(tx.id()));
965        assert_eq!(pool.len(), 0);
966    }
967
968    #[test]
969    fn test_reorder_on_basefee_update() {
970        let mut f = MockTransactionFactory::default();
971        let mut pool = PendingPool::new(MockOrdering::default());
972
973        // Add two transactions with different fees
974        let tx1 = f.validated_arc(MockTransaction::eip1559().inc_price());
975        let tx2 = f.validated_arc(MockTransaction::eip1559().inc_price_by(20));
976        pool.add_transaction(tx1.clone(), 0);
977        pool.add_transaction(tx2.clone(), 0);
978
979        // Ensure the transactions are in the correct order
980        let mut best = pool.best();
981        assert_eq!(best.next().unwrap().hash(), tx2.hash());
982        assert_eq!(best.next().unwrap().hash(), tx1.hash());
983
984        // Update the base fee to a value higher than tx1's fee, causing it to be removed
985        let removed = pool.update_base_fee((tx1.max_fee_per_gas() + 1) as u64);
986        assert_eq!(removed.len(), 1);
987        assert_eq!(removed[0].hash(), tx1.hash());
988
989        // Verify that only tx2 remains in the pool
990        assert_eq!(pool.len(), 1);
991        assert!(pool.contains(tx2.id()));
992        assert!(!pool.contains(tx1.id()));
993    }
994
995    #[test]
996    #[cfg(debug_assertions)]
997    #[should_panic(expected = "transaction already included")]
998    fn test_handle_duplicates() {
999        let mut f = MockTransactionFactory::default();
1000        let mut pool = PendingPool::new(MockOrdering::default());
1001
1002        // Add the same transaction twice and ensure it only appears once
1003        let tx = f.validated_arc(MockTransaction::eip1559());
1004        pool.add_transaction(tx.clone(), 0);
1005        assert!(pool.contains(tx.id()));
1006        assert_eq!(pool.len(), 1);
1007
1008        // Attempt to add the same transaction again, which should be ignored
1009        pool.add_transaction(tx, 0);
1010    }
1011
1012    #[test]
1013    fn test_update_blob_fee() {
1014        let mut f = MockTransactionFactory::default();
1015        let mut pool = PendingPool::new(MockOrdering::default());
1016
1017        // Add transactions with varying blob fees
1018        let tx1 = f.validated_arc(MockTransaction::eip4844().set_blob_fee(50).clone());
1019        let tx2 = f.validated_arc(MockTransaction::eip4844().set_blob_fee(150).clone());
1020        pool.add_transaction(tx1.clone(), 0);
1021        pool.add_transaction(tx2.clone(), 0);
1022
1023        // Update the blob fee to a value that causes tx1 to be removed
1024        let removed = pool.update_blob_fee(100);
1025        assert_eq!(removed.len(), 1);
1026        assert_eq!(removed[0].hash(), tx1.hash());
1027
1028        // Verify that only tx2 remains in the pool
1029        assert!(pool.contains(tx2.id()));
1030        assert!(!pool.contains(tx1.id()));
1031    }
1032
1033    #[test]
1034    fn local_senders_tracking() {
1035        let mut f = MockTransactionFactory::default();
1036        let mut pool = PendingPool::new(MockOrdering::default());
1037
1038        // Addresses for simulated senders A, B, C
1039        let a = address!("0x000000000000000000000000000000000000000a");
1040        let b = address!("0x000000000000000000000000000000000000000b");
1041        let c = address!("0x000000000000000000000000000000000000000c");
1042
1043        // sender A (local) - 11+ transactions (large enough to keep limit exceeded)
1044        // sender B (external) - 2 transactions
1045        // sender C (external) - 2 transactions
1046
1047        // Create transaction chains for senders A, B, C
1048        let a_txs = MockTransactionSet::sequential_transactions_by_sender(a, 11, TxType::Eip1559);
1049        let b_txs = MockTransactionSet::sequential_transactions_by_sender(b, 2, TxType::Eip1559);
1050        let c_txs = MockTransactionSet::sequential_transactions_by_sender(c, 2, TxType::Eip1559);
1051
1052        // create local txs for sender A
1053        for tx in a_txs.into_vec() {
1054            let final_tx = Arc::new(f.validated_with_origin(crate::TransactionOrigin::Local, tx));
1055
1056            pool.add_transaction(final_tx, 0);
1057        }
1058
1059        // create external txs for senders B and C
1060        let remaining_txs = [b_txs.into_vec(), c_txs.into_vec()].concat();
1061        for tx in remaining_txs {
1062            let final_tx = f.validated_arc(tx);
1063
1064            pool.add_transaction(final_tx, 0);
1065        }
1066
1067        // Sanity check, ensuring everything is consistent.
1068        pool.assert_invariants();
1069
1070        let pool_limit = SubPoolLimit { max_txs: 10, max_size: usize::MAX };
1071        pool.truncate_pool(pool_limit);
1072
1073        let sender_a = f.ids.sender_id(&a).unwrap();
1074        let sender_b = f.ids.sender_id(&b).unwrap();
1075        let sender_c = f.ids.sender_id(&c).unwrap();
1076
1077        assert_eq!(pool.get_txs_by_sender(sender_a).len(), 10);
1078        assert!(pool.get_txs_by_sender(sender_b).is_empty());
1079        assert!(pool.get_txs_by_sender(sender_c).is_empty());
1080    }
1081
1082    #[test]
1083    fn test_remove_non_highest_keeps_highest() {
1084        let mut f = MockTransactionFactory::default();
1085        let mut pool = PendingPool::new(MockOrdering::default());
1086        let sender = address!("0x00000000000000000000000000000000000000aa");
1087        let txs = MockTransactionSet::dependent(sender, 0, 3, TxType::Eip1559).into_vec();
1088        for tx in txs {
1089            pool.add_transaction(f.validated_arc(tx), 0);
1090        }
1091        pool.assert_invariants();
1092        let sender_id = f.ids.sender_id(&sender).unwrap();
1093        let mid_id = TransactionId::new(sender_id, 1);
1094        let _ = pool.remove_transaction(&mid_id);
1095        let highest = pool.highest_nonces.get(&sender_id).unwrap();
1096        assert_eq!(highest.transaction.nonce(), 2);
1097        pool.assert_invariants();
1098    }
1099
1100    #[test]
1101    fn test_cascade_removal_recomputes_highest() {
1102        let mut f = MockTransactionFactory::default();
1103        let mut pool = PendingPool::new(MockOrdering::default());
1104        let sender = address!("0x00000000000000000000000000000000000000bb");
1105        let txs = MockTransactionSet::dependent(sender, 0, 4, TxType::Eip1559).into_vec();
1106        for tx in txs {
1107            pool.add_transaction(f.validated_arc(tx), 0);
1108        }
1109        pool.assert_invariants();
1110        let sender_id = f.ids.sender_id(&sender).unwrap();
1111        let id3 = TransactionId::new(sender_id, 3);
1112        let _ = pool.remove_transaction(&id3);
1113        let highest = pool.highest_nonces.get(&sender_id).unwrap();
1114        assert_eq!(highest.transaction.nonce(), 2);
1115        let id2 = TransactionId::new(sender_id, 2);
1116        let _ = pool.remove_transaction(&id2);
1117        let highest = pool.highest_nonces.get(&sender_id).unwrap();
1118        assert_eq!(highest.transaction.nonce(), 1);
1119        pool.assert_invariants();
1120    }
1121
1122    #[test]
1123    fn test_remove_only_tx_clears_highest() {
1124        let mut f = MockTransactionFactory::default();
1125        let mut pool = PendingPool::new(MockOrdering::default());
1126        let sender = address!("0x00000000000000000000000000000000000000cc");
1127        let txs = MockTransactionSet::dependent(sender, 0, 1, TxType::Eip1559).into_vec();
1128        for tx in txs {
1129            pool.add_transaction(f.validated_arc(tx), 0);
1130        }
1131        pool.assert_invariants();
1132        let sender_id = f.ids.sender_id(&sender).unwrap();
1133        let id0 = TransactionId::new(sender_id, 0);
1134        let _ = pool.remove_transaction(&id0);
1135        assert!(!pool.highest_nonces.contains_key(&sender_id));
1136        pool.assert_invariants();
1137    }
1138}