reth_transaction_pool/pool/
best.rs

1use crate::{
2    error::{Eip4844PoolTransactionError, InvalidPoolTransactionError},
3    identifier::{SenderId, TransactionId},
4    pool::pending::PendingTransaction,
5    PoolTransaction, TransactionOrdering, ValidPoolTransaction,
6};
7use alloy_consensus::Transaction;
8use alloy_eips::Typed2718;
9use alloy_primitives::Address;
10use core::fmt;
11use reth_primitives_traits::transaction::error::InvalidTransactionError;
12use std::{
13    collections::{BTreeMap, BTreeSet, HashSet, VecDeque},
14    sync::Arc,
15};
16use tokio::sync::broadcast::{error::TryRecvError, Receiver};
17use tracing::debug;
18
/// An iterator that returns transactions that can be executed on the current state (*best*
/// transactions).
///
/// This is a wrapper around [`BestTransactions`] that also enforces a specific base fee and blob
/// fee.
///
/// This iterator guarantees that every transaction it returns satisfies both the base fee and the
/// blob fee!
pub(crate) struct BestTransactionsWithFees<T: TransactionOrdering> {
    /// The wrapped iterator that supplies the candidate transactions.
    pub(crate) best: BestTransactions<T>,
    /// Lower bound for a transaction's `max_fee_per_gas`; candidates below it are marked invalid
    /// instead of being yielded.
    pub(crate) base_fee: u64,
    /// Lower bound for a transaction's `max_fee_per_blob_gas`. Only checked for transactions that
    /// report a blob fee at all.
    pub(crate) base_fee_per_blob_gas: u64,
}
30
31impl<T: TransactionOrdering> crate::traits::BestTransactions for BestTransactionsWithFees<T> {
32    fn mark_invalid(&mut self, tx: &Self::Item, kind: InvalidPoolTransactionError) {
33        BestTransactions::mark_invalid(&mut self.best, tx, kind)
34    }
35
36    fn no_updates(&mut self) {
37        self.best.no_updates()
38    }
39
40    fn skip_blobs(&mut self) {
41        self.set_skip_blobs(true)
42    }
43
44    fn set_skip_blobs(&mut self, skip_blobs: bool) {
45        self.best.set_skip_blobs(skip_blobs)
46    }
47}
48
49impl<T: TransactionOrdering> Iterator for BestTransactionsWithFees<T> {
50    type Item = Arc<ValidPoolTransaction<T::Transaction>>;
51
52    fn next(&mut self) -> Option<Self::Item> {
53        // find the next transaction that satisfies the base fee
54        loop {
55            let best = Iterator::next(&mut self.best)?;
56            // If both the base fee and blob fee (if applicable for EIP-4844) are satisfied, return
57            // the transaction
58            if best.transaction.max_fee_per_gas() >= self.base_fee as u128 &&
59                best.transaction
60                    .max_fee_per_blob_gas()
61                    .is_none_or(|fee| fee >= self.base_fee_per_blob_gas as u128)
62            {
63                return Some(best);
64            }
65            crate::traits::BestTransactions::mark_invalid(
66                self,
67                &best,
68                InvalidPoolTransactionError::Underpriced,
69            );
70        }
71    }
72}
73
/// An iterator that returns transactions that can be executed on the current state (*best*
/// transactions).
///
/// The [`PendingPool`](crate::pool::pending::PendingPool) contains transactions that *could* all
/// be executed on the current state, but only yields transactions that are ready to be executed
/// now. While it contains all gapless transactions of a sender, it _always_ only returns the
/// transaction with the current on chain nonce.
#[derive(Debug)]
pub struct BestTransactions<T: TransactionOrdering> {
    /// Contains a copy of _all_ transactions of the pending pool at the point in time this
    /// iterator was created.
    pub(crate) all: BTreeMap<TransactionId, PendingTransaction<T>>,
    /// Transactions that can be executed right away: these have the expected nonce.
    ///
    /// Once an `independent` transaction with the nonce `N` is returned, it unlocks `N+1`, which
    /// then can be moved from the `all` set to the `independent` set.
    pub(crate) independent: BTreeSet<PendingTransaction<T>>,
    /// There might be the case where a yielded transaction is invalid, this tracks the senders of
    /// such transactions so that their remaining transactions are skipped.
    pub(crate) invalid: HashSet<SenderId>,
    /// Used to receive any new pending transactions that have been added to the pool after this
    /// iterator was created.
    ///
    /// These new pending transactions are inserted into this iterator's pool before yielding the
    /// next value. `None` means no updates are received.
    pub(crate) new_transaction_receiver: Option<Receiver<PendingTransaction<T>>>,
    /// Flag to control whether to skip blob transactions (EIP4844).
    pub(crate) skip_blobs: bool,
}
102
103impl<T: TransactionOrdering> BestTransactions<T> {
104    /// Mark the transaction and it's descendants as invalid.
105    pub(crate) fn mark_invalid(
106        &mut self,
107        tx: &Arc<ValidPoolTransaction<T::Transaction>>,
108        _kind: InvalidPoolTransactionError,
109    ) {
110        self.invalid.insert(tx.sender_id());
111    }
112
113    /// Returns the ancestor the given transaction, the transaction with `nonce - 1`.
114    ///
115    /// Note: for a transaction with nonce higher than the current on chain nonce this will always
116    /// return an ancestor since all transaction in this pool are gapless.
117    pub(crate) fn ancestor(&self, id: &TransactionId) -> Option<&PendingTransaction<T>> {
118        self.all.get(&id.unchecked_ancestor()?)
119    }
120
121    /// Non-blocking read on the new pending transactions subscription channel
122    fn try_recv(&mut self) -> Option<PendingTransaction<T>> {
123        loop {
124            match self.new_transaction_receiver.as_mut()?.try_recv() {
125                Ok(tx) => return Some(tx),
126                // note TryRecvError::Lagged can be returned here, which is an error that attempts
127                // to correct itself on consecutive try_recv() attempts
128
129                // the cost of ignoring this error is allowing old transactions to get
130                // overwritten after the chan buffer size is met
131                Err(TryRecvError::Lagged(_)) => {
132                    // Handle the case where the receiver lagged too far behind.
133                    // `num_skipped` indicates the number of messages that were skipped.
134                }
135
136                // this case is still better than the existing iterator behavior where no new
137                // pending txs are surfaced to consumers
138                Err(_) => return None,
139            }
140        }
141    }
142
143    /// Removes the currently best independent transaction from the independent set and the total
144    /// set.
145    fn pop_best(&mut self) -> Option<PendingTransaction<T>> {
146        self.independent.pop_last().inspect(|best| {
147            self.all.remove(best.transaction.id());
148        })
149    }
150
151    /// Checks for new transactions that have come into the `PendingPool` after this iterator was
152    /// created and inserts them
153    fn add_new_transactions(&mut self) {
154        while let Some(pending_tx) = self.try_recv() {
155            //  same logic as PendingPool::add_transaction/PendingPool::best_with_unlocked
156            let tx_id = *pending_tx.transaction.id();
157            if self.ancestor(&tx_id).is_none() {
158                self.independent.insert(pending_tx.clone());
159            }
160            self.all.insert(tx_id, pending_tx);
161        }
162    }
163}
164
165impl<T: TransactionOrdering> crate::traits::BestTransactions for BestTransactions<T> {
166    fn mark_invalid(&mut self, tx: &Self::Item, kind: InvalidPoolTransactionError) {
167        Self::mark_invalid(self, tx, kind)
168    }
169
170    fn no_updates(&mut self) {
171        self.new_transaction_receiver.take();
172    }
173
174    fn skip_blobs(&mut self) {
175        self.set_skip_blobs(true);
176    }
177
178    fn set_skip_blobs(&mut self, skip_blobs: bool) {
179        self.skip_blobs = skip_blobs;
180    }
181}
182
183impl<T: TransactionOrdering> Iterator for BestTransactions<T> {
184    type Item = Arc<ValidPoolTransaction<T::Transaction>>;
185
186    fn next(&mut self) -> Option<Self::Item> {
187        loop {
188            self.add_new_transactions();
189            // Remove the next independent tx with the highest priority
190            let best = self.pop_best()?;
191            let sender_id = best.transaction.sender_id();
192
193            // skip transactions for which sender was marked as invalid
194            if self.invalid.contains(&sender_id) {
195                debug!(
196                    target: "txpool",
197                    "[{:?}] skipping invalid transaction",
198                    best.transaction.hash()
199                );
200                continue
201            }
202
203            // Insert transactions that just got unlocked.
204            if let Some(unlocked) = self.all.get(&best.unlocks()) {
205                self.independent.insert(unlocked.clone());
206            }
207
208            if self.skip_blobs && best.transaction.transaction.is_eip4844() {
209                // blobs should be skipped, marking them as invalid will ensure that no dependent
210                // transactions are returned
211                self.mark_invalid(
212                    &best.transaction,
213                    InvalidPoolTransactionError::Eip4844(
214                        Eip4844PoolTransactionError::NoEip4844Blobs,
215                    ),
216                )
217            } else {
218                return Some(best.transaction)
219            }
220        }
221    }
222}
223
/// A [`BestTransactions`](crate::traits::BestTransactions) implementation that filters the
/// transactions of the wrapped iterator with a predicate.
///
/// Filtered out transactions are marked as invalid:
/// [`BestTransactions::mark_invalid`](crate::traits::BestTransactions::mark_invalid).
pub struct BestTransactionFilter<I, P> {
    /// The wrapped iterator whose transactions are filtered.
    pub(crate) best: I,
    /// Decides whether a transaction is yielded (`true`) or rejected (`false`).
    pub(crate) predicate: P,
}
233
234impl<I, P> BestTransactionFilter<I, P> {
235    /// Create a new [`BestTransactionFilter`] with the given predicate.
236    pub const fn new(best: I, predicate: P) -> Self {
237        Self { best, predicate }
238    }
239}
240
241impl<I, P> Iterator for BestTransactionFilter<I, P>
242where
243    I: crate::traits::BestTransactions,
244    P: FnMut(&<I as Iterator>::Item) -> bool,
245{
246    type Item = <I as Iterator>::Item;
247
248    fn next(&mut self) -> Option<Self::Item> {
249        loop {
250            let best = self.best.next()?;
251            if (self.predicate)(&best) {
252                return Some(best)
253            }
254            self.best.mark_invalid(
255                &best,
256                InvalidPoolTransactionError::Consensus(InvalidTransactionError::TxTypeNotSupported),
257            );
258        }
259    }
260}
261
262impl<I, P> crate::traits::BestTransactions for BestTransactionFilter<I, P>
263where
264    I: crate::traits::BestTransactions,
265    P: FnMut(&<I as Iterator>::Item) -> bool + Send,
266{
267    fn mark_invalid(&mut self, tx: &Self::Item, kind: InvalidPoolTransactionError) {
268        crate::traits::BestTransactions::mark_invalid(&mut self.best, tx, kind)
269    }
270
271    fn no_updates(&mut self) {
272        self.best.no_updates()
273    }
274
275    fn skip_blobs(&mut self) {
276        self.set_skip_blobs(true)
277    }
278
279    fn set_skip_blobs(&mut self, skip_blobs: bool) {
280        self.best.set_skip_blobs(skip_blobs)
281    }
282}
283
284impl<I: fmt::Debug, P> fmt::Debug for BestTransactionFilter<I, P> {
285    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
286        f.debug_struct("BestTransactionFilter").field("best", &self.best).finish()
287    }
288}
289
/// Wrapper over [`crate::traits::BestTransactions`] that prioritizes transactions of certain
/// senders, capping the total gas used by such transactions.
#[derive(Debug)]
pub struct BestTransactionsWithPrioritizedSenders<I: Iterator> {
    /// Inner iterator
    inner: I,
    /// The set of senders whose transactions should be prioritized
    prioritized_senders: HashSet<Address>,
    /// Maximum total gas limit of prioritized transactions
    max_prioritized_gas: u64,
    /// Buffer with transactions that are not being prioritized. Those will be the first to be
    /// included after the prioritized transactions
    buffer: VecDeque<I::Item>,
    /// Tracker of total gas limit of prioritized transactions. Once it reaches
    /// `max_prioritized_gas` no more transactions will be prioritized
    prioritized_gas: u64,
}
307
308impl<I: Iterator> BestTransactionsWithPrioritizedSenders<I> {
309    /// Constructs a new [`BestTransactionsWithPrioritizedSenders`].
310    pub fn new(prioritized_senders: HashSet<Address>, max_prioritized_gas: u64, inner: I) -> Self {
311        Self {
312            inner,
313            prioritized_senders,
314            max_prioritized_gas,
315            buffer: Default::default(),
316            prioritized_gas: Default::default(),
317        }
318    }
319}
320
321impl<I, T> Iterator for BestTransactionsWithPrioritizedSenders<I>
322where
323    I: crate::traits::BestTransactions<Item = Arc<ValidPoolTransaction<T>>>,
324    T: PoolTransaction,
325{
326    type Item = <I as Iterator>::Item;
327
328    fn next(&mut self) -> Option<Self::Item> {
329        // If we have space, try prioritizing transactions
330        if self.prioritized_gas < self.max_prioritized_gas {
331            for item in &mut self.inner {
332                if self.prioritized_senders.contains(&item.transaction.sender()) &&
333                    self.prioritized_gas + item.transaction.gas_limit() <=
334                        self.max_prioritized_gas
335                {
336                    self.prioritized_gas += item.transaction.gas_limit();
337                    return Some(item)
338                }
339                self.buffer.push_back(item);
340            }
341        }
342
343        if let Some(item) = self.buffer.pop_front() {
344            Some(item)
345        } else {
346            self.inner.next()
347        }
348    }
349}
350
351impl<I, T> crate::traits::BestTransactions for BestTransactionsWithPrioritizedSenders<I>
352where
353    I: crate::traits::BestTransactions<Item = Arc<ValidPoolTransaction<T>>>,
354    T: PoolTransaction,
355{
356    fn mark_invalid(&mut self, tx: &Self::Item, kind: InvalidPoolTransactionError) {
357        self.inner.mark_invalid(tx, kind)
358    }
359
360    fn no_updates(&mut self) {
361        self.inner.no_updates()
362    }
363
364    fn set_skip_blobs(&mut self, skip_blobs: bool) {
365        if skip_blobs {
366            self.buffer.retain(|tx| !tx.transaction.is_eip4844())
367        }
368        self.inner.set_skip_blobs(skip_blobs)
369    }
370}
371
372#[cfg(test)]
373mod tests {
374    use super::*;
375    use crate::{
376        pool::pending::PendingPool,
377        test_utils::{MockOrdering, MockTransaction, MockTransactionFactory},
378        BestTransactions, Priority,
379    };
380    use alloy_primitives::U256;
381
382    #[test]
383    fn test_best_iter() {
384        let mut pool = PendingPool::new(MockOrdering::default());
385        let mut f = MockTransactionFactory::default();
386
387        let num_tx = 10;
388        // insert 10 gapless tx
389        let tx = MockTransaction::eip1559();
390        for nonce in 0..num_tx {
391            let tx = tx.clone().rng_hash().with_nonce(nonce);
392            let valid_tx = f.validated(tx);
393            pool.add_transaction(Arc::new(valid_tx), 0);
394        }
395
396        let mut best = pool.best();
397        assert_eq!(best.all.len(), num_tx as usize);
398        assert_eq!(best.independent.len(), 1);
399
400        // check tx are returned in order
401        for nonce in 0..num_tx {
402            assert_eq!(best.independent.len(), 1);
403            let tx = best.next().unwrap();
404            assert_eq!(tx.nonce(), nonce);
405        }
406    }
407
408    #[test]
409    fn test_best_iter_invalid() {
410        let mut pool = PendingPool::new(MockOrdering::default());
411        let mut f = MockTransactionFactory::default();
412
413        let num_tx = 10;
414        // insert 10 gapless tx
415        let tx = MockTransaction::eip1559();
416        for nonce in 0..num_tx {
417            let tx = tx.clone().rng_hash().with_nonce(nonce);
418            let valid_tx = f.validated(tx);
419            pool.add_transaction(Arc::new(valid_tx), 0);
420        }
421
422        let mut best = pool.best();
423
424        // mark the first tx as invalid
425        let invalid = best.independent.iter().next().unwrap();
426        best.mark_invalid(
427            &invalid.transaction.clone(),
428            InvalidPoolTransactionError::Consensus(InvalidTransactionError::TxTypeNotSupported),
429        );
430
431        // iterator is empty
432        assert!(best.next().is_none());
433    }
434
435    #[test]
436    fn test_best_transactions_iter_invalid() {
437        let mut pool = PendingPool::new(MockOrdering::default());
438        let mut f = MockTransactionFactory::default();
439
440        let num_tx = 10;
441        // insert 10 gapless tx
442        let tx = MockTransaction::eip1559();
443        for nonce in 0..num_tx {
444            let tx = tx.clone().rng_hash().with_nonce(nonce);
445            let valid_tx = f.validated(tx);
446            pool.add_transaction(Arc::new(valid_tx), 0);
447        }
448
449        let mut best: Box<
450            dyn crate::traits::BestTransactions<Item = Arc<ValidPoolTransaction<MockTransaction>>>,
451        > = Box::new(pool.best());
452
453        let tx = Iterator::next(&mut best).unwrap();
454        crate::traits::BestTransactions::mark_invalid(
455            &mut *best,
456            &tx,
457            InvalidPoolTransactionError::Consensus(InvalidTransactionError::TxTypeNotSupported),
458        );
459        assert!(Iterator::next(&mut best).is_none());
460    }
461
462    #[test]
463    fn test_best_with_fees_iter_base_fee_satisfied() {
464        let mut pool = PendingPool::new(MockOrdering::default());
465        let mut f = MockTransactionFactory::default();
466
467        let num_tx = 5;
468        let base_fee: u64 = 10;
469        let base_fee_per_blob_gas: u64 = 15;
470
471        // Insert transactions with a max_fee_per_gas greater than or equal to the base fee
472        // Without blob fee
473        for nonce in 0..num_tx {
474            let tx = MockTransaction::eip1559()
475                .rng_hash()
476                .with_nonce(nonce)
477                .with_max_fee(base_fee as u128 + 5);
478            let valid_tx = f.validated(tx);
479            pool.add_transaction(Arc::new(valid_tx), 0);
480        }
481
482        let mut best = pool.best_with_basefee_and_blobfee(base_fee, base_fee_per_blob_gas);
483
484        for nonce in 0..num_tx {
485            let tx = best.next().expect("Transaction should be returned");
486            assert_eq!(tx.nonce(), nonce);
487            assert!(tx.transaction.max_fee_per_gas() >= base_fee as u128);
488        }
489    }
490
491    #[test]
492    fn test_best_with_fees_iter_base_fee_violated() {
493        let mut pool = PendingPool::new(MockOrdering::default());
494        let mut f = MockTransactionFactory::default();
495
496        let num_tx = 5;
497        let base_fee: u64 = 20;
498        let base_fee_per_blob_gas: u64 = 15;
499
500        // Insert transactions with a max_fee_per_gas less than the base fee
501        for nonce in 0..num_tx {
502            let tx = MockTransaction::eip1559()
503                .rng_hash()
504                .with_nonce(nonce)
505                .with_max_fee(base_fee as u128 - 5);
506            let valid_tx = f.validated(tx);
507            pool.add_transaction(Arc::new(valid_tx), 0);
508        }
509
510        let mut best = pool.best_with_basefee_and_blobfee(base_fee, base_fee_per_blob_gas);
511
512        // No transaction should be returned since all violate the base fee
513        assert!(best.next().is_none());
514    }
515
516    #[test]
517    fn test_best_with_fees_iter_blob_fee_satisfied() {
518        let mut pool = PendingPool::new(MockOrdering::default());
519        let mut f = MockTransactionFactory::default();
520
521        let num_tx = 5;
522        let base_fee: u64 = 10;
523        let base_fee_per_blob_gas: u64 = 20;
524
525        // Insert transactions with a max_fee_per_blob_gas greater than or equal to the base fee per
526        // blob gas
527        for nonce in 0..num_tx {
528            let tx = MockTransaction::eip4844()
529                .rng_hash()
530                .with_nonce(nonce)
531                .with_max_fee(base_fee as u128 + 5)
532                .with_blob_fee(base_fee_per_blob_gas as u128 + 5);
533            let valid_tx = f.validated(tx);
534            pool.add_transaction(Arc::new(valid_tx), 0);
535        }
536
537        let mut best = pool.best_with_basefee_and_blobfee(base_fee, base_fee_per_blob_gas);
538
539        // All transactions should be returned in order since they satisfy both base fee and blob
540        // fee
541        for nonce in 0..num_tx {
542            let tx = best.next().expect("Transaction should be returned");
543            assert_eq!(tx.nonce(), nonce);
544            assert!(tx.transaction.max_fee_per_gas() >= base_fee as u128);
545            assert!(
546                tx.transaction.max_fee_per_blob_gas().unwrap() >= base_fee_per_blob_gas as u128
547            );
548        }
549
550        // No more transactions should be returned
551        assert!(best.next().is_none());
552    }
553
554    #[test]
555    fn test_best_with_fees_iter_blob_fee_violated() {
556        let mut pool = PendingPool::new(MockOrdering::default());
557        let mut f = MockTransactionFactory::default();
558
559        let num_tx = 5;
560        let base_fee: u64 = 10;
561        let base_fee_per_blob_gas: u64 = 20;
562
563        // Insert transactions with a max_fee_per_blob_gas less than the base fee per blob gas
564        for nonce in 0..num_tx {
565            let tx = MockTransaction::eip4844()
566                .rng_hash()
567                .with_nonce(nonce)
568                .with_max_fee(base_fee as u128 + 5)
569                .with_blob_fee(base_fee_per_blob_gas as u128 - 5);
570            let valid_tx = f.validated(tx);
571            pool.add_transaction(Arc::new(valid_tx), 0);
572        }
573
574        let mut best = pool.best_with_basefee_and_blobfee(base_fee, base_fee_per_blob_gas);
575
576        // No transaction should be returned since all violate the blob fee
577        assert!(best.next().is_none());
578    }
579
580    #[test]
581    fn test_best_with_fees_iter_mixed_fees() {
582        let mut pool = PendingPool::new(MockOrdering::default());
583        let mut f = MockTransactionFactory::default();
584
585        let base_fee: u64 = 10;
586        let base_fee_per_blob_gas: u64 = 20;
587
588        // Insert transactions with varying max_fee_per_gas and max_fee_per_blob_gas
589        let tx1 =
590            MockTransaction::eip1559().rng_hash().with_nonce(0).with_max_fee(base_fee as u128 + 5);
591        let tx2 = MockTransaction::eip4844()
592            .rng_hash()
593            .with_nonce(1)
594            .with_max_fee(base_fee as u128 + 5)
595            .with_blob_fee(base_fee_per_blob_gas as u128 + 5);
596        let tx3 = MockTransaction::eip4844()
597            .rng_hash()
598            .with_nonce(2)
599            .with_max_fee(base_fee as u128 + 5)
600            .with_blob_fee(base_fee_per_blob_gas as u128 - 5);
601        let tx4 =
602            MockTransaction::eip1559().rng_hash().with_nonce(3).with_max_fee(base_fee as u128 - 5);
603
604        pool.add_transaction(Arc::new(f.validated(tx1.clone())), 0);
605        pool.add_transaction(Arc::new(f.validated(tx2.clone())), 0);
606        pool.add_transaction(Arc::new(f.validated(tx3)), 0);
607        pool.add_transaction(Arc::new(f.validated(tx4)), 0);
608
609        let mut best = pool.best_with_basefee_and_blobfee(base_fee, base_fee_per_blob_gas);
610
611        let expected_order = vec![tx1, tx2];
612        for expected_tx in expected_order {
613            let tx = best.next().expect("Transaction should be returned");
614            assert_eq!(tx.transaction, expected_tx);
615        }
616
617        // No more transactions should be returned
618        assert!(best.next().is_none());
619    }
620
621    #[test]
622    fn test_best_add_transaction_with_next_nonce() {
623        let mut pool = PendingPool::new(MockOrdering::default());
624        let mut f = MockTransactionFactory::default();
625
626        // Add 5 transactions with increasing nonces to the pool
627        let num_tx = 5;
628        let tx = MockTransaction::eip1559();
629        for nonce in 0..num_tx {
630            let tx = tx.clone().rng_hash().with_nonce(nonce);
631            let valid_tx = f.validated(tx);
632            pool.add_transaction(Arc::new(valid_tx), 0);
633        }
634
635        // Create a BestTransactions iterator from the pool
636        let mut best = pool.best();
637
638        // Use a broadcast channel for transaction updates
639        let (tx_sender, tx_receiver) =
640            tokio::sync::broadcast::channel::<PendingTransaction<MockOrdering>>(1000);
641        best.new_transaction_receiver = Some(tx_receiver);
642
643        // Create a new transaction with nonce 5 and validate it
644        let new_tx = MockTransaction::eip1559().rng_hash().with_nonce(5);
645        let valid_new_tx = f.validated(new_tx);
646
647        // Send the new transaction through the broadcast channel
648        let pending_tx = PendingTransaction {
649            submission_id: 10,
650            transaction: Arc::new(valid_new_tx.clone()),
651            priority: Priority::Value(U256::from(1000)),
652        };
653        tx_sender.send(pending_tx.clone()).unwrap();
654
655        // Add new transactions to the iterator
656        best.add_new_transactions();
657
658        // Verify that the new transaction has been added to the 'all' map
659        assert_eq!(best.all.len(), 6);
660        assert!(best.all.contains_key(valid_new_tx.id()));
661
662        // Verify that the new transaction has been added to the 'independent' set
663        assert_eq!(best.independent.len(), 2);
664        assert!(best.independent.contains(&pending_tx));
665    }
666
667    #[test]
668    fn test_best_add_transaction_with_ancestor() {
669        // Initialize a new PendingPool with default MockOrdering and MockTransactionFactory
670        let mut pool = PendingPool::new(MockOrdering::default());
671        let mut f = MockTransactionFactory::default();
672
673        // Add 5 transactions with increasing nonces to the pool
674        let num_tx = 5;
675        let tx = MockTransaction::eip1559();
676        for nonce in 0..num_tx {
677            let tx = tx.clone().rng_hash().with_nonce(nonce);
678            let valid_tx = f.validated(tx);
679            pool.add_transaction(Arc::new(valid_tx), 0);
680        }
681
682        // Create a BestTransactions iterator from the pool
683        let mut best = pool.best();
684
685        // Use a broadcast channel for transaction updates
686        let (tx_sender, tx_receiver) =
687            tokio::sync::broadcast::channel::<PendingTransaction<MockOrdering>>(1000);
688        best.new_transaction_receiver = Some(tx_receiver);
689
690        // Create a new transaction with nonce 5 and validate it
691        let base_tx1 = MockTransaction::eip1559().rng_hash().with_nonce(5);
692        let valid_new_tx1 = f.validated(base_tx1.clone());
693
694        // Send the new transaction through the broadcast channel
695        let pending_tx1 = PendingTransaction {
696            submission_id: 10,
697            transaction: Arc::new(valid_new_tx1.clone()),
698            priority: Priority::Value(U256::from(1000)),
699        };
700        tx_sender.send(pending_tx1.clone()).unwrap();
701
702        // Add new transactions to the iterator
703        best.add_new_transactions();
704
705        // Verify that the new transaction has been added to the 'all' map
706        assert_eq!(best.all.len(), 6);
707        assert!(best.all.contains_key(valid_new_tx1.id()));
708
709        // Verify that the new transaction has been added to the 'independent' set
710        assert_eq!(best.independent.len(), 2);
711        assert!(best.independent.contains(&pending_tx1));
712
713        // Attempt to add a new transaction with a different nonce (not a duplicate)
714        let base_tx2 = base_tx1.with_nonce(6);
715        let valid_new_tx2 = f.validated(base_tx2);
716
717        // Send the new transaction through the broadcast channel
718        let pending_tx2 = PendingTransaction {
719            submission_id: 11, // Different submission ID
720            transaction: Arc::new(valid_new_tx2.clone()),
721            priority: Priority::Value(U256::from(1000)),
722        };
723        tx_sender.send(pending_tx2.clone()).unwrap();
724
725        // Add new transactions to the iterator
726        best.add_new_transactions();
727
728        // Verify that the new transaction has been added to 'all'
729        assert_eq!(best.all.len(), 7);
730        assert!(best.all.contains_key(valid_new_tx2.id()));
731
732        // Verify that the new transaction has not been added to the 'independent' set
733        assert_eq!(best.independent.len(), 2);
734        assert!(!best.independent.contains(&pending_tx2));
735    }
736
737    #[test]
738    fn test_best_transactions_filter_trait_object() {
739        // Initialize a new PendingPool with default MockOrdering and MockTransactionFactory
740        let mut pool = PendingPool::new(MockOrdering::default());
741        let mut f = MockTransactionFactory::default();
742
743        // Add 5 transactions with increasing nonces to the pool
744        let num_tx = 5;
745        let tx = MockTransaction::eip1559();
746        for nonce in 0..num_tx {
747            let tx = tx.clone().rng_hash().with_nonce(nonce);
748            let valid_tx = f.validated(tx);
749            pool.add_transaction(Arc::new(valid_tx), 0);
750        }
751
752        // Create a trait object of BestTransactions iterator from the pool
753        let best: Box<dyn crate::traits::BestTransactions<Item = _>> = Box::new(pool.best());
754
755        // Create a filter that only returns transactions with even nonces
756        let filter =
757            BestTransactionFilter::new(best, |tx: &Arc<ValidPoolTransaction<MockTransaction>>| {
758                tx.nonce() % 2 == 0
759            });
760
761        // Verify that the filter only returns transactions with even nonces
762        for tx in filter {
763            assert_eq!(tx.nonce() % 2, 0);
764        }
765    }
766
767    #[test]
768    fn test_best_transactions_prioritized_senders() {
769        let mut pool = PendingPool::new(MockOrdering::default());
770        let mut f = MockTransactionFactory::default();
771
772        // Add 5 plain transactions from different senders with increasing gas price
773        for gas_price in 0..5 {
774            let tx = MockTransaction::eip1559().with_gas_price((gas_price + 1) * 10);
775            let valid_tx = f.validated(tx);
776            pool.add_transaction(Arc::new(valid_tx), 0);
777        }
778
779        // Add another transaction with 5 gas price that's going to be prioritized by sender
780        let prioritized_tx = MockTransaction::eip1559().with_gas_price(5).with_gas_limit(200);
781        let valid_prioritized_tx = f.validated(prioritized_tx.clone());
782        pool.add_transaction(Arc::new(valid_prioritized_tx), 0);
783
784        // Add another transaction with 3 gas price that should not be prioritized by sender because
785        // of gas limit.
786        let prioritized_tx2 = MockTransaction::eip1559().with_gas_price(3);
787        let valid_prioritized_tx2 = f.validated(prioritized_tx2.clone());
788        pool.add_transaction(Arc::new(valid_prioritized_tx2), 0);
789
790        let prioritized_senders =
791            HashSet::from([prioritized_tx.sender(), prioritized_tx2.sender()]);
792        let best =
793            BestTransactionsWithPrioritizedSenders::new(prioritized_senders, 200, pool.best());
794
795        // Verify that the prioritized transaction is returned first
796        // and the rest are returned in the reverse order of gas price
797        let mut iter = best.into_iter();
798        let top_of_block_tx = iter.next().unwrap();
799        assert_eq!(top_of_block_tx.max_fee_per_gas(), 5);
800        assert_eq!(top_of_block_tx.sender(), prioritized_tx.sender());
801        for gas_price in (0..5).rev() {
802            assert_eq!(iter.next().unwrap().max_fee_per_gas(), (gas_price + 1) * 10);
803        }
804
805        // Due to the gas limit, the transaction from second prioritized sender was not
806        // prioritized.
807        let top_of_block_tx2 = iter.next().unwrap();
808        assert_eq!(top_of_block_tx2.max_fee_per_gas(), 3);
809        assert_eq!(top_of_block_tx2.sender(), prioritized_tx2.sender());
810    }
811
812    #[test]
813    fn test_best_with_fees_iter_no_blob_fee_required() {
814        // Tests transactions without blob fees where base fees are checked.
815        let mut pool = PendingPool::new(MockOrdering::default());
816        let mut f = MockTransactionFactory::default();
817
818        let base_fee: u64 = 10;
819        let base_fee_per_blob_gas: u64 = 0; // No blob fee requirement
820
821        // Insert transactions with max_fee_per_gas above the base fee
822        for nonce in 0..5 {
823            let tx = MockTransaction::eip1559()
824                .rng_hash()
825                .with_nonce(nonce)
826                .with_max_fee(base_fee as u128 + 5);
827            let valid_tx = f.validated(tx);
828            pool.add_transaction(Arc::new(valid_tx), 0);
829        }
830
831        let mut best = pool.best_with_basefee_and_blobfee(base_fee, base_fee_per_blob_gas);
832
833        // All transactions should be returned as no blob fee requirement is imposed
834        for nonce in 0..5 {
835            let tx = best.next().expect("Transaction should be returned");
836            assert_eq!(tx.nonce(), nonce);
837        }
838
839        // Ensure no more transactions are left
840        assert!(best.next().is_none());
841    }
842
843    #[test]
844    fn test_best_with_fees_iter_mix_of_blob_and_non_blob_transactions() {
845        // Tests mixed scenarios with both blob and non-blob transactions.
846        let mut pool = PendingPool::new(MockOrdering::default());
847        let mut f = MockTransactionFactory::default();
848
849        let base_fee: u64 = 10;
850        let base_fee_per_blob_gas: u64 = 15;
851
852        // Add a non-blob transaction that satisfies the base fee
853        let tx_non_blob =
854            MockTransaction::eip1559().rng_hash().with_nonce(0).with_max_fee(base_fee as u128 + 5);
855        pool.add_transaction(Arc::new(f.validated(tx_non_blob.clone())), 0);
856
857        // Add a blob transaction that satisfies both base fee and blob fee
858        let tx_blob = MockTransaction::eip4844()
859            .rng_hash()
860            .with_nonce(1)
861            .with_max_fee(base_fee as u128 + 5)
862            .with_blob_fee(base_fee_per_blob_gas as u128 + 5);
863        pool.add_transaction(Arc::new(f.validated(tx_blob.clone())), 0);
864
865        let mut best = pool.best_with_basefee_and_blobfee(base_fee, base_fee_per_blob_gas);
866
867        // Verify both transactions are returned
868        let tx = best.next().expect("Transaction should be returned");
869        assert_eq!(tx.transaction, tx_non_blob);
870
871        let tx = best.next().expect("Transaction should be returned");
872        assert_eq!(tx.transaction, tx_blob);
873
874        // Ensure no more transactions are left
875        assert!(best.next().is_none());
876    }
877
878    #[test]
879    fn test_best_transactions_with_skipping_blobs() {
880        // Tests the skip_blobs functionality to ensure blob transactions are skipped.
881        let mut pool = PendingPool::new(MockOrdering::default());
882        let mut f = MockTransactionFactory::default();
883
884        // Add a blob transaction
885        let tx_blob = MockTransaction::eip4844().rng_hash().with_nonce(0).with_blob_fee(100);
886        let valid_blob_tx = f.validated(tx_blob);
887        pool.add_transaction(Arc::new(valid_blob_tx), 0);
888
889        // Add a non-blob transaction
890        let tx_non_blob = MockTransaction::eip1559().rng_hash().with_nonce(1).with_max_fee(200);
891        let valid_non_blob_tx = f.validated(tx_non_blob.clone());
892        pool.add_transaction(Arc::new(valid_non_blob_tx), 0);
893
894        let mut best = pool.best();
895        best.skip_blobs();
896
897        // Only the non-blob transaction should be returned
898        let tx = best.next().expect("Transaction should be returned");
899        assert_eq!(tx.transaction, tx_non_blob);
900
901        // Ensure no more transactions are left
902        assert!(best.next().is_none());
903    }
904
905    #[test]
906    fn test_best_transactions_no_updates() {
907        // Tests the no_updates functionality to ensure it properly clears the
908        // new_transaction_receiver.
909        let mut pool = PendingPool::new(MockOrdering::default());
910        let mut f = MockTransactionFactory::default();
911
912        // Add a transaction
913        let tx = MockTransaction::eip1559().rng_hash().with_nonce(0).with_max_fee(100);
914        let valid_tx = f.validated(tx);
915        pool.add_transaction(Arc::new(valid_tx), 0);
916
917        let mut best = pool.best();
918
919        // Use a broadcast channel for transaction updates
920        let (_tx_sender, tx_receiver) =
921            tokio::sync::broadcast::channel::<PendingTransaction<MockOrdering>>(1000);
922        best.new_transaction_receiver = Some(tx_receiver);
923
924        // Ensure receiver is set
925        assert!(best.new_transaction_receiver.is_some());
926
927        // Call no_updates to clear the receiver
928        best.no_updates();
929
930        // Ensure receiver is cleared
931        assert!(best.new_transaction_receiver.is_none());
932    }
933
934    // TODO: Same nonce test
935}