reth_transaction_pool/pool/
best.rs

1use crate::{
2    error::{Eip4844PoolTransactionError, InvalidPoolTransactionError},
3    identifier::{SenderId, TransactionId},
4    pool::pending::PendingTransaction,
5    PoolTransaction, Priority, TransactionOrdering, ValidPoolTransaction,
6};
7use alloy_consensus::Transaction;
8use alloy_eips::Typed2718;
9use alloy_primitives::Address;
10use core::fmt;
11use reth_primitives_traits::transaction::error::InvalidTransactionError;
12use std::{
13    collections::{BTreeMap, BTreeSet, HashSet, VecDeque},
14    sync::Arc,
15};
16use tokio::sync::broadcast::{error::TryRecvError, Receiver};
17use tracing::debug;
18
19const MAX_NEW_TRANSACTIONS_PER_BATCH: usize = 16;
20
21/// An iterator that returns transactions that can be executed on the current state (*best*
22/// transactions).
23///
24/// This is a wrapper around [`BestTransactions`] that also enforces a specific basefee.
25///
26/// This iterator guarantees that all transactions it returns satisfy both the base fee and blob
27/// fee!
28pub(crate) struct BestTransactionsWithFees<T: TransactionOrdering> {
29    pub(crate) best: BestTransactions<T>,
30    pub(crate) base_fee: u64,
31    pub(crate) base_fee_per_blob_gas: u64,
32}
33
34impl<T: TransactionOrdering> crate::traits::BestTransactions for BestTransactionsWithFees<T> {
35    fn mark_invalid(&mut self, tx: &Self::Item, kind: InvalidPoolTransactionError) {
36        BestTransactions::mark_invalid(&mut self.best, tx, kind)
37    }
38
39    fn no_updates(&mut self) {
40        self.best.no_updates()
41    }
42
43    fn skip_blobs(&mut self) {
44        self.set_skip_blobs(true)
45    }
46
47    fn set_skip_blobs(&mut self, skip_blobs: bool) {
48        self.best.set_skip_blobs(skip_blobs)
49    }
50}
51
52impl<T: TransactionOrdering> Iterator for BestTransactionsWithFees<T> {
53    type Item = Arc<ValidPoolTransaction<T::Transaction>>;
54
55    fn next(&mut self) -> Option<Self::Item> {
56        // find the next transaction that satisfies the base fee
57        loop {
58            let best = Iterator::next(&mut self.best)?;
59            // If both the base fee and blob fee (if applicable for EIP-4844) are satisfied, return
60            // the transaction
61            if best.transaction.max_fee_per_gas() >= self.base_fee as u128 &&
62                best.transaction
63                    .max_fee_per_blob_gas()
64                    .is_none_or(|fee| fee >= self.base_fee_per_blob_gas as u128)
65            {
66                return Some(best);
67            }
68            crate::traits::BestTransactions::mark_invalid(
69                self,
70                &best,
71                InvalidPoolTransactionError::Underpriced,
72            );
73        }
74    }
75}
76
77/// An iterator that returns transactions that can be executed on the current state (*best*
78/// transactions).
79///
80/// The [`PendingPool`](crate::pool::pending::PendingPool) contains transactions that *could* all
81/// be executed on the current state, but only yields transactions that are ready to be executed
82/// now. While it contains all gapless transactions of a sender, it _always_ only returns the
83/// transaction with the current on chain nonce.
84#[derive(Debug)]
85pub struct BestTransactions<T: TransactionOrdering> {
86    /// Contains a copy of _all_ transactions of the pending pool at the point in time this
87    /// iterator was created.
88    pub(crate) all: BTreeMap<TransactionId, PendingTransaction<T>>,
89    /// Transactions that can be executed right away: these have the expected nonce.
90    ///
91    /// Once an `independent` transaction with the nonce `N` is returned, it unlocks `N+1`, which
92    /// then can be moved from the `all` set to the `independent` set.
93    pub(crate) independent: BTreeSet<PendingTransaction<T>>,
94    /// There might be the case where a yielded transactions is invalid, this will track it.
95    pub(crate) invalid: HashSet<SenderId>,
96    /// Used to receive any new pending transactions that have been added to the pool after this
97    /// iterator was static filtered
98    ///
99    /// These new pending transactions are inserted into this iterator's pool before yielding the
100    /// next value
101    pub(crate) new_transaction_receiver: Option<Receiver<PendingTransaction<T>>>,
102    /// The priority value of most recently yielded transaction.
103    ///
104    /// This is required if new pending transactions are fed in while it yields new values.
105    pub(crate) last_priority: Option<Priority<T::PriorityValue>>,
106    /// Flag to control whether to skip blob transactions (EIP4844).
107    pub(crate) skip_blobs: bool,
108}
109
110impl<T: TransactionOrdering> BestTransactions<T> {
111    /// Mark the transaction and its descendants as invalid.
112    pub(crate) fn mark_invalid(
113        &mut self,
114        tx: &Arc<ValidPoolTransaction<T::Transaction>>,
115        _kind: InvalidPoolTransactionError,
116    ) {
117        self.invalid.insert(tx.sender_id());
118    }
119
120    /// Returns the ancestor the given transaction, the transaction with `nonce - 1`.
121    ///
122    /// Note: for a transaction with nonce higher than the current on chain nonce this will always
123    /// return an ancestor since all transactions in this pool are gapless.
124    pub(crate) fn ancestor(&self, id: &TransactionId) -> Option<&PendingTransaction<T>> {
125        self.all.get(&id.unchecked_ancestor()?)
126    }
127
128    /// Non-blocking read on the new pending transactions subscription channel
129    fn try_recv(&mut self) -> Option<PendingTransaction<T>> {
130        loop {
131            match self.new_transaction_receiver.as_mut()?.try_recv() {
132                Ok(tx) => {
133                    if let Some(last_priority) = &self.last_priority &&
134                        &tx.priority > last_priority
135                    {
136                        // we skip transactions if we already yielded a transaction with lower
137                        // priority
138                        return None
139                    }
140                    return Some(tx)
141                }
142                // note TryRecvError::Lagged can be returned here, which is an error that attempts
143                // to correct itself on consecutive try_recv() attempts
144
145                // the cost of ignoring this error is allowing old transactions to get
146                // overwritten after the chan buffer size is met
147                Err(TryRecvError::Lagged(_)) => {
148                    // Handle the case where the receiver lagged too far behind.
149                    // `num_skipped` indicates the number of messages that were skipped.
150                }
151
152                // this case is still better than the existing iterator behavior where no new
153                // pending txs are surfaced to consumers
154                Err(_) => return None,
155            }
156        }
157    }
158
159    /// Removes the currently best independent transaction from the independent set and the total
160    /// set.
161    fn pop_best(&mut self) -> Option<PendingTransaction<T>> {
162        self.independent.pop_last().inspect(|best| {
163            self.all.remove(best.transaction.id());
164        })
165    }
166
167    /// Checks for new transactions that have come into the `PendingPool` after this iterator was
168    /// created and inserts them
169    fn add_new_transactions(&mut self) {
170        for _ in 0..MAX_NEW_TRANSACTIONS_PER_BATCH {
171            if let Some(pending_tx) = self.try_recv() {
172                //  same logic as PendingPool::add_transaction/PendingPool::best_with_unlocked
173                let tx_id = *pending_tx.transaction.id();
174                if self.ancestor(&tx_id).is_none() {
175                    self.independent.insert(pending_tx.clone());
176                }
177                self.all.insert(tx_id, pending_tx);
178            } else {
179                break;
180            }
181        }
182    }
183}
184
185impl<T: TransactionOrdering> crate::traits::BestTransactions for BestTransactions<T> {
186    fn mark_invalid(&mut self, tx: &Self::Item, kind: InvalidPoolTransactionError) {
187        Self::mark_invalid(self, tx, kind)
188    }
189
190    fn no_updates(&mut self) {
191        self.new_transaction_receiver.take();
192        self.last_priority.take();
193    }
194
195    fn skip_blobs(&mut self) {
196        self.set_skip_blobs(true);
197    }
198
199    fn set_skip_blobs(&mut self, skip_blobs: bool) {
200        self.skip_blobs = skip_blobs;
201    }
202}
203
204impl<T: TransactionOrdering> Iterator for BestTransactions<T> {
205    type Item = Arc<ValidPoolTransaction<T::Transaction>>;
206
207    fn next(&mut self) -> Option<Self::Item> {
208        loop {
209            self.add_new_transactions();
210            // Remove the next independent tx with the highest priority
211            let best = self.pop_best()?;
212            let sender_id = best.transaction.sender_id();
213
214            // skip transactions for which sender was marked as invalid
215            if self.invalid.contains(&sender_id) {
216                debug!(
217                    target: "txpool",
218                    "[{:?}] skipping invalid transaction",
219                    best.transaction.hash()
220                );
221                continue
222            }
223
224            // Insert transactions that just got unlocked.
225            if let Some(unlocked) = self.all.get(&best.unlocks()) {
226                self.independent.insert(unlocked.clone());
227            }
228
229            if self.skip_blobs && best.transaction.transaction.is_eip4844() {
230                // blobs should be skipped, marking them as invalid will ensure that no dependent
231                // transactions are returned
232                self.mark_invalid(
233                    &best.transaction,
234                    InvalidPoolTransactionError::Eip4844(
235                        Eip4844PoolTransactionError::NoEip4844Blobs,
236                    ),
237                )
238            } else {
239                if self.new_transaction_receiver.is_some() {
240                    self.last_priority = Some(best.priority.clone())
241                }
242                return Some(best.transaction)
243            }
244        }
245    }
246}
247
248/// A [`BestTransactions`](crate::traits::BestTransactions) implementation that filters the
249/// transactions of iter with predicate.
250///
251/// Filter out transactions are marked as invalid:
252/// [`BestTransactions::mark_invalid`](crate::traits::BestTransactions::mark_invalid).
253pub struct BestTransactionFilter<I, P> {
254    pub(crate) best: I,
255    pub(crate) predicate: P,
256}
257
258impl<I, P> BestTransactionFilter<I, P> {
259    /// Create a new [`BestTransactionFilter`] with the given predicate.
260    pub const fn new(best: I, predicate: P) -> Self {
261        Self { best, predicate }
262    }
263}
264
265impl<I, P> Iterator for BestTransactionFilter<I, P>
266where
267    I: crate::traits::BestTransactions,
268    P: FnMut(&<I as Iterator>::Item) -> bool,
269{
270    type Item = <I as Iterator>::Item;
271
272    fn next(&mut self) -> Option<Self::Item> {
273        loop {
274            let best = self.best.next()?;
275            if (self.predicate)(&best) {
276                return Some(best)
277            }
278            self.best.mark_invalid(
279                &best,
280                InvalidPoolTransactionError::Consensus(InvalidTransactionError::TxTypeNotSupported),
281            );
282        }
283    }
284}
285
286impl<I, P> crate::traits::BestTransactions for BestTransactionFilter<I, P>
287where
288    I: crate::traits::BestTransactions,
289    P: FnMut(&<I as Iterator>::Item) -> bool + Send,
290{
291    fn mark_invalid(&mut self, tx: &Self::Item, kind: InvalidPoolTransactionError) {
292        crate::traits::BestTransactions::mark_invalid(&mut self.best, tx, kind)
293    }
294
295    fn no_updates(&mut self) {
296        self.best.no_updates()
297    }
298
299    fn skip_blobs(&mut self) {
300        self.set_skip_blobs(true)
301    }
302
303    fn set_skip_blobs(&mut self, skip_blobs: bool) {
304        self.best.set_skip_blobs(skip_blobs)
305    }
306}
307
308impl<I: fmt::Debug, P> fmt::Debug for BestTransactionFilter<I, P> {
309    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
310        f.debug_struct("BestTransactionFilter").field("best", &self.best).finish()
311    }
312}
313
314/// Wrapper over [`crate::traits::BestTransactions`] that prioritizes transactions of certain
315/// senders capping total gas used by such transactions.
316#[derive(Debug)]
317pub struct BestTransactionsWithPrioritizedSenders<I: Iterator> {
318    /// Inner iterator
319    inner: I,
320    /// A set of senders which transactions should be prioritized
321    prioritized_senders: HashSet<Address>,
322    /// Maximum total gas limit of prioritized transactions
323    max_prioritized_gas: u64,
324    /// Buffer with transactions that are not being prioritized. Those will be the first to be
325    /// included after the prioritized transactions
326    buffer: VecDeque<I::Item>,
327    /// Tracker of total gas limit of prioritized transactions. Once it reaches
328    /// `max_prioritized_gas` no more transactions will be prioritized
329    prioritized_gas: u64,
330}
331
332impl<I: Iterator> BestTransactionsWithPrioritizedSenders<I> {
333    /// Constructs a new [`BestTransactionsWithPrioritizedSenders`].
334    pub fn new(prioritized_senders: HashSet<Address>, max_prioritized_gas: u64, inner: I) -> Self {
335        Self {
336            inner,
337            prioritized_senders,
338            max_prioritized_gas,
339            buffer: Default::default(),
340            prioritized_gas: Default::default(),
341        }
342    }
343}
344
345impl<I, T> Iterator for BestTransactionsWithPrioritizedSenders<I>
346where
347    I: crate::traits::BestTransactions<Item = Arc<ValidPoolTransaction<T>>>,
348    T: PoolTransaction,
349{
350    type Item = <I as Iterator>::Item;
351
352    fn next(&mut self) -> Option<Self::Item> {
353        // If we have space, try prioritizing transactions
354        if self.prioritized_gas < self.max_prioritized_gas {
355            for item in &mut self.inner {
356                if self.prioritized_senders.contains(&item.transaction.sender()) &&
357                    self.prioritized_gas + item.transaction.gas_limit() <=
358                        self.max_prioritized_gas
359                {
360                    self.prioritized_gas += item.transaction.gas_limit();
361                    return Some(item)
362                }
363                self.buffer.push_back(item);
364            }
365        }
366
367        if let Some(item) = self.buffer.pop_front() {
368            Some(item)
369        } else {
370            self.inner.next()
371        }
372    }
373}
374
375impl<I, T> crate::traits::BestTransactions for BestTransactionsWithPrioritizedSenders<I>
376where
377    I: crate::traits::BestTransactions<Item = Arc<ValidPoolTransaction<T>>>,
378    T: PoolTransaction,
379{
380    fn mark_invalid(&mut self, tx: &Self::Item, kind: InvalidPoolTransactionError) {
381        self.inner.mark_invalid(tx, kind)
382    }
383
384    fn no_updates(&mut self) {
385        self.inner.no_updates()
386    }
387
388    fn set_skip_blobs(&mut self, skip_blobs: bool) {
389        if skip_blobs {
390            self.buffer.retain(|tx| !tx.transaction.is_eip4844())
391        }
392        self.inner.set_skip_blobs(skip_blobs)
393    }
394}
395
396#[cfg(test)]
397mod tests {
398    use super::*;
399    use crate::{
400        pool::pending::PendingPool,
401        test_utils::{MockOrdering, MockTransaction, MockTransactionFactory},
402        BestTransactions, Priority,
403    };
404
405    #[test]
406    fn test_best_iter() {
407        let mut pool = PendingPool::new(MockOrdering::default());
408        let mut f = MockTransactionFactory::default();
409
410        let num_tx = 10;
411        // insert 10 gapless tx
412        let tx = MockTransaction::eip1559();
413        for nonce in 0..num_tx {
414            let tx = tx.clone().rng_hash().with_nonce(nonce);
415            let valid_tx = f.validated(tx);
416            pool.add_transaction(Arc::new(valid_tx), 0);
417        }
418
419        let mut best = pool.best();
420        assert_eq!(best.all.len(), num_tx as usize);
421        assert_eq!(best.independent.len(), 1);
422
423        // check tx are returned in order
424        for nonce in 0..num_tx {
425            assert_eq!(best.independent.len(), 1);
426            let tx = best.next().unwrap();
427            assert_eq!(tx.nonce(), nonce);
428        }
429    }
430
431    #[test]
432    fn test_best_iter_invalid() {
433        let mut pool = PendingPool::new(MockOrdering::default());
434        let mut f = MockTransactionFactory::default();
435
436        let num_tx = 10;
437        // insert 10 gapless tx
438        let tx = MockTransaction::eip1559();
439        for nonce in 0..num_tx {
440            let tx = tx.clone().rng_hash().with_nonce(nonce);
441            let valid_tx = f.validated(tx);
442            pool.add_transaction(Arc::new(valid_tx), 0);
443        }
444
445        let mut best = pool.best();
446
447        // mark the first tx as invalid
448        let invalid = best.independent.iter().next().unwrap();
449        best.mark_invalid(
450            &invalid.transaction.clone(),
451            InvalidPoolTransactionError::Consensus(InvalidTransactionError::TxTypeNotSupported),
452        );
453
454        // iterator is empty
455        assert!(best.next().is_none());
456    }
457
458    #[test]
459    fn test_best_transactions_iter_invalid() {
460        let mut pool = PendingPool::new(MockOrdering::default());
461        let mut f = MockTransactionFactory::default();
462
463        let num_tx = 10;
464        // insert 10 gapless tx
465        let tx = MockTransaction::eip1559();
466        for nonce in 0..num_tx {
467            let tx = tx.clone().rng_hash().with_nonce(nonce);
468            let valid_tx = f.validated(tx);
469            pool.add_transaction(Arc::new(valid_tx), 0);
470        }
471
472        let mut best: Box<
473            dyn crate::traits::BestTransactions<Item = Arc<ValidPoolTransaction<MockTransaction>>>,
474        > = Box::new(pool.best());
475
476        let tx = Iterator::next(&mut best).unwrap();
477        crate::traits::BestTransactions::mark_invalid(
478            &mut *best,
479            &tx,
480            InvalidPoolTransactionError::Consensus(InvalidTransactionError::TxTypeNotSupported),
481        );
482        assert!(Iterator::next(&mut best).is_none());
483    }
484
485    #[test]
486    fn test_best_with_fees_iter_base_fee_satisfied() {
487        let mut pool = PendingPool::new(MockOrdering::default());
488        let mut f = MockTransactionFactory::default();
489
490        let num_tx = 5;
491        let base_fee: u64 = 10;
492        let base_fee_per_blob_gas: u64 = 15;
493
494        // Insert transactions with a max_fee_per_gas greater than or equal to the base fee
495        // Without blob fee
496        for nonce in 0..num_tx {
497            let tx = MockTransaction::eip1559()
498                .rng_hash()
499                .with_nonce(nonce)
500                .with_max_fee(base_fee as u128 + 5);
501            let valid_tx = f.validated(tx);
502            pool.add_transaction(Arc::new(valid_tx), 0);
503        }
504
505        let mut best = pool.best_with_basefee_and_blobfee(base_fee, base_fee_per_blob_gas);
506
507        for nonce in 0..num_tx {
508            let tx = best.next().expect("Transaction should be returned");
509            assert_eq!(tx.nonce(), nonce);
510            assert!(tx.transaction.max_fee_per_gas() >= base_fee as u128);
511        }
512    }
513
514    #[test]
515    fn test_best_with_fees_iter_base_fee_violated() {
516        let mut pool = PendingPool::new(MockOrdering::default());
517        let mut f = MockTransactionFactory::default();
518
519        let num_tx = 5;
520        let base_fee: u64 = 20;
521        let base_fee_per_blob_gas: u64 = 15;
522
523        // Insert transactions with a max_fee_per_gas less than the base fee
524        for nonce in 0..num_tx {
525            let tx = MockTransaction::eip1559()
526                .rng_hash()
527                .with_nonce(nonce)
528                .with_max_fee(base_fee as u128 - 5);
529            let valid_tx = f.validated(tx);
530            pool.add_transaction(Arc::new(valid_tx), 0);
531        }
532
533        let mut best = pool.best_with_basefee_and_blobfee(base_fee, base_fee_per_blob_gas);
534
535        // No transaction should be returned since all violate the base fee
536        assert!(best.next().is_none());
537    }
538
539    #[test]
540    fn test_best_with_fees_iter_blob_fee_satisfied() {
541        let mut pool = PendingPool::new(MockOrdering::default());
542        let mut f = MockTransactionFactory::default();
543
544        let num_tx = 5;
545        let base_fee: u64 = 10;
546        let base_fee_per_blob_gas: u64 = 20;
547
548        // Insert transactions with a max_fee_per_blob_gas greater than or equal to the base fee per
549        // blob gas
550        for nonce in 0..num_tx {
551            let tx = MockTransaction::eip4844()
552                .rng_hash()
553                .with_nonce(nonce)
554                .with_max_fee(base_fee as u128 + 5)
555                .with_blob_fee(base_fee_per_blob_gas as u128 + 5);
556            let valid_tx = f.validated(tx);
557            pool.add_transaction(Arc::new(valid_tx), 0);
558        }
559
560        let mut best = pool.best_with_basefee_and_blobfee(base_fee, base_fee_per_blob_gas);
561
562        // All transactions should be returned in order since they satisfy both base fee and blob
563        // fee
564        for nonce in 0..num_tx {
565            let tx = best.next().expect("Transaction should be returned");
566            assert_eq!(tx.nonce(), nonce);
567            assert!(tx.transaction.max_fee_per_gas() >= base_fee as u128);
568            assert!(
569                tx.transaction.max_fee_per_blob_gas().unwrap() >= base_fee_per_blob_gas as u128
570            );
571        }
572
573        // No more transactions should be returned
574        assert!(best.next().is_none());
575    }
576
577    #[test]
578    fn test_best_with_fees_iter_blob_fee_violated() {
579        let mut pool = PendingPool::new(MockOrdering::default());
580        let mut f = MockTransactionFactory::default();
581
582        let num_tx = 5;
583        let base_fee: u64 = 10;
584        let base_fee_per_blob_gas: u64 = 20;
585
586        // Insert transactions with a max_fee_per_blob_gas less than the base fee per blob gas
587        for nonce in 0..num_tx {
588            let tx = MockTransaction::eip4844()
589                .rng_hash()
590                .with_nonce(nonce)
591                .with_max_fee(base_fee as u128 + 5)
592                .with_blob_fee(base_fee_per_blob_gas as u128 - 5);
593            let valid_tx = f.validated(tx);
594            pool.add_transaction(Arc::new(valid_tx), 0);
595        }
596
597        let mut best = pool.best_with_basefee_and_blobfee(base_fee, base_fee_per_blob_gas);
598
599        // No transaction should be returned since all violate the blob fee
600        assert!(best.next().is_none());
601    }
602
603    #[test]
604    fn test_best_with_fees_iter_mixed_fees() {
605        let mut pool = PendingPool::new(MockOrdering::default());
606        let mut f = MockTransactionFactory::default();
607
608        let base_fee: u64 = 10;
609        let base_fee_per_blob_gas: u64 = 20;
610
611        // Insert transactions with varying max_fee_per_gas and max_fee_per_blob_gas
612        let tx1 =
613            MockTransaction::eip1559().rng_hash().with_nonce(0).with_max_fee(base_fee as u128 + 5);
614        let tx2 = MockTransaction::eip4844()
615            .rng_hash()
616            .with_nonce(1)
617            .with_max_fee(base_fee as u128 + 5)
618            .with_blob_fee(base_fee_per_blob_gas as u128 + 5);
619        let tx3 = MockTransaction::eip4844()
620            .rng_hash()
621            .with_nonce(2)
622            .with_max_fee(base_fee as u128 + 5)
623            .with_blob_fee(base_fee_per_blob_gas as u128 - 5);
624        let tx4 =
625            MockTransaction::eip1559().rng_hash().with_nonce(3).with_max_fee(base_fee as u128 - 5);
626
627        pool.add_transaction(Arc::new(f.validated(tx1.clone())), 0);
628        pool.add_transaction(Arc::new(f.validated(tx2.clone())), 0);
629        pool.add_transaction(Arc::new(f.validated(tx3)), 0);
630        pool.add_transaction(Arc::new(f.validated(tx4)), 0);
631
632        let mut best = pool.best_with_basefee_and_blobfee(base_fee, base_fee_per_blob_gas);
633
634        let expected_order = vec![tx1, tx2];
635        for expected_tx in expected_order {
636            let tx = best.next().expect("Transaction should be returned");
637            assert_eq!(tx.transaction, expected_tx);
638        }
639
640        // No more transactions should be returned
641        assert!(best.next().is_none());
642    }
643
644    #[test]
645    fn test_best_add_transaction_with_next_nonce() {
646        let mut pool = PendingPool::new(MockOrdering::default());
647        let mut f = MockTransactionFactory::default();
648
649        // Add 5 transactions with increasing nonces to the pool
650        let num_tx = 5;
651        let tx = MockTransaction::eip1559();
652        for nonce in 0..num_tx {
653            let tx = tx.clone().rng_hash().with_nonce(nonce);
654            let valid_tx = f.validated(tx);
655            pool.add_transaction(Arc::new(valid_tx), 0);
656        }
657
658        // Create a BestTransactions iterator from the pool
659        let mut best = pool.best();
660
661        // Use a broadcast channel for transaction updates
662        let (tx_sender, tx_receiver) =
663            tokio::sync::broadcast::channel::<PendingTransaction<MockOrdering>>(1000);
664        best.new_transaction_receiver = Some(tx_receiver);
665
666        // Create a new transaction with nonce 5 and validate it
667        let new_tx = MockTransaction::eip1559().rng_hash().with_nonce(5);
668        let valid_new_tx = f.validated(new_tx);
669
670        // Send the new transaction through the broadcast channel
671        let pending_tx = PendingTransaction {
672            submission_id: 10,
673            transaction: Arc::new(valid_new_tx.clone()),
674            priority: Priority::Value(1000),
675        };
676        tx_sender.send(pending_tx.clone()).unwrap();
677
678        // Add new transactions to the iterator
679        best.add_new_transactions();
680
681        // Verify that the new transaction has been added to the 'all' map
682        assert_eq!(best.all.len(), 6);
683        assert!(best.all.contains_key(valid_new_tx.id()));
684
685        // Verify that the new transaction has been added to the 'independent' set
686        assert_eq!(best.independent.len(), 2);
687        assert!(best.independent.contains(&pending_tx));
688    }
689
690    #[test]
691    fn test_best_add_transaction_with_ancestor() {
692        // Initialize a new PendingPool with default MockOrdering and MockTransactionFactory
693        let mut pool = PendingPool::new(MockOrdering::default());
694        let mut f = MockTransactionFactory::default();
695
696        // Add 5 transactions with increasing nonces to the pool
697        let num_tx = 5;
698        let tx = MockTransaction::eip1559();
699        for nonce in 0..num_tx {
700            let tx = tx.clone().rng_hash().with_nonce(nonce);
701            let valid_tx = f.validated(tx);
702            pool.add_transaction(Arc::new(valid_tx), 0);
703        }
704
705        // Create a BestTransactions iterator from the pool
706        let mut best = pool.best();
707
708        // Use a broadcast channel for transaction updates
709        let (tx_sender, tx_receiver) =
710            tokio::sync::broadcast::channel::<PendingTransaction<MockOrdering>>(1000);
711        best.new_transaction_receiver = Some(tx_receiver);
712
713        // Create a new transaction with nonce 5 and validate it
714        let base_tx1 = MockTransaction::eip1559().rng_hash().with_nonce(5);
715        let valid_new_tx1 = f.validated(base_tx1.clone());
716
717        // Send the new transaction through the broadcast channel
718        let pending_tx1 = PendingTransaction {
719            submission_id: 10,
720            transaction: Arc::new(valid_new_tx1.clone()),
721            priority: Priority::Value(1000),
722        };
723        tx_sender.send(pending_tx1.clone()).unwrap();
724
725        // Add new transactions to the iterator
726        best.add_new_transactions();
727
728        // Verify that the new transaction has been added to the 'all' map
729        assert_eq!(best.all.len(), 6);
730        assert!(best.all.contains_key(valid_new_tx1.id()));
731
732        // Verify that the new transaction has been added to the 'independent' set
733        assert_eq!(best.independent.len(), 2);
734        assert!(best.independent.contains(&pending_tx1));
735
736        // Attempt to add a new transaction with a different nonce (not a duplicate)
737        let base_tx2 = base_tx1.with_nonce(6);
738        let valid_new_tx2 = f.validated(base_tx2);
739
740        // Send the new transaction through the broadcast channel
741        let pending_tx2 = PendingTransaction {
742            submission_id: 11, // Different submission ID
743            transaction: Arc::new(valid_new_tx2.clone()),
744            priority: Priority::Value(1000),
745        };
746        tx_sender.send(pending_tx2.clone()).unwrap();
747
748        // Add new transactions to the iterator
749        best.add_new_transactions();
750
751        // Verify that the new transaction has been added to 'all'
752        assert_eq!(best.all.len(), 7);
753        assert!(best.all.contains_key(valid_new_tx2.id()));
754
755        // Verify that the new transaction has not been added to the 'independent' set
756        assert_eq!(best.independent.len(), 2);
757        assert!(!best.independent.contains(&pending_tx2));
758    }
759
760    #[test]
761    fn test_best_transactions_filter_trait_object() {
762        // Initialize a new PendingPool with default MockOrdering and MockTransactionFactory
763        let mut pool = PendingPool::new(MockOrdering::default());
764        let mut f = MockTransactionFactory::default();
765
766        // Add 5 transactions with increasing nonces to the pool
767        let num_tx = 5;
768        let tx = MockTransaction::eip1559();
769        for nonce in 0..num_tx {
770            let tx = tx.clone().rng_hash().with_nonce(nonce);
771            let valid_tx = f.validated(tx);
772            pool.add_transaction(Arc::new(valid_tx), 0);
773        }
774
775        // Create a trait object of BestTransactions iterator from the pool
776        let best: Box<dyn crate::traits::BestTransactions<Item = _>> = Box::new(pool.best());
777
778        // Create a filter that only returns transactions with even nonces
779        let filter =
780            BestTransactionFilter::new(best, |tx: &Arc<ValidPoolTransaction<MockTransaction>>| {
781                tx.nonce().is_multiple_of(2)
782            });
783
784        // Verify that the filter only returns transactions with even nonces
785        for tx in filter {
786            assert_eq!(tx.nonce() % 2, 0);
787        }
788    }
789
790    #[test]
791    fn test_best_transactions_prioritized_senders() {
792        let mut pool = PendingPool::new(MockOrdering::default());
793        let mut f = MockTransactionFactory::default();
794
795        // Add 5 plain transactions from different senders with increasing gas price
796        for gas_price in 0..5 {
797            let tx = MockTransaction::eip1559().with_gas_price((gas_price + 1) * 10);
798            let valid_tx = f.validated(tx);
799            pool.add_transaction(Arc::new(valid_tx), 0);
800        }
801
802        // Add another transaction with 5 gas price that's going to be prioritized by sender
803        let prioritized_tx = MockTransaction::eip1559().with_gas_price(5).with_gas_limit(200);
804        let valid_prioritized_tx = f.validated(prioritized_tx.clone());
805        pool.add_transaction(Arc::new(valid_prioritized_tx), 0);
806
807        // Add another transaction with 3 gas price that should not be prioritized by sender because
808        // of gas limit.
809        let prioritized_tx2 = MockTransaction::eip1559().with_gas_price(3);
810        let valid_prioritized_tx2 = f.validated(prioritized_tx2.clone());
811        pool.add_transaction(Arc::new(valid_prioritized_tx2), 0);
812
813        let prioritized_senders =
814            HashSet::from([prioritized_tx.sender(), prioritized_tx2.sender()]);
815        let best =
816            BestTransactionsWithPrioritizedSenders::new(prioritized_senders, 200, pool.best());
817
818        // Verify that the prioritized transaction is returned first
819        // and the rest are returned in the reverse order of gas price
820        let mut iter = best.into_iter();
821        let top_of_block_tx = iter.next().unwrap();
822        assert_eq!(top_of_block_tx.max_fee_per_gas(), 5);
823        assert_eq!(top_of_block_tx.sender(), prioritized_tx.sender());
824        for gas_price in (0..5).rev() {
825            assert_eq!(iter.next().unwrap().max_fee_per_gas(), (gas_price + 1) * 10);
826        }
827
828        // Due to the gas limit, the transaction from second-prioritized sender was not
829        // prioritized.
830        let top_of_block_tx2 = iter.next().unwrap();
831        assert_eq!(top_of_block_tx2.max_fee_per_gas(), 3);
832        assert_eq!(top_of_block_tx2.sender(), prioritized_tx2.sender());
833    }
834
835    #[test]
836    fn test_best_with_fees_iter_no_blob_fee_required() {
837        // Tests transactions without blob fees where base fees are checked.
838        let mut pool = PendingPool::new(MockOrdering::default());
839        let mut f = MockTransactionFactory::default();
840
841        let base_fee: u64 = 10;
842        let base_fee_per_blob_gas: u64 = 0; // No blob fee requirement
843
844        // Insert transactions with max_fee_per_gas above the base fee
845        for nonce in 0..5 {
846            let tx = MockTransaction::eip1559()
847                .rng_hash()
848                .with_nonce(nonce)
849                .with_max_fee(base_fee as u128 + 5);
850            let valid_tx = f.validated(tx);
851            pool.add_transaction(Arc::new(valid_tx), 0);
852        }
853
854        let mut best = pool.best_with_basefee_and_blobfee(base_fee, base_fee_per_blob_gas);
855
856        // All transactions should be returned as no blob fee requirement is imposed
857        for nonce in 0..5 {
858            let tx = best.next().expect("Transaction should be returned");
859            assert_eq!(tx.nonce(), nonce);
860        }
861
862        // Ensure no more transactions are left
863        assert!(best.next().is_none());
864    }
865
866    #[test]
867    fn test_best_with_fees_iter_mix_of_blob_and_non_blob_transactions() {
868        // Tests mixed scenarios with both blob and non-blob transactions.
869        let mut pool = PendingPool::new(MockOrdering::default());
870        let mut f = MockTransactionFactory::default();
871
872        let base_fee: u64 = 10;
873        let base_fee_per_blob_gas: u64 = 15;
874
875        // Add a non-blob transaction that satisfies the base fee
876        let tx_non_blob =
877            MockTransaction::eip1559().rng_hash().with_nonce(0).with_max_fee(base_fee as u128 + 5);
878        pool.add_transaction(Arc::new(f.validated(tx_non_blob.clone())), 0);
879
880        // Add a blob transaction that satisfies both base fee and blob fee
881        let tx_blob = MockTransaction::eip4844()
882            .rng_hash()
883            .with_nonce(1)
884            .with_max_fee(base_fee as u128 + 5)
885            .with_blob_fee(base_fee_per_blob_gas as u128 + 5);
886        pool.add_transaction(Arc::new(f.validated(tx_blob.clone())), 0);
887
888        let mut best = pool.best_with_basefee_and_blobfee(base_fee, base_fee_per_blob_gas);
889
890        // Verify both transactions are returned
891        let tx = best.next().expect("Transaction should be returned");
892        assert_eq!(tx.transaction, tx_non_blob);
893
894        let tx = best.next().expect("Transaction should be returned");
895        assert_eq!(tx.transaction, tx_blob);
896
897        // Ensure no more transactions are left
898        assert!(best.next().is_none());
899    }
900
901    #[test]
902    fn test_best_transactions_with_skipping_blobs() {
903        // Tests the skip_blobs functionality to ensure blob transactions are skipped.
904        let mut pool = PendingPool::new(MockOrdering::default());
905        let mut f = MockTransactionFactory::default();
906
907        // Add a blob transaction
908        let tx_blob = MockTransaction::eip4844().rng_hash().with_nonce(0).with_blob_fee(100);
909        let valid_blob_tx = f.validated(tx_blob);
910        pool.add_transaction(Arc::new(valid_blob_tx), 0);
911
912        // Add a non-blob transaction
913        let tx_non_blob = MockTransaction::eip1559().rng_hash().with_nonce(1).with_max_fee(200);
914        let valid_non_blob_tx = f.validated(tx_non_blob.clone());
915        pool.add_transaction(Arc::new(valid_non_blob_tx), 0);
916
917        let mut best = pool.best();
918        best.skip_blobs();
919
920        // Only the non-blob transaction should be returned
921        let tx = best.next().expect("Transaction should be returned");
922        assert_eq!(tx.transaction, tx_non_blob);
923
924        // Ensure no more transactions are left
925        assert!(best.next().is_none());
926    }
927
928    #[test]
929    fn test_best_transactions_no_updates() {
930        // Tests the no_updates functionality to ensure it properly clears the
931        // new_transaction_receiver.
932        let mut pool = PendingPool::new(MockOrdering::default());
933        let mut f = MockTransactionFactory::default();
934
935        // Add a transaction
936        let tx = MockTransaction::eip1559().rng_hash().with_nonce(0).with_max_fee(100);
937        let valid_tx = f.validated(tx);
938        pool.add_transaction(Arc::new(valid_tx), 0);
939
940        let mut best = pool.best();
941
942        // Use a broadcast channel for transaction updates
943        let (_tx_sender, tx_receiver) =
944            tokio::sync::broadcast::channel::<PendingTransaction<MockOrdering>>(1000);
945        best.new_transaction_receiver = Some(tx_receiver);
946
947        // Ensure receiver is set
948        assert!(best.new_transaction_receiver.is_some());
949
950        // Call no_updates to clear the receiver
951        best.no_updates();
952
953        // Ensure receiver is cleared
954        assert!(best.new_transaction_receiver.is_none());
955    }
956
957    #[test]
958    fn test_best_update_transaction_priority() {
959        let mut pool = PendingPool::new(MockOrdering::default());
960        let mut f = MockTransactionFactory::default();
961
962        // Add 5 transactions with increasing nonces to the pool
963        let num_tx = 5;
964        let tx = MockTransaction::eip1559();
965        for nonce in 0..num_tx {
966            let tx = tx.clone().rng_hash().with_nonce(nonce);
967            let valid_tx = f.validated(tx);
968            pool.add_transaction(Arc::new(valid_tx), 0);
969        }
970
971        // Create a BestTransactions iterator from the pool
972        let mut best = pool.best();
973
974        // Use a broadcast channel for transaction updates
975        let (tx_sender, tx_receiver) =
976            tokio::sync::broadcast::channel::<PendingTransaction<MockOrdering>>(1000);
977        best.new_transaction_receiver = Some(tx_receiver);
978
979        // yield one tx, effectively locking in the highest prio
980        let first = best.next().unwrap();
981
982        // Create a new transaction with nonce 5 and validate it
983        let new_higher_fee_tx = MockTransaction::eip1559().with_nonce(0);
984        let valid_new_higher_fee_tx = f.validated(new_higher_fee_tx);
985
986        // Send the new transaction through the broadcast channel
987        let pending_tx = PendingTransaction {
988            submission_id: 10,
989            transaction: Arc::new(valid_new_higher_fee_tx.clone()),
990            priority: Priority::Value(u128::MAX),
991        };
992        tx_sender.send(pending_tx).unwrap();
993
994        // ensure that the higher prio tx is skipped since we yielded a lower one
995        for tx in best {
996            assert_eq!(tx.sender_id(), first.sender_id());
997            assert_ne!(tx.sender_id(), valid_new_higher_fee_tx.sender_id());
998        }
999    }
1000}