Skip to main content

reth_transaction_pool/pool/
pending.rs

1//! Pending transactions
2
3use crate::{
4    identifier::{SenderId, TransactionId},
5    pool::{
6        best::{BestTransactions, BestTransactionsWithFees},
7        size::SizeTracker,
8    },
9    Priority, SubPoolLimit, TransactionOrdering, ValidPoolTransaction,
10};
11use imbl::OrdMap;
12use rustc_hash::{FxHashMap, FxHashSet};
13use std::{cmp::Ordering, collections::hash_map::Entry, ops::Bound::Unbounded, sync::Arc};
14use tokio::sync::broadcast;
15
/// A pool of validated and gapless transactions that are ready to be executed on the current state
/// and are waiting to be included in a block.
///
/// This pool distinguishes between `independent` transactions and pending transactions. A
/// transaction is `independent` if it is in the pending pool and it has the current on-chain
/// nonce of the sender. This means `independent` transactions can be executed right away, while
/// all other pending transactions depend on at least one `independent` transaction.
///
/// Once an `independent` transaction has been executed it *unlocks* the next nonce; if the
/// transaction with that nonce is also pending, it is moved to the `independent` queue.
#[derive(Debug, Clone)]
pub struct PendingPool<T: TransactionOrdering> {
    /// How to order transactions.
    ordering: T,
    /// Counter used to tag transactions inserted into the pool.
    ///
    /// Each inserted transaction receives the current value, which makes it possible to determine
    /// the order in which transactions were submitted to the pool.
    submission_id: u64,
    /// _All_ transactions that are currently inside the pool, keyed by their identifier.
    by_id: OrdMap<TransactionId, PendingTransaction<T>>,
    /// The highest nonce transaction for each sender - like the `independent` set, but the
    /// highest instead of the lowest nonce.
    highest_nonces: FxHashMap<SenderId, PendingTransaction<T>>,
    /// Independent transactions that can be included directly and don't require other
    /// transactions. At most one entry (the lowest nonce) is tracked per sender.
    independent_transactions: FxHashMap<SenderId, PendingTransaction<T>>,
    /// Keeps track of the size of this pool.
    ///
    /// See also [`reth_primitives_traits::InMemorySize::size`].
    size_of: SizeTracker,
    /// Used to broadcast new transactions that have been added to the `PendingPool` to existing
    /// subscribers of this pool, such as the iterators returned by [`Self::best`].
    new_transaction_notifier: broadcast::Sender<PendingTransaction<T>>,
}
50
51// === impl PendingPool ===
52
53impl<T: TransactionOrdering> PendingPool<T> {
54    /// Create a new pending pool instance.
55    pub fn new(ordering: T) -> Self {
56        Self::with_buffer(ordering, 200)
57    }
58
59    /// Create a new pool instance with the given buffer capacity.
60    pub fn with_buffer(ordering: T, buffer_capacity: usize) -> Self {
61        let (new_transaction_notifier, _) = broadcast::channel(buffer_capacity);
62        Self {
63            ordering,
64            submission_id: 0,
65            by_id: Default::default(),
66            independent_transactions: Default::default(),
67            highest_nonces: Default::default(),
68            size_of: Default::default(),
69            new_transaction_notifier,
70        }
71    }
72
73    /// Clear all transactions from the pool without resetting other values.
74    /// Used for atomic reordering during basefee update.
75    ///
76    /// # Returns
77    ///
78    /// Returns all transactions by id.
79    fn clear_transactions(&mut self) -> OrdMap<TransactionId, PendingTransaction<T>> {
80        self.independent_transactions.clear();
81        self.highest_nonces.clear();
82        self.size_of.reset();
83        std::mem::take(&mut self.by_id)
84    }
85
86    /// Returns an iterator over all transactions that are _currently_ ready.
87    ///
88    /// 1. The iterator _always_ returns transactions in order: it never returns a transaction with
89    ///    an unsatisfied dependency and only returns them if dependency transaction were yielded
90    ///    previously. In other words: the nonces of transactions with the same sender will _always_
91    ///    increase by exactly 1.
92    ///
93    /// The order of transactions which satisfy (1.) is determined by their computed priority: a
94    /// transaction with a higher priority is returned before a transaction with a lower priority.
95    ///
96    /// If two transactions have the same priority score, then the transactions which spent more
97    /// time in pool (were added earlier) are returned first.
98    ///
99    /// NOTE: while this iterator returns transaction that pool considers valid at this point, they
100    /// could potentially become invalid at point of execution. Therefore, this iterator
101    /// provides a way to mark transactions that the consumer of this iterator considers invalid. In
102    /// which case the transaction's subgraph is also automatically marked invalid, See (1.).
103    /// Invalid transactions are skipped.
104    pub fn best(&self) -> BestTransactions<T> {
105        BestTransactions {
106            all: self.by_id.clone(),
107            independent: self.independent_transactions.values().cloned().collect(),
108            invalid: Default::default(),
109            new_transaction_receiver: Some(self.new_transaction_notifier.subscribe()),
110            last_priority: None,
111            skip_blobs: false,
112        }
113    }
114
115    /// Same as `best` but only returns transactions that satisfy the given basefee and blobfee.
116    pub(crate) fn best_with_basefee_and_blobfee(
117        &self,
118        base_fee: u64,
119        base_fee_per_blob_gas: u64,
120    ) -> BestTransactionsWithFees<T> {
121        BestTransactionsWithFees { best: self.best(), base_fee, base_fee_per_blob_gas }
122    }
123
124    /// Same as `best` but also includes the given unlocked transactions.
125    ///
126    /// This mimics the [`Self::add_transaction`] method, but does not insert the transactions into
127    /// pool but only into the returned iterator.
128    ///
129    /// Note: this does not insert the unlocked transactions into the pool.
130    ///
131    /// # Panics
132    ///
133    /// if the transaction is already included
134    pub(crate) fn best_with_unlocked_and_attributes(
135        &self,
136        unlocked: Vec<Arc<ValidPoolTransaction<T::Transaction>>>,
137        base_fee: u64,
138        base_fee_per_blob_gas: u64,
139    ) -> BestTransactionsWithFees<T> {
140        let mut best = self.best();
141        for (submission_id, tx) in (self.submission_id + 1..).zip(unlocked) {
142            debug_assert!(!best.all.contains_key(tx.id()), "transaction already included");
143            let priority = self.ordering.priority(&tx.transaction, base_fee);
144            let tx_id = *tx.id();
145            let transaction = PendingTransaction { submission_id, transaction: tx, priority };
146            if best.ancestor(&tx_id).is_none() {
147                best.independent.insert(transaction.clone());
148            }
149            best.all.insert(tx_id, transaction);
150        }
151
152        BestTransactionsWithFees { best, base_fee, base_fee_per_blob_gas }
153    }
154
155    /// Returns an iterator over all transactions in the pool
156    pub(crate) fn all(
157        &self,
158    ) -> impl ExactSizeIterator<Item = Arc<ValidPoolTransaction<T::Transaction>>> + '_ {
159        self.by_id.values().map(|tx| tx.transaction.clone())
160    }
161
162    /// Updates the pool with the new blob fee. Removes
163    /// from the subpool all transactions and their dependents that no longer satisfy the given
164    /// blob fee (`tx.max_blob_fee < blob_fee`).
165    ///
166    /// Note: the transactions are not returned in a particular order.
167    ///
168    /// # Returns
169    ///
170    /// Removed transactions that no longer satisfy the blob fee.
171    pub(crate) fn update_blob_fee(
172        &mut self,
173        blob_fee: u128,
174    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
175        // Create a collection for removed transactions.
176        let mut removed = Vec::new();
177
178        // Drain and iterate over all transactions.
179        let mut transactions_iter = self.clear_transactions().into_iter().peekable();
180        while let Some((id, tx)) = transactions_iter.next() {
181            if tx.transaction.is_eip4844() && tx.transaction.max_fee_per_blob_gas() < Some(blob_fee)
182            {
183                // Add this tx to the removed collection since it no longer satisfies the blob fee
184                // condition. Decrease the total pool size.
185                removed.push(Arc::clone(&tx.transaction));
186
187                // Remove all dependent transactions.
188                'this: while let Some((next_id, next_tx)) = transactions_iter.peek() {
189                    if next_id.sender != id.sender {
190                        break 'this
191                    }
192                    removed.push(Arc::clone(&next_tx.transaction));
193                    transactions_iter.next();
194                }
195            } else {
196                self.size_of += tx.transaction.size();
197                self.update_independents_and_highest_nonces(&tx);
198                self.by_id.insert(id, tx);
199            }
200        }
201
202        removed
203    }
204
205    /// Updates the pool with the new base fee. Reorders transactions by new priorities. Removes
206    /// from the subpool all transactions and their dependents that no longer satisfy the given
207    /// base fee (`tx.fee < base_fee`).
208    ///
209    /// Note: the transactions are not returned in a particular order.
210    ///
211    /// # Returns
212    ///
213    /// Removed transactions that no longer satisfy the base fee.
214    pub(crate) fn update_base_fee(
215        &mut self,
216        base_fee: u64,
217    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
218        // Create a collection for removed transactions.
219        let mut removed = Vec::new();
220
221        // Drain and iterate over all transactions.
222        let mut transactions_iter = self.clear_transactions().into_iter().peekable();
223        while let Some((id, mut tx)) = transactions_iter.next() {
224            if tx.transaction.max_fee_per_gas() < base_fee as u128 {
225                // Add this tx to the removed collection since it no longer satisfies the base fee
226                // condition. Decrease the total pool size.
227                removed.push(Arc::clone(&tx.transaction));
228
229                // Remove all dependent transactions.
230                'this: while let Some((next_id, next_tx)) = transactions_iter.peek() {
231                    if next_id.sender != id.sender {
232                        break 'this
233                    }
234                    removed.push(Arc::clone(&next_tx.transaction));
235                    transactions_iter.next();
236                }
237            } else {
238                // Re-insert the transaction with new priority.
239                tx.priority = self.ordering.priority(&tx.transaction.transaction, base_fee);
240
241                self.size_of += tx.transaction.size();
242                self.update_independents_and_highest_nonces(&tx);
243                self.by_id.insert(id, tx);
244            }
245        }
246
247        removed
248    }
249
250    /// Updates the independent transaction and highest nonces set, assuming the given transaction
251    /// is being _added_ to the pool.
252    fn update_independents_and_highest_nonces(&mut self, tx: &PendingTransaction<T>) {
253        match self.highest_nonces.entry(tx.transaction.sender_id()) {
254            Entry::Occupied(mut entry) => {
255                if entry.get().transaction.nonce() < tx.transaction.nonce() {
256                    *entry.get_mut() = tx.clone();
257                }
258            }
259            Entry::Vacant(entry) => {
260                entry.insert(tx.clone());
261            }
262        }
263        match self.independent_transactions.entry(tx.transaction.sender_id()) {
264            Entry::Occupied(mut entry) => {
265                if entry.get().transaction.nonce() > tx.transaction.nonce() {
266                    *entry.get_mut() = tx.clone();
267                }
268            }
269            Entry::Vacant(entry) => {
270                entry.insert(tx.clone());
271            }
272        }
273    }
274
275    /// Adds a new transactions to the pending queue.
276    ///
277    /// # Panics
278    ///
279    /// if the transaction is already included
280    pub fn add_transaction(
281        &mut self,
282        tx: Arc<ValidPoolTransaction<T::Transaction>>,
283        base_fee: u64,
284    ) {
285        debug_assert!(
286            !self.contains(tx.id()),
287            "transaction already included {:?}",
288            self.get(tx.id()).unwrap().transaction
289        );
290
291        // keep track of size
292        self.size_of += tx.size();
293
294        let tx_id = *tx.id();
295
296        let submission_id = self.next_id();
297        let priority = self.ordering.priority(&tx.transaction, base_fee);
298        let tx = PendingTransaction { submission_id, transaction: tx, priority };
299
300        self.update_independents_and_highest_nonces(&tx);
301
302        // send the new transaction to any existing pendingpool static file iterators
303        if self.new_transaction_notifier.receiver_count() > 0 {
304            let _ = self.new_transaction_notifier.send(tx.clone());
305        }
306
307        self.by_id.insert(tx_id, tx);
308    }
309
310    /// Removes the transaction from the pool.
311    ///
312    /// Note: If the transaction has a descendant transaction
313    /// it will advance it to the best queue.
314    pub(crate) fn remove_transaction(
315        &mut self,
316        id: &TransactionId,
317    ) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
318        if let Some(lowest) = self.independent_transactions.get(&id.sender) &&
319            lowest.transaction.nonce() == id.nonce
320        {
321            self.independent_transactions.remove(&id.sender);
322            // mark the next as independent if it exists
323            if let Some(unlocked) = self.get(&id.descendant()) {
324                self.independent_transactions.insert(id.sender, unlocked.clone());
325            }
326        }
327
328        let tx = self.by_id.remove(id)?;
329        self.size_of -= tx.transaction.size();
330
331        match self.highest_nonces.entry(id.sender) {
332            Entry::Occupied(mut entry) => {
333                if entry.get().transaction.nonce() == id.nonce {
334                    // we just removed the tx with the highest nonce for this sender, find the
335                    // highest remaining tx from that sender
336                    if let Some((_, new_highest)) = self
337                        .by_id
338                        .range((
339                            id.sender.start_bound(),
340                            std::ops::Bound::Included(TransactionId::new(id.sender, u64::MAX)),
341                        ))
342                        .last()
343                    {
344                        // insert the new highest nonce for this sender
345                        entry.insert(new_highest.clone());
346                    } else {
347                        entry.remove();
348                    }
349                }
350            }
351            Entry::Vacant(_) => {
352                debug_assert!(
353                    false,
354                    "removed transaction without a tracked highest nonce {:?}",
355                    id
356                );
357            }
358        }
359
360        Some(tx.transaction)
361    }
362
363    const fn next_id(&mut self) -> u64 {
364        let id = self.submission_id;
365        self.submission_id = self.submission_id.wrapping_add(1);
366        id
367    }
368
    /// Traverses the pool, starting at the highest nonce set, removing the transactions which
    /// would put the pool under the specified limits.
    ///
    /// This attempts to remove transactions by roughly the same amount for each sender. This is
    /// done by removing the highest-nonce transactions for each sender.
    ///
    /// If the `remove_locals` flag is unset, transactions will be removed per-sender until a
    /// local transaction is the highest nonce transaction for that sender. If all senders have a
    /// local highest-nonce transaction, the pool will not be truncated further.
    ///
    /// Otherwise, if the `remove_locals` flag is set, transactions will be removed per-sender
    /// until the pool is under the given limits.
    ///
    /// Any removed transactions will be added to the `end_removed` vector.
    ///
    /// The work is done in passes: each pass collects at most one transaction (the current
    /// highest nonce) per considered sender and then removes the collected transactions, so
    /// the next pass sees the updated highest nonce set.
    pub fn remove_to_limit(
        &mut self,
        limit: &SubPoolLimit,
        remove_locals: bool,
        end_removed: &mut Vec<Arc<ValidPoolTransaction<T::Transaction>>>,
    ) {
        // This serves as a termination condition for the loop - it represents the number of
        // _valid_ unique senders that might have descendants in the pool.
        //
        // If `remove_locals` is false, a value of zero means that there are no non-local txs in the
        // pool that can be removed.
        //
        // If `remove_locals` is true, a value of zero means that there are no txs in the pool that
        // can be removed.
        let mut non_local_senders = self.highest_nonces.len();

        // keeps track of unique senders from previous iterations, to understand how many unique
        // senders were removed in the last iteration
        let mut unique_senders = self.highest_nonces.len();

        // keeps track of which senders we've marked as local
        let mut local_senders = FxHashSet::default();

        // keep track of transactions to remove and how many have been removed so far
        let original_length = self.len();
        let mut removed = Vec::new();
        let mut total_removed = 0;

        // track total `size` of transactions to remove
        let original_size = self.size();
        let mut total_size = 0;

        loop {
            // check how many unique senders were removed last iteration
            let unique_removed = unique_senders - self.highest_nonces.len();

            // the new number of unique senders
            unique_senders = self.highest_nonces.len();
            non_local_senders -= unique_removed;

            // we can reuse the temp array
            removed.clear();

            // we prefer removing transactions with lower ordering
            let mut worst_transactions = self.highest_nonces.values().collect::<Vec<_>>();

            // Each pass removes at most one transaction per sender (its highest nonce), so only
            // the worst few senders can be relevant in this pass. Selecting them is O(n)
            // instead of sorting all senders. The estimate may fall short for size-based limits
            // or skipped local senders, in which case the outer loop runs another pass.
            let current_len = original_length - total_removed;
            let current_size = original_size - total_size;
            let excess_txs = current_len.saturating_sub(limit.max_txs);
            // both `max(1)` calls guard against a division by zero
            let avg_tx_size = (current_size / current_len.max(1)).max(1);
            let excess_size_txs = current_size.saturating_sub(limit.max_size).div_ceil(avg_tx_size);
            // Number of worst senders to consider for removal in this pass: enough to cover the
            // count and (estimated) size excess, widened by known local senders since those are
            // skipped below.
            let removal_candidates = excess_txs.max(excess_size_txs).max(1) + local_senders.len();

            if removal_candidates < worst_transactions.len() {
                // keep only the `removal_candidates` worst senders, in O(n) without a full sort
                worst_transactions.select_nth_unstable(removal_candidates);
                worst_transactions.truncate(removal_candidates);
            }
            // ascending order: the worst transaction comes first
            worst_transactions.sort_unstable();

            // loop through the highest nonces set, removing transactions until we reach the limit
            for tx in worst_transactions {
                // return early if the pool is under limits
                if !limit.is_exceeded(original_length - total_removed, original_size - total_size) ||
                    non_local_senders == 0
                {
                    // need to remove remaining transactions before exiting
                    for id in &removed {
                        if let Some(tx) = self.remove_transaction(id) {
                            end_removed.push(tx);
                        }
                    }

                    return
                }

                // local transactions are spared in this mode; remember the sender so that it is
                // only subtracted from the eligible senders once
                if !remove_locals && tx.transaction.is_local() {
                    let sender_id = tx.transaction.sender_id();
                    if local_senders.insert(sender_id) {
                        non_local_senders -= 1;
                    }
                    continue
                }

                // only record the removal here, the pool itself is updated after the pass
                total_size += tx.transaction.size();
                total_removed += 1;
                removed.push(*tx.transaction.id());
            }

            // remove the transactions from this iteration
            for id in &removed {
                if let Some(tx) = self.remove_transaction(id) {
                    end_removed.push(tx);
                }
            }

            // return if either the pool is under limits or there are no more _eligible_
            // transactions to remove
            if !self.exceeds(limit) || non_local_senders == 0 {
                return
            }
        }
    }
