reth_transaction_pool/pool/
best.rs

1use crate::{
2    error::{Eip4844PoolTransactionError, InvalidPoolTransactionError},
3    identifier::{SenderId, TransactionId},
4    pool::pending::PendingTransaction,
5    PoolTransaction, TransactionOrdering, ValidPoolTransaction,
6};
7use alloy_consensus::Transaction;
8use alloy_eips::Typed2718;
9use alloy_primitives::Address;
10use core::fmt;
11use reth_primitives_traits::transaction::error::InvalidTransactionError;
12use std::{
13    collections::{BTreeMap, BTreeSet, HashSet, VecDeque},
14    sync::Arc,
15};
16use tokio::sync::broadcast::{error::TryRecvError, Receiver};
17use tracing::debug;
18
19/// An iterator that returns transactions that can be executed on the current state (*best*
20/// transactions).
21///
22/// This is a wrapper around [`BestTransactions`] that also enforces a specific basefee.
23///
24/// This iterator guarantees that all transaction it returns satisfy both the base fee and blob fee!
25pub(crate) struct BestTransactionsWithFees<T: TransactionOrdering> {
26    pub(crate) best: BestTransactions<T>,
27    pub(crate) base_fee: u64,
28    pub(crate) base_fee_per_blob_gas: u64,
29}
30
31impl<T: TransactionOrdering> crate::traits::BestTransactions for BestTransactionsWithFees<T> {
32    fn mark_invalid(&mut self, tx: &Self::Item, kind: InvalidPoolTransactionError) {
33        BestTransactions::mark_invalid(&mut self.best, tx, kind)
34    }
35
36    fn no_updates(&mut self) {
37        self.best.no_updates()
38    }
39
40    fn skip_blobs(&mut self) {
41        self.set_skip_blobs(true)
42    }
43
44    fn set_skip_blobs(&mut self, skip_blobs: bool) {
45        self.best.set_skip_blobs(skip_blobs)
46    }
47}
48
49impl<T: TransactionOrdering> Iterator for BestTransactionsWithFees<T> {
50    type Item = Arc<ValidPoolTransaction<T::Transaction>>;
51
52    fn next(&mut self) -> Option<Self::Item> {
53        // find the next transaction that satisfies the base fee
54        loop {
55            let best = Iterator::next(&mut self.best)?;
56            // If both the base fee and blob fee (if applicable for EIP-4844) are satisfied, return
57            // the transaction
58            if best.transaction.max_fee_per_gas() >= self.base_fee as u128 &&
59                best.transaction
60                    .max_fee_per_blob_gas()
61                    .is_none_or(|fee| fee >= self.base_fee_per_blob_gas as u128)
62            {
63                return Some(best);
64            }
65            crate::traits::BestTransactions::mark_invalid(
66                self,
67                &best,
68                InvalidPoolTransactionError::Underpriced,
69            );
70        }
71    }
72}
73
74/// An iterator that returns transactions that can be executed on the current state (*best*
75/// transactions).
76///
77/// The [`PendingPool`](crate::pool::pending::PendingPool) contains transactions that *could* all
78/// be executed on the current state, but only yields transactions that are ready to be executed
79/// now. While it contains all gapless transactions of a sender, it _always_ only returns the
80/// transaction with the current on chain nonce.
81#[derive(Debug)]
82pub struct BestTransactions<T: TransactionOrdering> {
83    /// Contains a copy of _all_ transactions of the pending pool at the point in time this
84    /// iterator was created.
85    pub(crate) all: BTreeMap<TransactionId, PendingTransaction<T>>,
86    /// Transactions that can be executed right away: these have the expected nonce.
87    ///
88    /// Once an `independent` transaction with the nonce `N` is returned, it unlocks `N+1`, which
89    /// then can be moved from the `all` set to the `independent` set.
90    pub(crate) independent: BTreeSet<PendingTransaction<T>>,
91    /// There might be the case where a yielded transactions is invalid, this will track it.
92    pub(crate) invalid: HashSet<SenderId>,
93    /// Used to receive any new pending transactions that have been added to the pool after this
94    /// iterator was static fileted
95    ///
96    /// These new pending transactions are inserted into this iterator's pool before yielding the
97    /// next value
98    pub(crate) new_transaction_receiver: Option<Receiver<PendingTransaction<T>>>,
99    /// Flag to control whether to skip blob transactions (EIP4844).
100    pub(crate) skip_blobs: bool,
101}
102
103impl<T: TransactionOrdering> BestTransactions<T> {
104    /// Mark the transaction and it's descendants as invalid.
105    pub(crate) fn mark_invalid(
106        &mut self,
107        tx: &Arc<ValidPoolTransaction<T::Transaction>>,
108        _kind: InvalidPoolTransactionError,
109    ) {
110        self.invalid.insert(tx.sender_id());
111    }
112
113    /// Returns the ancestor the given transaction, the transaction with `nonce - 1`.
114    ///
115    /// Note: for a transaction with nonce higher than the current on chain nonce this will always
116    /// return an ancestor since all transaction in this pool are gapless.
117    pub(crate) fn ancestor(&self, id: &TransactionId) -> Option<&PendingTransaction<T>> {
118        self.all.get(&id.unchecked_ancestor()?)
119    }
120
121    /// Non-blocking read on the new pending transactions subscription channel
122    fn try_recv(&mut self) -> Option<PendingTransaction<T>> {
123        loop {
124            match self.new_transaction_receiver.as_mut()?.try_recv() {
125                Ok(tx) => return Some(tx),
126                // note TryRecvError::Lagged can be returned here, which is an error that attempts
127                // to correct itself on consecutive try_recv() attempts
128
129                // the cost of ignoring this error is allowing old transactions to get
130                // overwritten after the chan buffer size is met
131                Err(TryRecvError::Lagged(_)) => {
132                    // Handle the case where the receiver lagged too far behind.
133                    // `num_skipped` indicates the number of messages that were skipped.
134                }
135
136                // this case is still better than the existing iterator behavior where no new
137                // pending txs are surfaced to consumers
138                Err(_) => return None,
139            }
140        }
141    }
142
143    /// Removes the currently best independent transaction from the independent set and the total
144    /// set.
145    fn pop_best(&mut self) -> Option<PendingTransaction<T>> {
146        self.independent.pop_last().inspect(|best| {
147            let removed = self.all.remove(best.transaction.id());
148            debug_assert!(removed.is_some(), "must be present in both sets");
149        })
150    }
151
152    /// Checks for new transactions that have come into the `PendingPool` after this iterator was
153    /// created and inserts them
154    fn add_new_transactions(&mut self) {
155        while let Some(pending_tx) = self.try_recv() {
156            //  same logic as PendingPool::add_transaction/PendingPool::best_with_unlocked
157            let tx_id = *pending_tx.transaction.id();
158            if self.ancestor(&tx_id).is_none() {
159                self.independent.insert(pending_tx.clone());
160            }
161            self.all.insert(tx_id, pending_tx);
162        }
163    }
164}
165
166impl<T: TransactionOrdering> crate::traits::BestTransactions for BestTransactions<T> {
167    fn mark_invalid(&mut self, tx: &Self::Item, kind: InvalidPoolTransactionError) {
168        Self::mark_invalid(self, tx, kind)
169    }
170
171    fn no_updates(&mut self) {
172        self.new_transaction_receiver.take();
173    }
174
175    fn skip_blobs(&mut self) {
176        self.set_skip_blobs(true);
177    }
178
179    fn set_skip_blobs(&mut self, skip_blobs: bool) {
180        self.skip_blobs = skip_blobs;
181    }
182}
183
184impl<T: TransactionOrdering> Iterator for BestTransactions<T> {
185    type Item = Arc<ValidPoolTransaction<T::Transaction>>;
186
187    fn next(&mut self) -> Option<Self::Item> {
188        loop {
189            self.add_new_transactions();
190            // Remove the next independent tx with the highest priority
191            let best = self.pop_best()?;
192            let sender_id = best.transaction.sender_id();
193
194            // skip transactions for which sender was marked as invalid
195            if self.invalid.contains(&sender_id) {
196                debug!(
197                    target: "txpool",
198                    "[{:?}] skipping invalid transaction",
199                    best.transaction.hash()
200                );
201                continue
202            }
203
204            // Insert transactions that just got unlocked.
205            if let Some(unlocked) = self.all.get(&best.unlocks()) {
206                self.independent.insert(unlocked.clone());
207            }
208
209            if self.skip_blobs && best.transaction.transaction.is_eip4844() {
210                // blobs should be skipped, marking them as invalid will ensure that no dependent
211                // transactions are returned
212                self.mark_invalid(
213                    &best.transaction,
214                    InvalidPoolTransactionError::Eip4844(
215                        Eip4844PoolTransactionError::NoEip4844Blobs,
216                    ),
217                )
218            } else {
219                return Some(best.transaction)
220            }
221        }
222    }
223}
224
/// A [`BestTransactions`](crate::traits::BestTransactions) implementation that filters the
/// transactions of the wrapped iterator with a predicate.
///
/// Transactions that are filtered out are marked as invalid via
/// [`BestTransactions::mark_invalid`](crate::traits::BestTransactions::mark_invalid).
pub struct BestTransactionFilter<I, P> {
    /// The wrapped iterator that supplies the candidate transactions.
    pub(crate) best: I,
    /// Decides whether a candidate transaction is yielded.
    pub(crate) predicate: P,
}
234
235impl<I, P> BestTransactionFilter<I, P> {
236    /// Create a new [`BestTransactionFilter`] with the given predicate.
237    pub const fn new(best: I, predicate: P) -> Self {
238        Self { best, predicate }
239    }
240}
241
242impl<I, P> Iterator for BestTransactionFilter<I, P>
243where
244    I: crate::traits::BestTransactions,
245    P: FnMut(&<I as Iterator>::Item) -> bool,
246{
247    type Item = <I as Iterator>::Item;
248
249    fn next(&mut self) -> Option<Self::Item> {
250        loop {
251            let best = self.best.next()?;
252            if (self.predicate)(&best) {
253                return Some(best)
254            }
255            self.best.mark_invalid(
256                &best,
257                InvalidPoolTransactionError::Consensus(InvalidTransactionError::TxTypeNotSupported),
258            );
259        }
260    }
261}
262
263impl<I, P> crate::traits::BestTransactions for BestTransactionFilter<I, P>
264where
265    I: crate::traits::BestTransactions,
266    P: FnMut(&<I as Iterator>::Item) -> bool + Send,
267{
268    fn mark_invalid(&mut self, tx: &Self::Item, kind: InvalidPoolTransactionError) {
269        crate::traits::BestTransactions::mark_invalid(&mut self.best, tx, kind)
270    }
271
272    fn no_updates(&mut self) {
273        self.best.no_updates()
274    }
275
276    fn skip_blobs(&mut self) {
277        self.set_skip_blobs(true)
278    }
279
280    fn set_skip_blobs(&mut self, skip_blobs: bool) {
281        self.best.set_skip_blobs(skip_blobs)
282    }
283}
284
285impl<I: fmt::Debug, P> fmt::Debug for BestTransactionFilter<I, P> {
286    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
287        f.debug_struct("BestTransactionFilter").field("best", &self.best).finish()
288    }
289}
290
291/// Wrapper over [`crate::traits::BestTransactions`] that prioritizes transactions of certain
292/// senders capping total gas used by such transactions.
293#[derive(Debug)]
294pub struct BestTransactionsWithPrioritizedSenders<I: Iterator> {
295    /// Inner iterator
296    inner: I,
297    /// A set of senders which transactions should be prioritized
298    prioritized_senders: HashSet<Address>,
299    /// Maximum total gas limit of prioritized transactions
300    max_prioritized_gas: u64,
301    /// Buffer with transactions that are not being prioritized. Those will be the first to be
302    /// included after the prioritized transactions
303    buffer: VecDeque<I::Item>,
304    /// Tracker of total gas limit of prioritized transactions. Once it reaches
305    /// `max_prioritized_gas` no more transactions will be prioritized
306    prioritized_gas: u64,
307}
308
309impl<I: Iterator> BestTransactionsWithPrioritizedSenders<I> {
310    /// Constructs a new [`BestTransactionsWithPrioritizedSenders`].
311    pub fn new(prioritized_senders: HashSet<Address>, max_prioritized_gas: u64, inner: I) -> Self {
312        Self {
313            inner,
314            prioritized_senders,
315            max_prioritized_gas,
316            buffer: Default::default(),
317            prioritized_gas: Default::default(),
318        }
319    }
320}
321
322impl<I, T> Iterator for BestTransactionsWithPrioritizedSenders<I>
323where
324    I: crate::traits::BestTransactions<Item = Arc<ValidPoolTransaction<T>>>,
325    T: PoolTransaction,
326{
327    type Item = <I as Iterator>::Item;
328
329    fn next(&mut self) -> Option<Self::Item> {
330        // If we have space, try prioritizing transactions
331        if self.prioritized_gas < self.max_prioritized_gas {
332            for item in &mut self.inner {
333                if self.prioritized_senders.contains(&item.transaction.sender()) &&
334                    self.prioritized_gas + item.transaction.gas_limit() <=
335                        self.max_prioritized_gas
336                {
337                    self.prioritized_gas += item.transaction.gas_limit();
338                    return Some(item)
339                }
340                self.buffer.push_back(item);
341            }
342        }
343
344        if let Some(item) = self.buffer.pop_front() {
345            Some(item)
346        } else {
347            self.inner.next()
348        }
349    }
350}
351
352impl<I, T> crate::traits::BestTransactions for BestTransactionsWithPrioritizedSenders<I>
353where
354    I: crate::traits::BestTransactions<Item = Arc<ValidPoolTransaction<T>>>,
355    T: PoolTransaction,
356{
357    fn mark_invalid(&mut self, tx: &Self::Item, kind: InvalidPoolTransactionError) {
358        self.inner.mark_invalid(tx, kind)
359    }
360
361    fn no_updates(&mut self) {
362        self.inner.no_updates()
363    }
364
365    fn set_skip_blobs(&mut self, skip_blobs: bool) {
366        if skip_blobs {
367            self.buffer.retain(|tx| !tx.transaction.is_eip4844())
368        }
369        self.inner.set_skip_blobs(skip_blobs)
370    }
371}
372
373#[cfg(test)]
374mod tests {
375    use super::*;
376    use crate::{
377        pool::pending::PendingPool,
378        test_utils::{MockOrdering, MockTransaction, MockTransactionFactory},
379        BestTransactions, Priority,
380    };
381    use alloy_primitives::U256;
382
383    #[test]
384    fn test_best_iter() {
385        let mut pool = PendingPool::new(MockOrdering::default());
386        let mut f = MockTransactionFactory::default();
387
388        let num_tx = 10;
389        // insert 10 gapless tx
390        let tx = MockTransaction::eip1559();
391        for nonce in 0..num_tx {
392            let tx = tx.clone().rng_hash().with_nonce(nonce);
393            let valid_tx = f.validated(tx);
394            pool.add_transaction(Arc::new(valid_tx), 0);
395        }
396
397        let mut best = pool.best();
398        assert_eq!(best.all.len(), num_tx as usize);
399        assert_eq!(best.independent.len(), 1);
400
401        // check tx are returned in order
402        for nonce in 0..num_tx {
403            assert_eq!(best.independent.len(), 1);
404            let tx = best.next().unwrap();
405            assert_eq!(tx.nonce(), nonce);
406        }
407    }
408
409    #[test]
410    fn test_best_iter_invalid() {
411        let mut pool = PendingPool::new(MockOrdering::default());
412        let mut f = MockTransactionFactory::default();
413
414        let num_tx = 10;
415        // insert 10 gapless tx
416        let tx = MockTransaction::eip1559();
417        for nonce in 0..num_tx {
418            let tx = tx.clone().rng_hash().with_nonce(nonce);
419            let valid_tx = f.validated(tx);
420            pool.add_transaction(Arc::new(valid_tx), 0);
421        }
422
423        let mut best = pool.best();
424
425        // mark the first tx as invalid
426        let invalid = best.independent.iter().next().unwrap();
427        best.mark_invalid(
428            &invalid.transaction.clone(),
429            InvalidPoolTransactionError::Consensus(InvalidTransactionError::TxTypeNotSupported),
430        );
431
432        // iterator is empty
433        assert!(best.next().is_none());
434    }
435
436    #[test]
437    fn test_best_transactions_iter_invalid() {
438        let mut pool = PendingPool::new(MockOrdering::default());
439        let mut f = MockTransactionFactory::default();
440
441        let num_tx = 10;
442        // insert 10 gapless tx
443        let tx = MockTransaction::eip1559();
444        for nonce in 0..num_tx {
445            let tx = tx.clone().rng_hash().with_nonce(nonce);
446            let valid_tx = f.validated(tx);
447            pool.add_transaction(Arc::new(valid_tx), 0);
448        }
449
450        let mut best: Box<
451            dyn crate::traits::BestTransactions<Item = Arc<ValidPoolTransaction<MockTransaction>>>,
452        > = Box::new(pool.best());
453
454        let tx = Iterator::next(&mut best).unwrap();
455        crate::traits::BestTransactions::mark_invalid(
456            &mut *best,
457            &tx,
458            InvalidPoolTransactionError::Consensus(InvalidTransactionError::TxTypeNotSupported),
459        );
460        assert!(Iterator::next(&mut best).is_none());
461    }
462
463    #[test]
464    fn test_best_with_fees_iter_base_fee_satisfied() {
465        let mut pool = PendingPool::new(MockOrdering::default());
466        let mut f = MockTransactionFactory::default();
467
468        let num_tx = 5;
469        let base_fee: u64 = 10;
470        let base_fee_per_blob_gas: u64 = 15;
471
472        // Insert transactions with a max_fee_per_gas greater than or equal to the base fee
473        // Without blob fee
474        for nonce in 0..num_tx {
475            let tx = MockTransaction::eip1559()
476                .rng_hash()
477                .with_nonce(nonce)
478                .with_max_fee(base_fee as u128 + 5);
479            let valid_tx = f.validated(tx);
480            pool.add_transaction(Arc::new(valid_tx), 0);
481        }
482
483        let mut best = pool.best_with_basefee_and_blobfee(base_fee, base_fee_per_blob_gas);
484
485        for nonce in 0..num_tx {
486            let tx = best.next().expect("Transaction should be returned");
487            assert_eq!(tx.nonce(), nonce);
488            assert!(tx.transaction.max_fee_per_gas() >= base_fee as u128);
489        }
490    }
491
492    #[test]
493    fn test_best_with_fees_iter_base_fee_violated() {
494        let mut pool = PendingPool::new(MockOrdering::default());
495        let mut f = MockTransactionFactory::default();
496
497        let num_tx = 5;
498        let base_fee: u64 = 20;
499        let base_fee_per_blob_gas: u64 = 15;
500
501        // Insert transactions with a max_fee_per_gas less than the base fee
502        for nonce in 0..num_tx {
503            let tx = MockTransaction::eip1559()
504                .rng_hash()
505                .with_nonce(nonce)
506                .with_max_fee(base_fee as u128 - 5);
507            let valid_tx = f.validated(tx);
508            pool.add_transaction(Arc::new(valid_tx), 0);
509        }
510
511        let mut best = pool.best_with_basefee_and_blobfee(base_fee, base_fee_per_blob_gas);
512
513        // No transaction should be returned since all violate the base fee
514        assert!(best.next().is_none());
515    }
516
517    #[test]
518    fn test_best_with_fees_iter_blob_fee_satisfied() {
519        let mut pool = PendingPool::new(MockOrdering::default());
520        let mut f = MockTransactionFactory::default();
521
522        let num_tx = 5;
523        let base_fee: u64 = 10;
524        let base_fee_per_blob_gas: u64 = 20;
525
526        // Insert transactions with a max_fee_per_blob_gas greater than or equal to the base fee per
527        // blob gas
528        for nonce in 0..num_tx {
529            let tx = MockTransaction::eip4844()
530                .rng_hash()
531                .with_nonce(nonce)
532                .with_max_fee(base_fee as u128 + 5)
533                .with_blob_fee(base_fee_per_blob_gas as u128 + 5);
534            let valid_tx = f.validated(tx);
535            pool.add_transaction(Arc::new(valid_tx), 0);
536        }
537
538        let mut best = pool.best_with_basefee_and_blobfee(base_fee, base_fee_per_blob_gas);
539
540        // All transactions should be returned in order since they satisfy both base fee and blob
541        // fee
542        for nonce in 0..num_tx {
543            let tx = best.next().expect("Transaction should be returned");
544            assert_eq!(tx.nonce(), nonce);
545            assert!(tx.transaction.max_fee_per_gas() >= base_fee as u128);
546            assert!(
547                tx.transaction.max_fee_per_blob_gas().unwrap() >= base_fee_per_blob_gas as u128
548            );
549        }
550
551        // No more transactions should be returned
552        assert!(best.next().is_none());
553    }
554
555    #[test]
556    fn test_best_with_fees_iter_blob_fee_violated() {
557        let mut pool = PendingPool::new(MockOrdering::default());
558        let mut f = MockTransactionFactory::default();
559
560        let num_tx = 5;
561        let base_fee: u64 = 10;
562        let base_fee_per_blob_gas: u64 = 20;
563
564        // Insert transactions with a max_fee_per_blob_gas less than the base fee per blob gas
565        for nonce in 0..num_tx {
566            let tx = MockTransaction::eip4844()
567                .rng_hash()
568                .with_nonce(nonce)
569                .with_max_fee(base_fee as u128 + 5)
570                .with_blob_fee(base_fee_per_blob_gas as u128 - 5);
571            let valid_tx = f.validated(tx);
572            pool.add_transaction(Arc::new(valid_tx), 0);
573        }
574
575        let mut best = pool.best_with_basefee_and_blobfee(base_fee, base_fee_per_blob_gas);
576
577        // No transaction should be returned since all violate the blob fee
578        assert!(best.next().is_none());
579    }
580
581    #[test]
582    fn test_best_with_fees_iter_mixed_fees() {
583        let mut pool = PendingPool::new(MockOrdering::default());
584        let mut f = MockTransactionFactory::default();
585
586        let base_fee: u64 = 10;
587        let base_fee_per_blob_gas: u64 = 20;
588
589        // Insert transactions with varying max_fee_per_gas and max_fee_per_blob_gas
590        let tx1 =
591            MockTransaction::eip1559().rng_hash().with_nonce(0).with_max_fee(base_fee as u128 + 5);
592        let tx2 = MockTransaction::eip4844()
593            .rng_hash()
594            .with_nonce(1)
595            .with_max_fee(base_fee as u128 + 5)
596            .with_blob_fee(base_fee_per_blob_gas as u128 + 5);
597        let tx3 = MockTransaction::eip4844()
598            .rng_hash()
599            .with_nonce(2)
600            .with_max_fee(base_fee as u128 + 5)
601            .with_blob_fee(base_fee_per_blob_gas as u128 - 5);
602        let tx4 =
603            MockTransaction::eip1559().rng_hash().with_nonce(3).with_max_fee(base_fee as u128 - 5);
604
605        pool.add_transaction(Arc::new(f.validated(tx1.clone())), 0);
606        pool.add_transaction(Arc::new(f.validated(tx2.clone())), 0);
607        pool.add_transaction(Arc::new(f.validated(tx3)), 0);
608        pool.add_transaction(Arc::new(f.validated(tx4)), 0);
609
610        let mut best = pool.best_with_basefee_and_blobfee(base_fee, base_fee_per_blob_gas);
611
612        let expected_order = vec![tx1, tx2];
613        for expected_tx in expected_order {
614            let tx = best.next().expect("Transaction should be returned");
615            assert_eq!(tx.transaction, expected_tx);
616        }
617
618        // No more transactions should be returned
619        assert!(best.next().is_none());
620    }
621
622    #[test]
623    fn test_best_add_transaction_with_next_nonce() {
624        let mut pool = PendingPool::new(MockOrdering::default());
625        let mut f = MockTransactionFactory::default();
626
627        // Add 5 transactions with increasing nonces to the pool
628        let num_tx = 5;
629        let tx = MockTransaction::eip1559();
630        for nonce in 0..num_tx {
631            let tx = tx.clone().rng_hash().with_nonce(nonce);
632            let valid_tx = f.validated(tx);
633            pool.add_transaction(Arc::new(valid_tx), 0);
634        }
635
636        // Create a BestTransactions iterator from the pool
637        let mut best = pool.best();
638
639        // Use a broadcast channel for transaction updates
640        let (tx_sender, tx_receiver) =
641            tokio::sync::broadcast::channel::<PendingTransaction<MockOrdering>>(1000);
642        best.new_transaction_receiver = Some(tx_receiver);
643
644        // Create a new transaction with nonce 5 and validate it
645        let new_tx = MockTransaction::eip1559().rng_hash().with_nonce(5);
646        let valid_new_tx = f.validated(new_tx);
647
648        // Send the new transaction through the broadcast channel
649        let pending_tx = PendingTransaction {
650            submission_id: 10,
651            transaction: Arc::new(valid_new_tx.clone()),
652            priority: Priority::Value(U256::from(1000)),
653        };
654        tx_sender.send(pending_tx.clone()).unwrap();
655
656        // Add new transactions to the iterator
657        best.add_new_transactions();
658
659        // Verify that the new transaction has been added to the 'all' map
660        assert_eq!(best.all.len(), 6);
661        assert!(best.all.contains_key(valid_new_tx.id()));
662
663        // Verify that the new transaction has been added to the 'independent' set
664        assert_eq!(best.independent.len(), 2);
665        assert!(best.independent.contains(&pending_tx));
666    }
667
668    #[test]
669    fn test_best_add_transaction_with_ancestor() {
670        // Initialize a new PendingPool with default MockOrdering and MockTransactionFactory
671        let mut pool = PendingPool::new(MockOrdering::default());
672        let mut f = MockTransactionFactory::default();
673
674        // Add 5 transactions with increasing nonces to the pool
675        let num_tx = 5;
676        let tx = MockTransaction::eip1559();
677        for nonce in 0..num_tx {
678            let tx = tx.clone().rng_hash().with_nonce(nonce);
679            let valid_tx = f.validated(tx);
680            pool.add_transaction(Arc::new(valid_tx), 0);
681        }
682
683        // Create a BestTransactions iterator from the pool
684        let mut best = pool.best();
685
686        // Use a broadcast channel for transaction updates
687        let (tx_sender, tx_receiver) =
688            tokio::sync::broadcast::channel::<PendingTransaction<MockOrdering>>(1000);
689        best.new_transaction_receiver = Some(tx_receiver);
690
691        // Create a new transaction with nonce 5 and validate it
692        let base_tx1 = MockTransaction::eip1559().rng_hash().with_nonce(5);
693        let valid_new_tx1 = f.validated(base_tx1.clone());
694
695        // Send the new transaction through the broadcast channel
696        let pending_tx1 = PendingTransaction {
697            submission_id: 10,
698            transaction: Arc::new(valid_new_tx1.clone()),
699            priority: Priority::Value(U256::from(1000)),
700        };
701        tx_sender.send(pending_tx1.clone()).unwrap();
702
703        // Add new transactions to the iterator
704        best.add_new_transactions();
705
706        // Verify that the new transaction has been added to the 'all' map
707        assert_eq!(best.all.len(), 6);
708        assert!(best.all.contains_key(valid_new_tx1.id()));
709
710        // Verify that the new transaction has been added to the 'independent' set
711        assert_eq!(best.independent.len(), 2);
712        assert!(best.independent.contains(&pending_tx1));
713
714        // Attempt to add a new transaction with a different nonce (not a duplicate)
715        let base_tx2 = base_tx1.with_nonce(6);
716        let valid_new_tx2 = f.validated(base_tx2);
717
718        // Send the new transaction through the broadcast channel
719        let pending_tx2 = PendingTransaction {
720            submission_id: 11, // Different submission ID
721            transaction: Arc::new(valid_new_tx2.clone()),
722            priority: Priority::Value(U256::from(1000)),
723        };
724        tx_sender.send(pending_tx2.clone()).unwrap();
725
726        // Add new transactions to the iterator
727        best.add_new_transactions();
728
729        // Verify that the new transaction has been added to 'all'
730        assert_eq!(best.all.len(), 7);
731        assert!(best.all.contains_key(valid_new_tx2.id()));
732
733        // Verify that the new transaction has not been added to the 'independent' set
734        assert_eq!(best.independent.len(), 2);
735        assert!(!best.independent.contains(&pending_tx2));
736    }
737
738    #[test]
739    fn test_best_transactions_filter_trait_object() {
740        // Initialize a new PendingPool with default MockOrdering and MockTransactionFactory
741        let mut pool = PendingPool::new(MockOrdering::default());
742        let mut f = MockTransactionFactory::default();
743
744        // Add 5 transactions with increasing nonces to the pool
745        let num_tx = 5;
746        let tx = MockTransaction::eip1559();
747        for nonce in 0..num_tx {
748            let tx = tx.clone().rng_hash().with_nonce(nonce);
749            let valid_tx = f.validated(tx);
750            pool.add_transaction(Arc::new(valid_tx), 0);
751        }
752
753        // Create a trait object of BestTransactions iterator from the pool
754        let best: Box<dyn crate::traits::BestTransactions<Item = _>> = Box::new(pool.best());
755
756        // Create a filter that only returns transactions with even nonces
757        let filter =
758            BestTransactionFilter::new(best, |tx: &Arc<ValidPoolTransaction<MockTransaction>>| {
759                tx.nonce() % 2 == 0
760            });
761
762        // Verify that the filter only returns transactions with even nonces
763        for tx in filter {
764            assert_eq!(tx.nonce() % 2, 0);
765        }
766    }
767
768    #[test]
769    fn test_best_transactions_prioritized_senders() {
770        let mut pool = PendingPool::new(MockOrdering::default());
771        let mut f = MockTransactionFactory::default();
772
773        // Add 5 plain transactions from different senders with increasing gas price
774        for gas_price in 0..5 {
775            let tx = MockTransaction::eip1559().with_gas_price(gas_price);
776            let valid_tx = f.validated(tx);
777            pool.add_transaction(Arc::new(valid_tx), 0);
778        }
779
780        // Add another transaction with 0 gas price that's going to be prioritized by sender
781        let prioritized_tx = MockTransaction::eip1559().with_gas_price(0);
782        let valid_prioritized_tx = f.validated(prioritized_tx.clone());
783        pool.add_transaction(Arc::new(valid_prioritized_tx), 0);
784
785        let prioritized_senders = HashSet::from([prioritized_tx.sender()]);
786        let best =
787            BestTransactionsWithPrioritizedSenders::new(prioritized_senders, 200, pool.best());
788
789        // Verify that the prioritized transaction is returned first
790        // and the rest are returned in the reverse order of gas price
791        let mut iter = best.into_iter();
792        let top_of_block_tx = iter.next().unwrap();
793        assert_eq!(top_of_block_tx.max_fee_per_gas(), 0);
794        assert_eq!(top_of_block_tx.sender(), prioritized_tx.sender());
795        for gas_price in (0..5).rev() {
796            assert_eq!(iter.next().unwrap().max_fee_per_gas(), gas_price);
797        }
798
799        // TODO: Test that gas limits for prioritized transactions are respected
800    }
801
802    #[test]
803    fn test_best_with_fees_iter_no_blob_fee_required() {
804        // Tests transactions without blob fees where base fees are checked.
805        let mut pool = PendingPool::new(MockOrdering::default());
806        let mut f = MockTransactionFactory::default();
807
808        let base_fee: u64 = 10;
809        let base_fee_per_blob_gas: u64 = 0; // No blob fee requirement
810
811        // Insert transactions with max_fee_per_gas above the base fee
812        for nonce in 0..5 {
813            let tx = MockTransaction::eip1559()
814                .rng_hash()
815                .with_nonce(nonce)
816                .with_max_fee(base_fee as u128 + 5);
817            let valid_tx = f.validated(tx);
818            pool.add_transaction(Arc::new(valid_tx), 0);
819        }
820
821        let mut best = pool.best_with_basefee_and_blobfee(base_fee, base_fee_per_blob_gas);
822
823        // All transactions should be returned as no blob fee requirement is imposed
824        for nonce in 0..5 {
825            let tx = best.next().expect("Transaction should be returned");
826            assert_eq!(tx.nonce(), nonce);
827        }
828
829        // Ensure no more transactions are left
830        assert!(best.next().is_none());
831    }
832
833    #[test]
834    fn test_best_with_fees_iter_mix_of_blob_and_non_blob_transactions() {
835        // Tests mixed scenarios with both blob and non-blob transactions.
836        let mut pool = PendingPool::new(MockOrdering::default());
837        let mut f = MockTransactionFactory::default();
838
839        let base_fee: u64 = 10;
840        let base_fee_per_blob_gas: u64 = 15;
841
842        // Add a non-blob transaction that satisfies the base fee
843        let tx_non_blob =
844            MockTransaction::eip1559().rng_hash().with_nonce(0).with_max_fee(base_fee as u128 + 5);
845        pool.add_transaction(Arc::new(f.validated(tx_non_blob.clone())), 0);
846
847        // Add a blob transaction that satisfies both base fee and blob fee
848        let tx_blob = MockTransaction::eip4844()
849            .rng_hash()
850            .with_nonce(1)
851            .with_max_fee(base_fee as u128 + 5)
852            .with_blob_fee(base_fee_per_blob_gas as u128 + 5);
853        pool.add_transaction(Arc::new(f.validated(tx_blob.clone())), 0);
854
855        let mut best = pool.best_with_basefee_and_blobfee(base_fee, base_fee_per_blob_gas);
856
857        // Verify both transactions are returned
858        let tx = best.next().expect("Transaction should be returned");
859        assert_eq!(tx.transaction, tx_non_blob);
860
861        let tx = best.next().expect("Transaction should be returned");
862        assert_eq!(tx.transaction, tx_blob);
863
864        // Ensure no more transactions are left
865        assert!(best.next().is_none());
866    }
867
868    #[test]
869    fn test_best_transactions_with_skipping_blobs() {
870        // Tests the skip_blobs functionality to ensure blob transactions are skipped.
871        let mut pool = PendingPool::new(MockOrdering::default());
872        let mut f = MockTransactionFactory::default();
873
874        // Add a blob transaction
875        let tx_blob = MockTransaction::eip4844().rng_hash().with_nonce(0).with_blob_fee(100);
876        let valid_blob_tx = f.validated(tx_blob);
877        pool.add_transaction(Arc::new(valid_blob_tx), 0);
878
879        // Add a non-blob transaction
880        let tx_non_blob = MockTransaction::eip1559().rng_hash().with_nonce(1).with_max_fee(200);
881        let valid_non_blob_tx = f.validated(tx_non_blob.clone());
882        pool.add_transaction(Arc::new(valid_non_blob_tx), 0);
883
884        let mut best = pool.best();
885        best.skip_blobs();
886
887        // Only the non-blob transaction should be returned
888        let tx = best.next().expect("Transaction should be returned");
889        assert_eq!(tx.transaction, tx_non_blob);
890
891        // Ensure no more transactions are left
892        assert!(best.next().is_none());
893    }
894
895    #[test]
896    fn test_best_transactions_no_updates() {
897        // Tests the no_updates functionality to ensure it properly clears the
898        // new_transaction_receiver.
899        let mut pool = PendingPool::new(MockOrdering::default());
900        let mut f = MockTransactionFactory::default();
901
902        // Add a transaction
903        let tx = MockTransaction::eip1559().rng_hash().with_nonce(0).with_max_fee(100);
904        let valid_tx = f.validated(tx);
905        pool.add_transaction(Arc::new(valid_tx), 0);
906
907        let mut best = pool.best();
908
909        // Use a broadcast channel for transaction updates
910        let (_tx_sender, tx_receiver) =
911            tokio::sync::broadcast::channel::<PendingTransaction<MockOrdering>>(1000);
912        best.new_transaction_receiver = Some(tx_receiver);
913
914        // Ensure receiver is set
915        assert!(best.new_transaction_receiver.is_some());
916
917        // Call no_updates to clear the receiver
918        best.no_updates();
919
920        // Ensure receiver is cleared
921        assert!(best.new_transaction_receiver.is_none());
922    }
923
924    // TODO: Same nonce test
925}