reth_transaction_pool/pool/
best.rs

1use crate::{
2    error::{Eip4844PoolTransactionError, InvalidPoolTransactionError},
3    identifier::{SenderId, TransactionId},
4    pool::pending::PendingTransaction,
5    PoolTransaction, Priority, TransactionOrdering, ValidPoolTransaction,
6};
7use alloy_consensus::Transaction;
8use alloy_eips::Typed2718;
9use alloy_primitives::Address;
10use core::fmt;
11use reth_primitives_traits::transaction::error::InvalidTransactionError;
12use std::{
13    collections::{BTreeMap, BTreeSet, HashSet, VecDeque},
14    sync::Arc,
15};
16use tokio::sync::broadcast::{error::TryRecvError, Receiver};
17use tracing::debug;
18
/// An iterator that returns transactions that can be executed on the current state (*best*
/// transactions).
///
/// This is a wrapper around [`BestTransactions`] that also enforces a specific basefee.
///
/// This iterator guarantees that all transactions it returns satisfy both the base fee and the
/// blob fee!
pub(crate) struct BestTransactionsWithFees<T: TransactionOrdering> {
    /// The wrapped iterator that supplies the candidate transactions.
    pub(crate) best: BestTransactions<T>,
    /// The base fee that a transaction's `max_fee_per_gas` is compared against.
    pub(crate) base_fee: u64,
    /// The blob base fee that a transaction's `max_fee_per_blob_gas` (if any) is compared against.
    pub(crate) base_fee_per_blob_gas: u64,
}
30
31impl<T: TransactionOrdering> crate::traits::BestTransactions for BestTransactionsWithFees<T> {
32    fn mark_invalid(&mut self, tx: &Self::Item, kind: InvalidPoolTransactionError) {
33        BestTransactions::mark_invalid(&mut self.best, tx, kind)
34    }
35
36    fn no_updates(&mut self) {
37        self.best.no_updates()
38    }
39
40    fn skip_blobs(&mut self) {
41        self.set_skip_blobs(true)
42    }
43
44    fn set_skip_blobs(&mut self, skip_blobs: bool) {
45        self.best.set_skip_blobs(skip_blobs)
46    }
47}
48
49impl<T: TransactionOrdering> Iterator for BestTransactionsWithFees<T> {
50    type Item = Arc<ValidPoolTransaction<T::Transaction>>;
51
52    fn next(&mut self) -> Option<Self::Item> {
53        // find the next transaction that satisfies the base fee
54        loop {
55            let best = Iterator::next(&mut self.best)?;
56            // If both the base fee and blob fee (if applicable for EIP-4844) are satisfied, return
57            // the transaction
58            if best.transaction.max_fee_per_gas() >= self.base_fee as u128 &&
59                best.transaction
60                    .max_fee_per_blob_gas()
61                    .is_none_or(|fee| fee >= self.base_fee_per_blob_gas as u128)
62            {
63                return Some(best);
64            }
65            crate::traits::BestTransactions::mark_invalid(
66                self,
67                &best,
68                InvalidPoolTransactionError::Underpriced,
69            );
70        }
71    }
72}
73
/// An iterator that returns transactions that can be executed on the current state (*best*
/// transactions).
///
/// The [`PendingPool`](crate::pool::pending::PendingPool) contains transactions that *could* all
/// be executed on the current state, but only yields transactions that are ready to be executed
/// now. While it contains all gapless transactions of a sender, it _always_ only returns the
/// transaction with the current on chain nonce.
#[derive(Debug)]
pub struct BestTransactions<T: TransactionOrdering> {
    /// Contains a copy of _all_ transactions of the pending pool at the point in time this
    /// iterator was created.
    pub(crate) all: BTreeMap<TransactionId, PendingTransaction<T>>,
    /// Transactions that can be executed right away: these have the expected nonce.
    ///
    /// Once an `independent` transaction with the nonce `N` is returned, it unlocks `N+1`, which
    /// then can be moved from the `all` set to the `independent` set.
    pub(crate) independent: BTreeSet<PendingTransaction<T>>,
    /// Senders for which a yielded transaction turned out to be invalid; transactions of these
    /// senders are skipped.
    pub(crate) invalid: HashSet<SenderId>,
    /// Used to receive any new pending transactions that have been added to the pool after this
    /// iterator was created.
    ///
    /// These new pending transactions are inserted into this iterator's pool before yielding the
    /// next value.
    pub(crate) new_transaction_receiver: Option<Receiver<PendingTransaction<T>>>,
    /// The priority value of the most recently yielded transaction.
    ///
    /// This is required if new pending transactions are fed in while the iterator yields values.
    pub(crate) last_priority: Option<Priority<T::PriorityValue>>,
    /// Flag to control whether to skip blob transactions (EIP4844).
    pub(crate) skip_blobs: bool,
}
106
107impl<T: TransactionOrdering> BestTransactions<T> {
108    /// Mark the transaction and it's descendants as invalid.
109    pub(crate) fn mark_invalid(
110        &mut self,
111        tx: &Arc<ValidPoolTransaction<T::Transaction>>,
112        _kind: InvalidPoolTransactionError,
113    ) {
114        self.invalid.insert(tx.sender_id());
115    }
116
117    /// Returns the ancestor the given transaction, the transaction with `nonce - 1`.
118    ///
119    /// Note: for a transaction with nonce higher than the current on chain nonce this will always
120    /// return an ancestor since all transaction in this pool are gapless.
121    pub(crate) fn ancestor(&self, id: &TransactionId) -> Option<&PendingTransaction<T>> {
122        self.all.get(&id.unchecked_ancestor()?)
123    }
124
125    /// Non-blocking read on the new pending transactions subscription channel
126    fn try_recv(&mut self) -> Option<PendingTransaction<T>> {
127        loop {
128            match self.new_transaction_receiver.as_mut()?.try_recv() {
129                Ok(tx) => {
130                    if let Some(last_priority) = &self.last_priority {
131                        if &tx.priority > last_priority {
132                            // we skip transactions if we already yielded a transaction with lower
133                            // priority
134                            return None
135                        }
136                    }
137                    return Some(tx)
138                }
139                // note TryRecvError::Lagged can be returned here, which is an error that attempts
140                // to correct itself on consecutive try_recv() attempts
141
142                // the cost of ignoring this error is allowing old transactions to get
143                // overwritten after the chan buffer size is met
144                Err(TryRecvError::Lagged(_)) => {
145                    // Handle the case where the receiver lagged too far behind.
146                    // `num_skipped` indicates the number of messages that were skipped.
147                }
148
149                // this case is still better than the existing iterator behavior where no new
150                // pending txs are surfaced to consumers
151                Err(_) => return None,
152            }
153        }
154    }
155
156    /// Removes the currently best independent transaction from the independent set and the total
157    /// set.
158    fn pop_best(&mut self) -> Option<PendingTransaction<T>> {
159        self.independent.pop_last().inspect(|best| {
160            self.all.remove(best.transaction.id());
161        })
162    }
163
164    /// Checks for new transactions that have come into the `PendingPool` after this iterator was
165    /// created and inserts them
166    fn add_new_transactions(&mut self) {
167        while let Some(pending_tx) = self.try_recv() {
168            //  same logic as PendingPool::add_transaction/PendingPool::best_with_unlocked
169            let tx_id = *pending_tx.transaction.id();
170            if self.ancestor(&tx_id).is_none() {
171                self.independent.insert(pending_tx.clone());
172            }
173            self.all.insert(tx_id, pending_tx);
174        }
175    }
176}
177
178impl<T: TransactionOrdering> crate::traits::BestTransactions for BestTransactions<T> {
179    fn mark_invalid(&mut self, tx: &Self::Item, kind: InvalidPoolTransactionError) {
180        Self::mark_invalid(self, tx, kind)
181    }
182
183    fn no_updates(&mut self) {
184        self.new_transaction_receiver.take();
185        self.last_priority.take();
186    }
187
188    fn skip_blobs(&mut self) {
189        self.set_skip_blobs(true);
190    }
191
192    fn set_skip_blobs(&mut self, skip_blobs: bool) {
193        self.skip_blobs = skip_blobs;
194    }
195}
196
197impl<T: TransactionOrdering> Iterator for BestTransactions<T> {
198    type Item = Arc<ValidPoolTransaction<T::Transaction>>;
199
200    fn next(&mut self) -> Option<Self::Item> {
201        loop {
202            self.add_new_transactions();
203            // Remove the next independent tx with the highest priority
204            let best = self.pop_best()?;
205            let sender_id = best.transaction.sender_id();
206
207            // skip transactions for which sender was marked as invalid
208            if self.invalid.contains(&sender_id) {
209                debug!(
210                    target: "txpool",
211                    "[{:?}] skipping invalid transaction",
212                    best.transaction.hash()
213                );
214                continue
215            }
216
217            // Insert transactions that just got unlocked.
218            if let Some(unlocked) = self.all.get(&best.unlocks()) {
219                self.independent.insert(unlocked.clone());
220            }
221
222            if self.skip_blobs && best.transaction.transaction.is_eip4844() {
223                // blobs should be skipped, marking them as invalid will ensure that no dependent
224                // transactions are returned
225                self.mark_invalid(
226                    &best.transaction,
227                    InvalidPoolTransactionError::Eip4844(
228                        Eip4844PoolTransactionError::NoEip4844Blobs,
229                    ),
230                )
231            } else {
232                if self.new_transaction_receiver.is_some() {
233                    self.last_priority = Some(best.priority.clone())
234                }
235                return Some(best.transaction)
236            }
237        }
238    }
239}
240
/// A [`BestTransactions`](crate::traits::BestTransactions) implementation that filters the
/// transactions of iter with predicate.
///
/// Filtered out transactions are marked as invalid:
/// [`BestTransactions::mark_invalid`](crate::traits::BestTransactions::mark_invalid).
pub struct BestTransactionFilter<I, P> {
    /// The wrapped iterator whose items are filtered.
    pub(crate) best: I,
    /// The predicate an item must satisfy to be yielded.
    pub(crate) predicate: P,
}
250
251impl<I, P> BestTransactionFilter<I, P> {
252    /// Create a new [`BestTransactionFilter`] with the given predicate.
253    pub const fn new(best: I, predicate: P) -> Self {
254        Self { best, predicate }
255    }
256}
257
258impl<I, P> Iterator for BestTransactionFilter<I, P>
259where
260    I: crate::traits::BestTransactions,
261    P: FnMut(&<I as Iterator>::Item) -> bool,
262{
263    type Item = <I as Iterator>::Item;
264
265    fn next(&mut self) -> Option<Self::Item> {
266        loop {
267            let best = self.best.next()?;
268            if (self.predicate)(&best) {
269                return Some(best)
270            }
271            self.best.mark_invalid(
272                &best,
273                InvalidPoolTransactionError::Consensus(InvalidTransactionError::TxTypeNotSupported),
274            );
275        }
276    }
277}
278
279impl<I, P> crate::traits::BestTransactions for BestTransactionFilter<I, P>
280where
281    I: crate::traits::BestTransactions,
282    P: FnMut(&<I as Iterator>::Item) -> bool + Send,
283{
284    fn mark_invalid(&mut self, tx: &Self::Item, kind: InvalidPoolTransactionError) {
285        crate::traits::BestTransactions::mark_invalid(&mut self.best, tx, kind)
286    }
287
288    fn no_updates(&mut self) {
289        self.best.no_updates()
290    }
291
292    fn skip_blobs(&mut self) {
293        self.set_skip_blobs(true)
294    }
295
296    fn set_skip_blobs(&mut self, skip_blobs: bool) {
297        self.best.set_skip_blobs(skip_blobs)
298    }
299}
300
301impl<I: fmt::Debug, P> fmt::Debug for BestTransactionFilter<I, P> {
302    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
303        f.debug_struct("BestTransactionFilter").field("best", &self.best).finish()
304    }
305}
306
/// Wrapper over [`crate::traits::BestTransactions`] that prioritizes transactions of certain
/// senders, capping the total gas limit of such transactions.
#[derive(Debug)]
pub struct BestTransactionsWithPrioritizedSenders<I: Iterator> {
    /// Inner iterator
    inner: I,
    /// The set of senders whose transactions should be prioritized
    prioritized_senders: HashSet<Address>,
    /// Maximum total gas limit of prioritized transactions
    max_prioritized_gas: u64,
    /// Buffer with transactions that are not being prioritized. Those will be the first to be
    /// included after the prioritized transactions
    buffer: VecDeque<I::Item>,
    /// Tracker of total gas limit of prioritized transactions. Once it reaches
    /// `max_prioritized_gas` no more transactions will be prioritized
    prioritized_gas: u64,
}
324
325impl<I: Iterator> BestTransactionsWithPrioritizedSenders<I> {
326    /// Constructs a new [`BestTransactionsWithPrioritizedSenders`].
327    pub fn new(prioritized_senders: HashSet<Address>, max_prioritized_gas: u64, inner: I) -> Self {
328        Self {
329            inner,
330            prioritized_senders,
331            max_prioritized_gas,
332            buffer: Default::default(),
333            prioritized_gas: Default::default(),
334        }
335    }
336}
337
338impl<I, T> Iterator for BestTransactionsWithPrioritizedSenders<I>
339where
340    I: crate::traits::BestTransactions<Item = Arc<ValidPoolTransaction<T>>>,
341    T: PoolTransaction,
342{
343    type Item = <I as Iterator>::Item;
344
345    fn next(&mut self) -> Option<Self::Item> {
346        // If we have space, try prioritizing transactions
347        if self.prioritized_gas < self.max_prioritized_gas {
348            for item in &mut self.inner {
349                if self.prioritized_senders.contains(&item.transaction.sender()) &&
350                    self.prioritized_gas + item.transaction.gas_limit() <=
351                        self.max_prioritized_gas
352                {
353                    self.prioritized_gas += item.transaction.gas_limit();
354                    return Some(item)
355                }
356                self.buffer.push_back(item);
357            }
358        }
359
360        if let Some(item) = self.buffer.pop_front() {
361            Some(item)
362        } else {
363            self.inner.next()
364        }
365    }
366}
367
368impl<I, T> crate::traits::BestTransactions for BestTransactionsWithPrioritizedSenders<I>
369where
370    I: crate::traits::BestTransactions<Item = Arc<ValidPoolTransaction<T>>>,
371    T: PoolTransaction,
372{
373    fn mark_invalid(&mut self, tx: &Self::Item, kind: InvalidPoolTransactionError) {
374        self.inner.mark_invalid(tx, kind)
375    }
376
377    fn no_updates(&mut self) {
378        self.inner.no_updates()
379    }
380
381    fn set_skip_blobs(&mut self, skip_blobs: bool) {
382        if skip_blobs {
383            self.buffer.retain(|tx| !tx.transaction.is_eip4844())
384        }
385        self.inner.set_skip_blobs(skip_blobs)
386    }
387}
388
389#[cfg(test)]
390mod tests {
391    use super::*;
392    use crate::{
393        pool::pending::PendingPool,
394        test_utils::{MockOrdering, MockTransaction, MockTransactionFactory},
395        BestTransactions, Priority,
396    };
397
398    #[test]
399    fn test_best_iter() {
400        let mut pool = PendingPool::new(MockOrdering::default());
401        let mut f = MockTransactionFactory::default();
402
403        let num_tx = 10;
404        // insert 10 gapless tx
405        let tx = MockTransaction::eip1559();
406        for nonce in 0..num_tx {
407            let tx = tx.clone().rng_hash().with_nonce(nonce);
408            let valid_tx = f.validated(tx);
409            pool.add_transaction(Arc::new(valid_tx), 0);
410        }
411
412        let mut best = pool.best();
413        assert_eq!(best.all.len(), num_tx as usize);
414        assert_eq!(best.independent.len(), 1);
415
416        // check tx are returned in order
417        for nonce in 0..num_tx {
418            assert_eq!(best.independent.len(), 1);
419            let tx = best.next().unwrap();
420            assert_eq!(tx.nonce(), nonce);
421        }
422    }
423
424    #[test]
425    fn test_best_iter_invalid() {
426        let mut pool = PendingPool::new(MockOrdering::default());
427        let mut f = MockTransactionFactory::default();
428
429        let num_tx = 10;
430        // insert 10 gapless tx
431        let tx = MockTransaction::eip1559();
432        for nonce in 0..num_tx {
433            let tx = tx.clone().rng_hash().with_nonce(nonce);
434            let valid_tx = f.validated(tx);
435            pool.add_transaction(Arc::new(valid_tx), 0);
436        }
437
438        let mut best = pool.best();
439
440        // mark the first tx as invalid
441        let invalid = best.independent.iter().next().unwrap();
442        best.mark_invalid(
443            &invalid.transaction.clone(),
444            InvalidPoolTransactionError::Consensus(InvalidTransactionError::TxTypeNotSupported),
445        );
446
447        // iterator is empty
448        assert!(best.next().is_none());
449    }
450
451    #[test]
452    fn test_best_transactions_iter_invalid() {
453        let mut pool = PendingPool::new(MockOrdering::default());
454        let mut f = MockTransactionFactory::default();
455
456        let num_tx = 10;
457        // insert 10 gapless tx
458        let tx = MockTransaction::eip1559();
459        for nonce in 0..num_tx {
460            let tx = tx.clone().rng_hash().with_nonce(nonce);
461            let valid_tx = f.validated(tx);
462            pool.add_transaction(Arc::new(valid_tx), 0);
463        }
464
465        let mut best: Box<
466            dyn crate::traits::BestTransactions<Item = Arc<ValidPoolTransaction<MockTransaction>>>,
467        > = Box::new(pool.best());
468
469        let tx = Iterator::next(&mut best).unwrap();
470        crate::traits::BestTransactions::mark_invalid(
471            &mut *best,
472            &tx,
473            InvalidPoolTransactionError::Consensus(InvalidTransactionError::TxTypeNotSupported),
474        );
475        assert!(Iterator::next(&mut best).is_none());
476    }
477
478    #[test]
479    fn test_best_with_fees_iter_base_fee_satisfied() {
480        let mut pool = PendingPool::new(MockOrdering::default());
481        let mut f = MockTransactionFactory::default();
482
483        let num_tx = 5;
484        let base_fee: u64 = 10;
485        let base_fee_per_blob_gas: u64 = 15;
486
487        // Insert transactions with a max_fee_per_gas greater than or equal to the base fee
488        // Without blob fee
489        for nonce in 0..num_tx {
490            let tx = MockTransaction::eip1559()
491                .rng_hash()
492                .with_nonce(nonce)
493                .with_max_fee(base_fee as u128 + 5);
494            let valid_tx = f.validated(tx);
495            pool.add_transaction(Arc::new(valid_tx), 0);
496        }
497
498        let mut best = pool.best_with_basefee_and_blobfee(base_fee, base_fee_per_blob_gas);
499
500        for nonce in 0..num_tx {
501            let tx = best.next().expect("Transaction should be returned");
502            assert_eq!(tx.nonce(), nonce);
503            assert!(tx.transaction.max_fee_per_gas() >= base_fee as u128);
504        }
505    }
506
507    #[test]
508    fn test_best_with_fees_iter_base_fee_violated() {
509        let mut pool = PendingPool::new(MockOrdering::default());
510        let mut f = MockTransactionFactory::default();
511
512        let num_tx = 5;
513        let base_fee: u64 = 20;
514        let base_fee_per_blob_gas: u64 = 15;
515
516        // Insert transactions with a max_fee_per_gas less than the base fee
517        for nonce in 0..num_tx {
518            let tx = MockTransaction::eip1559()
519                .rng_hash()
520                .with_nonce(nonce)
521                .with_max_fee(base_fee as u128 - 5);
522            let valid_tx = f.validated(tx);
523            pool.add_transaction(Arc::new(valid_tx), 0);
524        }
525
526        let mut best = pool.best_with_basefee_and_blobfee(base_fee, base_fee_per_blob_gas);
527
528        // No transaction should be returned since all violate the base fee
529        assert!(best.next().is_none());
530    }
531
532    #[test]
533    fn test_best_with_fees_iter_blob_fee_satisfied() {
534        let mut pool = PendingPool::new(MockOrdering::default());
535        let mut f = MockTransactionFactory::default();
536
537        let num_tx = 5;
538        let base_fee: u64 = 10;
539        let base_fee_per_blob_gas: u64 = 20;
540
541        // Insert transactions with a max_fee_per_blob_gas greater than or equal to the base fee per
542        // blob gas
543        for nonce in 0..num_tx {
544            let tx = MockTransaction::eip4844()
545                .rng_hash()
546                .with_nonce(nonce)
547                .with_max_fee(base_fee as u128 + 5)
548                .with_blob_fee(base_fee_per_blob_gas as u128 + 5);
549            let valid_tx = f.validated(tx);
550            pool.add_transaction(Arc::new(valid_tx), 0);
551        }
552
553        let mut best = pool.best_with_basefee_and_blobfee(base_fee, base_fee_per_blob_gas);
554
555        // All transactions should be returned in order since they satisfy both base fee and blob
556        // fee
557        for nonce in 0..num_tx {
558            let tx = best.next().expect("Transaction should be returned");
559            assert_eq!(tx.nonce(), nonce);
560            assert!(tx.transaction.max_fee_per_gas() >= base_fee as u128);
561            assert!(
562                tx.transaction.max_fee_per_blob_gas().unwrap() >= base_fee_per_blob_gas as u128
563            );
564        }
565
566        // No more transactions should be returned
567        assert!(best.next().is_none());
568    }
569
570    #[test]
571    fn test_best_with_fees_iter_blob_fee_violated() {
572        let mut pool = PendingPool::new(MockOrdering::default());
573        let mut f = MockTransactionFactory::default();
574
575        let num_tx = 5;
576        let base_fee: u64 = 10;
577        let base_fee_per_blob_gas: u64 = 20;
578
579        // Insert transactions with a max_fee_per_blob_gas less than the base fee per blob gas
580        for nonce in 0..num_tx {
581            let tx = MockTransaction::eip4844()
582                .rng_hash()
583                .with_nonce(nonce)
584                .with_max_fee(base_fee as u128 + 5)
585                .with_blob_fee(base_fee_per_blob_gas as u128 - 5);
586            let valid_tx = f.validated(tx);
587            pool.add_transaction(Arc::new(valid_tx), 0);
588        }
589
590        let mut best = pool.best_with_basefee_and_blobfee(base_fee, base_fee_per_blob_gas);
591
592        // No transaction should be returned since all violate the blob fee
593        assert!(best.next().is_none());
594    }
595
596    #[test]
597    fn test_best_with_fees_iter_mixed_fees() {
598        let mut pool = PendingPool::new(MockOrdering::default());
599        let mut f = MockTransactionFactory::default();
600
601        let base_fee: u64 = 10;
602        let base_fee_per_blob_gas: u64 = 20;
603
604        // Insert transactions with varying max_fee_per_gas and max_fee_per_blob_gas
605        let tx1 =
606            MockTransaction::eip1559().rng_hash().with_nonce(0).with_max_fee(base_fee as u128 + 5);
607        let tx2 = MockTransaction::eip4844()
608            .rng_hash()
609            .with_nonce(1)
610            .with_max_fee(base_fee as u128 + 5)
611            .with_blob_fee(base_fee_per_blob_gas as u128 + 5);
612        let tx3 = MockTransaction::eip4844()
613            .rng_hash()
614            .with_nonce(2)
615            .with_max_fee(base_fee as u128 + 5)
616            .with_blob_fee(base_fee_per_blob_gas as u128 - 5);
617        let tx4 =
618            MockTransaction::eip1559().rng_hash().with_nonce(3).with_max_fee(base_fee as u128 - 5);
619
620        pool.add_transaction(Arc::new(f.validated(tx1.clone())), 0);
621        pool.add_transaction(Arc::new(f.validated(tx2.clone())), 0);
622        pool.add_transaction(Arc::new(f.validated(tx3)), 0);
623        pool.add_transaction(Arc::new(f.validated(tx4)), 0);
624
625        let mut best = pool.best_with_basefee_and_blobfee(base_fee, base_fee_per_blob_gas);
626
627        let expected_order = vec![tx1, tx2];
628        for expected_tx in expected_order {
629            let tx = best.next().expect("Transaction should be returned");
630            assert_eq!(tx.transaction, expected_tx);
631        }
632
633        // No more transactions should be returned
634        assert!(best.next().is_none());
635    }
636
637    #[test]
638    fn test_best_add_transaction_with_next_nonce() {
639        let mut pool = PendingPool::new(MockOrdering::default());
640        let mut f = MockTransactionFactory::default();
641
642        // Add 5 transactions with increasing nonces to the pool
643        let num_tx = 5;
644        let tx = MockTransaction::eip1559();
645        for nonce in 0..num_tx {
646            let tx = tx.clone().rng_hash().with_nonce(nonce);
647            let valid_tx = f.validated(tx);
648            pool.add_transaction(Arc::new(valid_tx), 0);
649        }
650
651        // Create a BestTransactions iterator from the pool
652        let mut best = pool.best();
653
654        // Use a broadcast channel for transaction updates
655        let (tx_sender, tx_receiver) =
656            tokio::sync::broadcast::channel::<PendingTransaction<MockOrdering>>(1000);
657        best.new_transaction_receiver = Some(tx_receiver);
658
659        // Create a new transaction with nonce 5 and validate it
660        let new_tx = MockTransaction::eip1559().rng_hash().with_nonce(5);
661        let valid_new_tx = f.validated(new_tx);
662
663        // Send the new transaction through the broadcast channel
664        let pending_tx = PendingTransaction {
665            submission_id: 10,
666            transaction: Arc::new(valid_new_tx.clone()),
667            priority: Priority::Value(1000),
668        };
669        tx_sender.send(pending_tx.clone()).unwrap();
670
671        // Add new transactions to the iterator
672        best.add_new_transactions();
673
674        // Verify that the new transaction has been added to the 'all' map
675        assert_eq!(best.all.len(), 6);
676        assert!(best.all.contains_key(valid_new_tx.id()));
677
678        // Verify that the new transaction has been added to the 'independent' set
679        assert_eq!(best.independent.len(), 2);
680        assert!(best.independent.contains(&pending_tx));
681    }
682
683    #[test]
684    fn test_best_add_transaction_with_ancestor() {
685        // Initialize a new PendingPool with default MockOrdering and MockTransactionFactory
686        let mut pool = PendingPool::new(MockOrdering::default());
687        let mut f = MockTransactionFactory::default();
688
689        // Add 5 transactions with increasing nonces to the pool
690        let num_tx = 5;
691        let tx = MockTransaction::eip1559();
692        for nonce in 0..num_tx {
693            let tx = tx.clone().rng_hash().with_nonce(nonce);
694            let valid_tx = f.validated(tx);
695            pool.add_transaction(Arc::new(valid_tx), 0);
696        }
697
698        // Create a BestTransactions iterator from the pool
699        let mut best = pool.best();
700
701        // Use a broadcast channel for transaction updates
702        let (tx_sender, tx_receiver) =
703            tokio::sync::broadcast::channel::<PendingTransaction<MockOrdering>>(1000);
704        best.new_transaction_receiver = Some(tx_receiver);
705
706        // Create a new transaction with nonce 5 and validate it
707        let base_tx1 = MockTransaction::eip1559().rng_hash().with_nonce(5);
708        let valid_new_tx1 = f.validated(base_tx1.clone());
709
710        // Send the new transaction through the broadcast channel
711        let pending_tx1 = PendingTransaction {
712            submission_id: 10,
713            transaction: Arc::new(valid_new_tx1.clone()),
714            priority: Priority::Value(1000),
715        };
716        tx_sender.send(pending_tx1.clone()).unwrap();
717
718        // Add new transactions to the iterator
719        best.add_new_transactions();
720
721        // Verify that the new transaction has been added to the 'all' map
722        assert_eq!(best.all.len(), 6);
723        assert!(best.all.contains_key(valid_new_tx1.id()));
724
725        // Verify that the new transaction has been added to the 'independent' set
726        assert_eq!(best.independent.len(), 2);
727        assert!(best.independent.contains(&pending_tx1));
728
729        // Attempt to add a new transaction with a different nonce (not a duplicate)
730        let base_tx2 = base_tx1.with_nonce(6);
731        let valid_new_tx2 = f.validated(base_tx2);
732
733        // Send the new transaction through the broadcast channel
734        let pending_tx2 = PendingTransaction {
735            submission_id: 11, // Different submission ID
736            transaction: Arc::new(valid_new_tx2.clone()),
737            priority: Priority::Value(1000),
738        };
739        tx_sender.send(pending_tx2.clone()).unwrap();
740
741        // Add new transactions to the iterator
742        best.add_new_transactions();
743
744        // Verify that the new transaction has been added to 'all'
745        assert_eq!(best.all.len(), 7);
746        assert!(best.all.contains_key(valid_new_tx2.id()));
747
748        // Verify that the new transaction has not been added to the 'independent' set
749        assert_eq!(best.independent.len(), 2);
750        assert!(!best.independent.contains(&pending_tx2));
751    }
752
753    #[test]
754    fn test_best_transactions_filter_trait_object() {
755        // Initialize a new PendingPool with default MockOrdering and MockTransactionFactory
756        let mut pool = PendingPool::new(MockOrdering::default());
757        let mut f = MockTransactionFactory::default();
758
759        // Add 5 transactions with increasing nonces to the pool
760        let num_tx = 5;
761        let tx = MockTransaction::eip1559();
762        for nonce in 0..num_tx {
763            let tx = tx.clone().rng_hash().with_nonce(nonce);
764            let valid_tx = f.validated(tx);
765            pool.add_transaction(Arc::new(valid_tx), 0);
766        }
767
768        // Create a trait object of BestTransactions iterator from the pool
769        let best: Box<dyn crate::traits::BestTransactions<Item = _>> = Box::new(pool.best());
770
771        // Create a filter that only returns transactions with even nonces
772        let filter =
773            BestTransactionFilter::new(best, |tx: &Arc<ValidPoolTransaction<MockTransaction>>| {
774                tx.nonce().is_multiple_of(2)
775            });
776
777        // Verify that the filter only returns transactions with even nonces
778        for tx in filter {
779            assert_eq!(tx.nonce() % 2, 0);
780        }
781    }
782
783    #[test]
784    fn test_best_transactions_prioritized_senders() {
785        let mut pool = PendingPool::new(MockOrdering::default());
786        let mut f = MockTransactionFactory::default();
787
788        // Add 5 plain transactions from different senders with increasing gas price
789        for gas_price in 0..5 {
790            let tx = MockTransaction::eip1559().with_gas_price((gas_price + 1) * 10);
791            let valid_tx = f.validated(tx);
792            pool.add_transaction(Arc::new(valid_tx), 0);
793        }
794
795        // Add another transaction with 5 gas price that's going to be prioritized by sender
796        let prioritized_tx = MockTransaction::eip1559().with_gas_price(5).with_gas_limit(200);
797        let valid_prioritized_tx = f.validated(prioritized_tx.clone());
798        pool.add_transaction(Arc::new(valid_prioritized_tx), 0);
799
800        // Add another transaction with 3 gas price that should not be prioritized by sender because
801        // of gas limit.
802        let prioritized_tx2 = MockTransaction::eip1559().with_gas_price(3);
803        let valid_prioritized_tx2 = f.validated(prioritized_tx2.clone());
804        pool.add_transaction(Arc::new(valid_prioritized_tx2), 0);
805
806        let prioritized_senders =
807            HashSet::from([prioritized_tx.sender(), prioritized_tx2.sender()]);
808        let best =
809            BestTransactionsWithPrioritizedSenders::new(prioritized_senders, 200, pool.best());
810
811        // Verify that the prioritized transaction is returned first
812        // and the rest are returned in the reverse order of gas price
813        let mut iter = best.into_iter();
814        let top_of_block_tx = iter.next().unwrap();
815        assert_eq!(top_of_block_tx.max_fee_per_gas(), 5);
816        assert_eq!(top_of_block_tx.sender(), prioritized_tx.sender());
817        for gas_price in (0..5).rev() {
818            assert_eq!(iter.next().unwrap().max_fee_per_gas(), (gas_price + 1) * 10);
819        }
820
821        // Due to the gas limit, the transaction from second prioritized sender was not
822        // prioritized.
823        let top_of_block_tx2 = iter.next().unwrap();
824        assert_eq!(top_of_block_tx2.max_fee_per_gas(), 3);
825        assert_eq!(top_of_block_tx2.sender(), prioritized_tx2.sender());
826    }
827
828    #[test]
829    fn test_best_with_fees_iter_no_blob_fee_required() {
830        // Tests transactions without blob fees where base fees are checked.
831        let mut pool = PendingPool::new(MockOrdering::default());
832        let mut f = MockTransactionFactory::default();
833
834        let base_fee: u64 = 10;
835        let base_fee_per_blob_gas: u64 = 0; // No blob fee requirement
836
837        // Insert transactions with max_fee_per_gas above the base fee
838        for nonce in 0..5 {
839            let tx = MockTransaction::eip1559()
840                .rng_hash()
841                .with_nonce(nonce)
842                .with_max_fee(base_fee as u128 + 5);
843            let valid_tx = f.validated(tx);
844            pool.add_transaction(Arc::new(valid_tx), 0);
845        }
846
847        let mut best = pool.best_with_basefee_and_blobfee(base_fee, base_fee_per_blob_gas);
848
849        // All transactions should be returned as no blob fee requirement is imposed
850        for nonce in 0..5 {
851            let tx = best.next().expect("Transaction should be returned");
852            assert_eq!(tx.nonce(), nonce);
853        }
854
855        // Ensure no more transactions are left
856        assert!(best.next().is_none());
857    }
858
859    #[test]
860    fn test_best_with_fees_iter_mix_of_blob_and_non_blob_transactions() {
861        // Tests mixed scenarios with both blob and non-blob transactions.
862        let mut pool = PendingPool::new(MockOrdering::default());
863        let mut f = MockTransactionFactory::default();
864
865        let base_fee: u64 = 10;
866        let base_fee_per_blob_gas: u64 = 15;
867
868        // Add a non-blob transaction that satisfies the base fee
869        let tx_non_blob =
870            MockTransaction::eip1559().rng_hash().with_nonce(0).with_max_fee(base_fee as u128 + 5);
871        pool.add_transaction(Arc::new(f.validated(tx_non_blob.clone())), 0);
872
873        // Add a blob transaction that satisfies both base fee and blob fee
874        let tx_blob = MockTransaction::eip4844()
875            .rng_hash()
876            .with_nonce(1)
877            .with_max_fee(base_fee as u128 + 5)
878            .with_blob_fee(base_fee_per_blob_gas as u128 + 5);
879        pool.add_transaction(Arc::new(f.validated(tx_blob.clone())), 0);
880
881        let mut best = pool.best_with_basefee_and_blobfee(base_fee, base_fee_per_blob_gas);
882
883        // Verify both transactions are returned
884        let tx = best.next().expect("Transaction should be returned");
885        assert_eq!(tx.transaction, tx_non_blob);
886
887        let tx = best.next().expect("Transaction should be returned");
888        assert_eq!(tx.transaction, tx_blob);
889
890        // Ensure no more transactions are left
891        assert!(best.next().is_none());
892    }
893
894    #[test]
895    fn test_best_transactions_with_skipping_blobs() {
896        // Tests the skip_blobs functionality to ensure blob transactions are skipped.
897        let mut pool = PendingPool::new(MockOrdering::default());
898        let mut f = MockTransactionFactory::default();
899
900        // Add a blob transaction
901        let tx_blob = MockTransaction::eip4844().rng_hash().with_nonce(0).with_blob_fee(100);
902        let valid_blob_tx = f.validated(tx_blob);
903        pool.add_transaction(Arc::new(valid_blob_tx), 0);
904
905        // Add a non-blob transaction
906        let tx_non_blob = MockTransaction::eip1559().rng_hash().with_nonce(1).with_max_fee(200);
907        let valid_non_blob_tx = f.validated(tx_non_blob.clone());
908        pool.add_transaction(Arc::new(valid_non_blob_tx), 0);
909
910        let mut best = pool.best();
911        best.skip_blobs();
912
913        // Only the non-blob transaction should be returned
914        let tx = best.next().expect("Transaction should be returned");
915        assert_eq!(tx.transaction, tx_non_blob);
916
917        // Ensure no more transactions are left
918        assert!(best.next().is_none());
919    }
920
921    #[test]
922    fn test_best_transactions_no_updates() {
923        // Tests the no_updates functionality to ensure it properly clears the
924        // new_transaction_receiver.
925        let mut pool = PendingPool::new(MockOrdering::default());
926        let mut f = MockTransactionFactory::default();
927
928        // Add a transaction
929        let tx = MockTransaction::eip1559().rng_hash().with_nonce(0).with_max_fee(100);
930        let valid_tx = f.validated(tx);
931        pool.add_transaction(Arc::new(valid_tx), 0);
932
933        let mut best = pool.best();
934
935        // Use a broadcast channel for transaction updates
936        let (_tx_sender, tx_receiver) =
937            tokio::sync::broadcast::channel::<PendingTransaction<MockOrdering>>(1000);
938        best.new_transaction_receiver = Some(tx_receiver);
939
940        // Ensure receiver is set
941        assert!(best.new_transaction_receiver.is_some());
942
943        // Call no_updates to clear the receiver
944        best.no_updates();
945
946        // Ensure receiver is cleared
947        assert!(best.new_transaction_receiver.is_none());
948    }
949
950    #[test]
951    fn test_best_update_transaction_priority() {
952        let mut pool = PendingPool::new(MockOrdering::default());
953        let mut f = MockTransactionFactory::default();
954
955        // Add 5 transactions with increasing nonces to the pool
956        let num_tx = 5;
957        let tx = MockTransaction::eip1559();
958        for nonce in 0..num_tx {
959            let tx = tx.clone().rng_hash().with_nonce(nonce);
960            let valid_tx = f.validated(tx);
961            pool.add_transaction(Arc::new(valid_tx), 0);
962        }
963
964        // Create a BestTransactions iterator from the pool
965        let mut best = pool.best();
966
967        // Use a broadcast channel for transaction updates
968        let (tx_sender, tx_receiver) =
969            tokio::sync::broadcast::channel::<PendingTransaction<MockOrdering>>(1000);
970        best.new_transaction_receiver = Some(tx_receiver);
971
972        // yield one tx, effectively locking in the highest prio
973        let first = best.next().unwrap();
974
975        // Create a new transaction with nonce 5 and validate it
976        let new_higher_fee_tx = MockTransaction::eip1559().with_nonce(0);
977        let valid_new_higher_fee_tx = f.validated(new_higher_fee_tx);
978
979        // Send the new transaction through the broadcast channel
980        let pending_tx = PendingTransaction {
981            submission_id: 10,
982            transaction: Arc::new(valid_new_higher_fee_tx.clone()),
983            priority: Priority::Value(u128::MAX),
984        };
985        tx_sender.send(pending_tx).unwrap();
986
987        // ensure that the higher prio tx is skipped since we yielded a lower one
988        for tx in best {
989            assert_eq!(tx.sender_id(), first.sender_id());
990            assert_ne!(tx.sender_id(), valid_new_higher_fee_tx.sender_id());
991        }
992    }
993}