493
494    /// Truncates the pool to the given [`SubPoolLimit`], removing transactions until the subpool
495    /// limits are met.
496    ///
497    /// This attempts to remove transactions by roughly the same amount for each sender. For more
498    /// information on this exact process see docs for
499    /// [`remove_to_limit`](PendingPool::remove_to_limit).
500    ///
501    /// This first truncates all of the non-local transactions in the pool. If the subpool is still
502    /// not under the limit, this truncates the entire pool, including non-local transactions. The
503    /// removed transactions are returned.
504    pub fn truncate_pool(
505        &mut self,
506        limit: SubPoolLimit,
507    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
508        let mut removed = Vec::new();
509        // return early if the pool is already under the limits
510        if !self.exceeds(&limit) {
511            return removed
512        }
513
514        // first truncate only non-local transactions, returning if the pool end up under the limit
515        self.remove_to_limit(&limit, false, &mut removed);
516        if !self.exceeds(&limit) {
517            return removed
518        }
519
520        // now repeat for local transactions, since local transactions must be removed now for the
521        // pool to be under the limit
522        self.remove_to_limit(&limit, true, &mut removed);
523
524        removed
525    }
526
527    /// Returns true if the pool exceeds the given limit
528    #[inline]
529    pub(crate) fn exceeds(&self, limit: &SubPoolLimit) -> bool {
530        limit.is_exceeded(self.len(), self.size())
531    }
532
533    /// The reported size of all transactions in this pool.
534    pub(crate) fn size(&self) -> usize {
535        self.size_of.into()
536    }
537
538    /// Number of transactions in the entire pool
539    pub(crate) fn len(&self) -> usize {
540        self.by_id.len()
541    }
542
543    /// All transactions grouped by id
544    pub const fn by_id(&self) -> &OrdMap<TransactionId, PendingTransaction<T>> {
545        &self.by_id
546    }
547
548    /// Independent transactions
549    pub const fn independent_transactions(&self) -> &FxHashMap<SenderId, PendingTransaction<T>> {
550        &self.independent_transactions
551    }
552
553    /// Subscribes to new transactions
554    pub fn new_transaction_receiver(&self) -> broadcast::Receiver<PendingTransaction<T>> {
555        self.new_transaction_notifier.subscribe()
556    }
557
558    /// Whether the pool is empty
559    #[cfg(test)]
560    pub(crate) fn is_empty(&self) -> bool {
561        self.by_id.is_empty()
562    }
563
564    /// Returns `true` if the transaction with the given id is already included in this pool.
565    pub(crate) fn contains(&self, id: &TransactionId) -> bool {
566        self.by_id.contains_key(id)
567    }
568
569    /// Get transactions by sender
570    pub(crate) fn get_txs_by_sender(&self, sender: SenderId) -> Vec<TransactionId> {
571        self.iter_txs_by_sender(sender).copied().collect()
572    }
573
574    /// Returns an iterator over all transaction with the sender id
575    pub(crate) fn iter_txs_by_sender(
576        &self,
577        sender: SenderId,
578    ) -> impl Iterator<Item = &TransactionId> + '_ {
579        self.by_id
580            .range((sender.start_bound(), Unbounded))
581            .take_while(move |(other, _)| sender == other.sender)
582            .map(|(tx_id, _)| tx_id)
583    }
584
585    /// Returns all transactions for the given sender, using a `BTree` range query.
586    pub(crate) fn txs_by_sender(
587        &self,
588        sender: SenderId,
589    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
590        self.by_id
591            .range((sender.start_bound(), Unbounded))
592            .take_while(move |(other, _)| sender == other.sender)
593            .map(|(_, tx)| tx.transaction.clone())
594            .collect()
595    }
596
597    /// Retrieves a transaction with the given ID from the pool, if it exists.
598    fn get(&self, id: &TransactionId) -> Option<&PendingTransaction<T>> {
599        self.by_id.get(id)
600    }
601
602    /// Returns a reference to the independent transactions in the pool
603    #[cfg(test)]
604    pub(crate) const fn independent(&self) -> &FxHashMap<SenderId, PendingTransaction<T>> {
605        &self.independent_transactions
606    }
607
608    /// Asserts that the bijection between `by_id` and `all` is valid.
609    #[cfg(any(test, feature = "test-utils"))]
610    pub(crate) fn assert_invariants(&self) {
611        assert!(
612            self.independent_transactions.len() <= self.by_id.len(),
613            "independent_transactions.len() > by_id.len()"
614        );
615        assert!(
616            self.highest_nonces.len() <= self.by_id.len(),
617            "highest_nonces.len() > by_id.len()"
618        );
619        assert_eq!(
620            self.highest_nonces.len(),
621            self.independent_transactions.len(),
622            "highest_nonces.len() != independent_transactions.len()"
623        );
624    }
625}
626
/// A transaction that is ready to be included in a block.
///
/// Pairs the validated pool transaction with the bookkeeping the pending pool needs to order it:
/// the submission id it was tagged with and its computed priority.
#[derive(Debug)]
pub struct PendingTransaction<T: TransactionOrdering> {
    /// Identifier that tags when transaction was submitted in the pool.
    ///
    /// Used as the tie-breaker between transactions of equal priority.
    pub submission_id: u64,
    /// Actual transaction.
    pub transaction: Arc<ValidPoolTransaction<T::Transaction>>,
    /// The priority value assigned by the used `Ordering` function.
    pub priority: Priority<T::PriorityValue>,
}
637
638impl<T: TransactionOrdering> PendingTransaction<T> {
639    /// The next transaction of the sender: `nonce + 1`
640    pub fn unlocks(&self) -> TransactionId {
641        self.transaction.transaction_id.descendant()
642    }
643}
644
645impl<T: TransactionOrdering> Clone for PendingTransaction<T> {
646    fn clone(&self) -> Self {
647        Self {
648            submission_id: self.submission_id,
649            transaction: Arc::clone(&self.transaction),
650            priority: self.priority.clone(),
651        }
652    }
653}
654
// Equality is defined through `Ord::cmp` (see the `PartialEq` impl), i.e. by priority and
// submission id.
impl<T: TransactionOrdering> Eq for PendingTransaction<T> {}
656
657impl<T: TransactionOrdering> PartialEq<Self> for PendingTransaction<T> {
658    fn eq(&self, other: &Self) -> bool {
659        self.cmp(other) == Ordering::Equal
660    }
661}
662
663impl<T: TransactionOrdering> PartialOrd<Self> for PendingTransaction<T> {
664    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
665        Some(self.cmp(other))
666    }
667}
668
669impl<T: TransactionOrdering> Ord for PendingTransaction<T> {
670    fn cmp(&self, other: &Self) -> Ordering {
671        // This compares by `priority` and only if two tx have the exact same priority this compares
672        // the unique `submission_id`. This ensures that transactions with same priority are not
673        // equal, so they're not replaced in the set
674        self.priority
675            .cmp(&other.priority)
676            .then_with(|| other.submission_id.cmp(&self.submission_id))
677    }
678}
679
680#[cfg(test)]
681mod tests {
682    use super::*;
683    use crate::{
684        test_utils::{MockOrdering, MockTransaction, MockTransactionFactory, MockTransactionSet},
685        PoolTransaction,
686    };
687    use alloy_consensus::{Transaction, TxType};
688    use alloy_primitives::address;
689    use std::collections::HashSet;
690
691    #[test]
692    fn test_enforce_basefee() {
693        let mut f = MockTransactionFactory::default();
694        let mut pool = PendingPool::new(MockOrdering::default());
695        let tx = f.validated_arc(MockTransaction::eip1559().inc_price());
696        pool.add_transaction(tx.clone(), 0);
697
698        assert!(pool.contains(tx.id()));
699        assert_eq!(pool.len(), 1);
700
701        let removed = pool.update_base_fee(0);
702        assert!(removed.is_empty());
703
704        let removed = pool.update_base_fee((tx.max_fee_per_gas() + 1) as u64);
705        assert_eq!(removed.len(), 1);
706        assert!(pool.is_empty());
707    }
708
709    #[test]
710    fn test_enforce_basefee_descendant() {
711        let mut f = MockTransactionFactory::default();
712        let mut pool = PendingPool::new(MockOrdering::default());
713        let t = MockTransaction::eip1559().inc_price_by(10);
714        let root_tx = f.validated_arc(t.clone());
715        pool.add_transaction(root_tx.clone(), 0);
716
717        let descendant_tx = f.validated_arc(t.inc_nonce().decr_price());
718        pool.add_transaction(descendant_tx.clone(), 0);
719
720        assert!(pool.contains(root_tx.id()));
721        assert!(pool.contains(descendant_tx.id()));
722        assert_eq!(pool.len(), 2);
723
724        assert_eq!(pool.independent_transactions.len(), 1);
725        assert_eq!(pool.highest_nonces.len(), 1);
726
727        let removed = pool.update_base_fee(0);
728        assert!(removed.is_empty());
729
730        // two dependent tx in the pool with decreasing fee
731
732        {
733            let mut pool2 = pool.clone();
734            let removed = pool2.update_base_fee((descendant_tx.max_fee_per_gas() + 1) as u64);
735            assert_eq!(removed.len(), 1);
736            assert_eq!(pool2.len(), 1);
737            // descendant got popped
738            assert!(pool2.contains(root_tx.id()));
739            assert!(!pool2.contains(descendant_tx.id()));
740        }
741
742        // remove root transaction via fee
743        let removed = pool.update_base_fee((root_tx.max_fee_per_gas() + 1) as u64);
744        assert_eq!(removed.len(), 2);
745        assert!(pool.is_empty());
746        pool.assert_invariants();
747    }
748
749    #[test]
750    fn evict_worst() {
751        let mut f = MockTransactionFactory::default();
752        let mut pool = PendingPool::new(MockOrdering::default());
753
754        let t = MockTransaction::eip1559();
755        pool.add_transaction(f.validated_arc(t.clone()), 0);
756
757        let t2 = MockTransaction::eip1559().inc_price_by(10);
758        pool.add_transaction(f.validated_arc(t2), 0);
759
760        // First transaction should be evicted.
761        assert_eq!(
762            pool.highest_nonces.values().min().map(|tx| *tx.transaction.hash()),
763            Some(*t.hash())
764        );
765
766        // truncate pool with max size = 1, ensure it's the same transaction
767        let removed = pool.truncate_pool(SubPoolLimit { max_txs: 1, max_size: usize::MAX });
768        assert_eq!(removed.len(), 1);
769        assert_eq!(removed[0].hash(), t.hash());
770    }
771
772    #[test]
773    fn correct_independent_descendants() {
774        // this test ensures that we set the right highest nonces set for each sender
775        let mut f = MockTransactionFactory::default();
776        let mut pool = PendingPool::new(MockOrdering::default());
777
778        let a_sender = address!("0x000000000000000000000000000000000000000a");
779        let b_sender = address!("0x000000000000000000000000000000000000000b");
780        let c_sender = address!("0x000000000000000000000000000000000000000c");
781        let d_sender = address!("0x000000000000000000000000000000000000000d");
782
783        // create a chain of transactions by sender A, B, C
784        let mut tx_set = MockTransactionSet::dependent(a_sender, 0, 4, TxType::Eip1559);
785        let a = tx_set.clone().into_vec();
786
787        let b = MockTransactionSet::dependent(b_sender, 0, 3, TxType::Eip1559).into_vec();
788        tx_set.extend(b.clone());
789
790        // C has the same number of txs as B
791        let c = MockTransactionSet::dependent(c_sender, 0, 3, TxType::Eip1559).into_vec();
792        tx_set.extend(c.clone());
793
794        let d = MockTransactionSet::dependent(d_sender, 0, 1, TxType::Eip1559).into_vec();
795        tx_set.extend(d.clone());
796
797        // add all the transactions to the pool
798        let all_txs = tx_set.into_vec();
799        for tx in all_txs {
800            pool.add_transaction(f.validated_arc(tx), 0);
801        }
802
803        pool.assert_invariants();
804
805        // the independent set is the roots of each of these tx chains, these are the highest
806        // nonces for each sender
807        let expected_highest_nonces = [d[0].clone(), c[2].clone(), b[2].clone(), a[3].clone()]
808            .iter()
809            .map(|tx| (tx.sender(), tx.nonce()))
810            .collect::<HashSet<_>>();
811        let actual_highest_nonces = pool
812            .highest_nonces
813            .values()
814            .map(|tx| (tx.transaction.sender(), tx.transaction.nonce()))
815            .collect::<HashSet<_>>();
816        assert_eq!(expected_highest_nonces, actual_highest_nonces);
817        pool.assert_invariants();
818    }
819
820    #[test]
821    fn truncate_by_sender() {
822        // This test ensures that transactions are removed from the pending pool by sender.
823        let mut f = MockTransactionFactory::default();
824        let mut pool = PendingPool::new(MockOrdering::default());
825
826        // Addresses for simulated senders A, B, C, and D.
827        let a = address!("0x000000000000000000000000000000000000000a");
828        let b = address!("0x000000000000000000000000000000000000000b");
829        let c = address!("0x000000000000000000000000000000000000000c");
830        let d = address!("0x000000000000000000000000000000000000000d");
831
832        // Create transaction chains for senders A, B, C, and D.
833        let a_txs = MockTransactionSet::sequential_transactions_by_sender(a, 4, TxType::Eip1559);
834        let b_txs = MockTransactionSet::sequential_transactions_by_sender(b, 3, TxType::Eip1559);
835        let c_txs = MockTransactionSet::sequential_transactions_by_sender(c, 3, TxType::Eip1559);
836        let d_txs = MockTransactionSet::sequential_transactions_by_sender(d, 1, TxType::Eip1559);
837
838        // Set up expected pending transactions.
839        let expected_pending = vec![
840            a_txs.transactions[0].clone(),
841            b_txs.transactions[0].clone(),
842            c_txs.transactions[0].clone(),
843            a_txs.transactions[1].clone(),
844        ]
845        .into_iter()
846        .map(|tx| (tx.sender(), tx.nonce()))
847        .collect::<HashSet<_>>();
848
849        // Set up expected removed transactions.
850        let expected_removed = vec![
851            d_txs.transactions[0].clone(),
852            c_txs.transactions[2].clone(),
853            b_txs.transactions[2].clone(),
854            a_txs.transactions[3].clone(),
855            c_txs.transactions[1].clone(),
856            b_txs.transactions[1].clone(),
857            a_txs.transactions[2].clone(),
858        ]
859        .into_iter()
860        .map(|tx| (tx.sender(), tx.nonce()))
861        .collect::<HashSet<_>>();
862
863        // Consolidate all transactions into a single vector.
864        let all_txs =
865            [a_txs.into_vec(), b_txs.into_vec(), c_txs.into_vec(), d_txs.into_vec()].concat();
866
867        // Add all the transactions to the pool.
868        for tx in all_txs {
869            pool.add_transaction(f.validated_arc(tx), 0);
870        }
871
872        // Sanity check, ensuring everything is consistent.
873        pool.assert_invariants();
874
875        // Define the maximum total transactions to be 4, removing transactions for each sender.
876        // Expected order of removal:
877        // * d1, c3, b3, a4
878        // * c2, b2, a3
879        //
880        // Remaining transactions:
881        // * a1, a2
882        // * b1
883        // * c1
884        let pool_limit = SubPoolLimit { max_txs: 4, max_size: usize::MAX };
885
886        // Truncate the pool based on the defined limit.
887        let removed = pool.truncate_pool(pool_limit);
888        pool.assert_invariants();
889        assert_eq!(removed.len(), expected_removed.len());
890
891        // Get the set of removed transactions and compare with the expected set.
892        let removed =
893            removed.into_iter().map(|tx| (tx.sender(), tx.nonce())).collect::<HashSet<_>>();
894        assert_eq!(removed, expected_removed);
895
896        // Retrieve the current pending transactions after truncation.
897        let pending = pool.all().collect::<Vec<_>>();
898        assert_eq!(pending.len(), expected_pending.len());
899
900        // Get the set of pending transactions and compare with the expected set.
901        let pending =
902            pending.into_iter().map(|tx| (tx.sender(), tx.nonce())).collect::<HashSet<_>>();
903        assert_eq!(pending, expected_pending);
904    }
905
906    // <https://github.com/paradigmxyz/reth/issues/12340>
907    #[test]
908    fn test_eligible_updates_promoted() {
909        let mut pool = PendingPool::new(MockOrdering::default());
910        let mut f = MockTransactionFactory::default();
911
912        let num_senders = 10;
913
914        let first_txs: Vec<_> = (0..num_senders) //
915            .map(|_| MockTransaction::eip1559())
916            .collect();
917        let second_txs: Vec<_> =
918            first_txs.iter().map(|tx| tx.clone().rng_hash().inc_nonce()).collect();
919
920        for tx in first_txs {
921            let valid_tx = f.validated(tx);
922            pool.add_transaction(Arc::new(valid_tx), 0);
923        }
924
925        let mut best = pool.best();
926
927        for _ in 0..num_senders {
928            if let Some(tx) = best.next() {
929                assert_eq!(tx.nonce(), 0);
930            } else {
931                panic!("cannot read one of first_txs");
932            }
933        }
934
935        for tx in second_txs {
936            let valid_tx = f.validated(tx);
937            pool.add_transaction(Arc::new(valid_tx), 0);
938        }
939
940        for _ in 0..num_senders {
941            if let Some(tx) = best.next() {
942                assert_eq!(tx.nonce(), 1);
943            } else {
944                panic!("cannot read one of second_txs");
945            }
946        }
947    }
948
949    #[test]
950    fn test_empty_pool_behavior() {
951        let mut pool = PendingPool::<MockOrdering>::new(MockOrdering::default());
952
953        // Ensure the pool is empty
954        assert!(pool.is_empty());
955        assert_eq!(pool.len(), 0);
956        assert_eq!(pool.size(), 0);
957
958        // Verify that attempting to truncate an empty pool does not panic and returns an empty vec
959        let removed = pool.truncate_pool(SubPoolLimit { max_txs: 10, max_size: 1000 });
960        assert!(removed.is_empty());
961
962        // Verify that retrieving transactions from an empty pool yields nothing
963        assert!(pool.all().next().is_none());
964    }
965
966    #[test]
967    fn test_add_remove_transaction() {
968        let mut f = MockTransactionFactory::default();
969        let mut pool = PendingPool::new(MockOrdering::default());
970
971        // Add a transaction and check if it's in the pool
972        let tx = f.validated_arc(MockTransaction::eip1559());
973        pool.add_transaction(tx.clone(), 0);
974        assert!(pool.contains(tx.id()));
975        assert_eq!(pool.len(), 1);
976
977        // Remove the transaction and ensure it's no longer in the pool
978        let removed_tx = pool.remove_transaction(tx.id()).unwrap();
979        assert_eq!(removed_tx.id(), tx.id());
980        assert!(!pool.contains(tx.id()));
981        assert_eq!(pool.len(), 0);
982    }
983
984    #[test]
985    fn test_reorder_on_basefee_update() {
986        let mut f = MockTransactionFactory::default();
987        let mut pool = PendingPool::new(MockOrdering::default());
988
989        // Add two transactions with different fees
990        let tx1 = f.validated_arc(MockTransaction::eip1559().inc_price());
991        let tx2 = f.validated_arc(MockTransaction::eip1559().inc_price_by(20));
992        pool.add_transaction(tx1.clone(), 0);
993        pool.add_transaction(tx2.clone(), 0);
994
995        // Ensure the transactions are in the correct order
996        let mut best = pool.best();
997        assert_eq!(best.next().unwrap().hash(), tx2.hash());
998        assert_eq!(best.next().unwrap().hash(), tx1.hash());
999
1000        // Update the base fee to a value higher than tx1's fee, causing it to be removed
1001        let removed = pool.update_base_fee((tx1.max_fee_per_gas() + 1) as u64);
1002        assert_eq!(removed.len(), 1);
1003        assert_eq!(removed[0].hash(), tx1.hash());
1004
1005        // Verify that only tx2 remains in the pool
1006        assert_eq!(pool.len(), 1);
1007        assert!(pool.contains(tx2.id()));
1008        assert!(!pool.contains(tx1.id()));
1009    }
1010
1011    #[test]
1012    #[cfg(debug_assertions)]
1013    #[should_panic(expected = "transaction already included")]
1014    fn test_handle_duplicates() {
1015        let mut f = MockTransactionFactory::default();
1016        let mut pool = PendingPool::new(MockOrdering::default());
1017
1018        // Add the same transaction twice and ensure it only appears once
1019        let tx = f.validated_arc(MockTransaction::eip1559());
1020        pool.add_transaction(tx.clone(), 0);
1021        assert!(pool.contains(tx.id()));
1022        assert_eq!(pool.len(), 1);
1023
1024        // Attempt to add the same transaction again, which should be ignored
1025        pool.add_transaction(tx, 0);
1026    }
1027
1028    #[test]
1029    fn test_update_blob_fee() {
1030        let mut f = MockTransactionFactory::default();
1031        let mut pool = PendingPool::new(MockOrdering::default());
1032
1033        // Add transactions with varying blob fees
1034        let tx1 = f.validated_arc(MockTransaction::eip4844().set_blob_fee(50).clone());
1035        let tx2 = f.validated_arc(MockTransaction::eip4844().set_blob_fee(150).clone());
1036        pool.add_transaction(tx1.clone(), 0);
1037        pool.add_transaction(tx2.clone(), 0);
1038
1039        // Update the blob fee to a value that causes tx1 to be removed
1040        let removed = pool.update_blob_fee(100);
1041        assert_eq!(removed.len(), 1);
1042        assert_eq!(removed[0].hash(), tx1.hash());
1043
1044        // Verify that only tx2 remains in the pool
1045        assert!(pool.contains(tx2.id()));
1046        assert!(!pool.contains(tx1.id()));
1047    }
1048
1049    #[test]
1050    fn local_senders_tracking() {
1051        let mut f = MockTransactionFactory::default();
1052        let mut pool = PendingPool::new(MockOrdering::default());
1053
1054        // Addresses for simulated senders A, B, C
1055        let a = address!("0x000000000000000000000000000000000000000a");
1056        let b = address!("0x000000000000000000000000000000000000000b");
1057        let c = address!("0x000000000000000000000000000000000000000c");
1058
1059        // sender A (local) - 11+ transactions (large enough to keep limit exceeded)
1060        // sender B (external) - 2 transactions
1061        // sender C (external) - 2 transactions
1062
1063        // Create transaction chains for senders A, B, C
1064        let a_txs = MockTransactionSet::sequential_transactions_by_sender(a, 11, TxType::Eip1559);
1065        let b_txs = MockTransactionSet::sequential_transactions_by_sender(b, 2, TxType::Eip1559);
1066        let c_txs = MockTransactionSet::sequential_transactions_by_sender(c, 2, TxType::Eip1559);
1067
1068        // create local txs for sender A
1069        for tx in a_txs.into_vec() {
1070            let final_tx = Arc::new(f.validated_with_origin(crate::TransactionOrigin::Local, tx));
1071
1072            pool.add_transaction(final_tx, 0);
1073        }
1074
1075        // create external txs for senders B and C
1076        let remaining_txs = [b_txs.into_vec(), c_txs.into_vec()].concat();
1077        for tx in remaining_txs {
1078            let final_tx = f.validated_arc(tx);
1079
1080            pool.add_transaction(final_tx, 0);
1081        }
1082
1083        // Sanity check, ensuring everything is consistent.
1084        pool.assert_invariants();
1085
1086        let pool_limit = SubPoolLimit { max_txs: 10, max_size: usize::MAX };
1087        pool.truncate_pool(pool_limit);
1088
1089        let sender_a = f.ids.sender_id(&a).unwrap();
1090        let sender_b = f.ids.sender_id(&b).unwrap();
1091        let sender_c = f.ids.sender_id(&c).unwrap();
1092
1093        assert_eq!(pool.get_txs_by_sender(sender_a).len(), 10);
1094        assert!(pool.get_txs_by_sender(sender_b).is_empty());
1095        assert!(pool.get_txs_by_sender(sender_c).is_empty());
1096    }
1097
1098    #[test]
1099    fn test_remove_non_highest_keeps_highest() {
1100        let mut f = MockTransactionFactory::default();
1101        let mut pool = PendingPool::new(MockOrdering::default());
1102        let sender = address!("0x00000000000000000000000000000000000000aa");
1103        let txs = MockTransactionSet::dependent(sender, 0, 3, TxType::Eip1559).into_vec();
1104        for tx in txs {
1105            pool.add_transaction(f.validated_arc(tx), 0);
1106        }
1107        pool.assert_invariants();
1108        let sender_id = f.ids.sender_id(&sender).unwrap();
1109        let mid_id = TransactionId::new(sender_id, 1);
1110        let _ = pool.remove_transaction(&mid_id);
1111        let highest = pool.highest_nonces.get(&sender_id).unwrap();
1112        assert_eq!(highest.transaction.nonce(), 2);
1113        pool.assert_invariants();
1114    }
1115
1116    #[test]
1117    fn test_cascade_removal_recomputes_highest() {
1118        let mut f = MockTransactionFactory::default();
1119        let mut pool = PendingPool::new(MockOrdering::default());
1120        let sender = address!("0x00000000000000000000000000000000000000bb");
1121        let txs = MockTransactionSet::dependent(sender, 0, 4, TxType::Eip1559).into_vec();
1122        for tx in txs {
1123            pool.add_transaction(f.validated_arc(tx), 0);
1124        }
1125        pool.assert_invariants();
1126        let sender_id = f.ids.sender_id(&sender).unwrap();
1127        let id3 = TransactionId::new(sender_id, 3);
1128        let _ = pool.remove_transaction(&id3);
1129        let highest = pool.highest_nonces.get(&sender_id).unwrap();
1130        assert_eq!(highest.transaction.nonce(), 2);
1131        let id2 = TransactionId::new(sender_id, 2);
1132        let _ = pool.remove_transaction(&id2);
1133        let highest = pool.highest_nonces.get(&sender_id).unwrap();
1134        assert_eq!(highest.transaction.nonce(), 1);
1135        pool.assert_invariants();
1136    }
1137
1138    #[test]
1139    fn test_remove_only_tx_clears_highest() {
1140        let mut f = MockTransactionFactory::default();
1141        let mut pool = PendingPool::new(MockOrdering::default());
1142        let sender = address!("0x00000000000000000000000000000000000000cc");
1143        let txs = MockTransactionSet::dependent(sender, 0, 1, TxType::Eip1559).into_vec();
1144        for tx in txs {
1145            pool.add_transaction(f.validated_arc(tx), 0);
1146        }
1147        pool.assert_invariants();
1148        let sender_id = f.ids.sender_id(&sender).unwrap();
1149        let id0 = TransactionId::new(sender_id, 0);
1150        let _ = pool.remove_transaction(&id0);
1151        assert!(!pool.highest_nonces.contains_key(&sender_id));
1152        pool.assert_invariants();
1153    }
1154}