reth_transaction_pool/pool/
best.rs

1use crate::{
2    error::{Eip4844PoolTransactionError, InvalidPoolTransactionError},
3    identifier::{SenderId, TransactionId},
4    pool::pending::PendingTransaction,
5    PoolTransaction, Priority, TransactionOrdering, ValidPoolTransaction,
6};
7use alloy_consensus::Transaction;
8use alloy_eips::Typed2718;
9use alloy_primitives::Address;
10use core::fmt;
11use reth_primitives_traits::transaction::error::InvalidTransactionError;
12use std::{
13    collections::{BTreeMap, BTreeSet, HashSet, VecDeque},
14    sync::Arc,
15};
16use tokio::sync::broadcast::{error::TryRecvError, Receiver};
17use tracing::debug;
18
19/// An iterator that returns transactions that can be executed on the current state (*best*
20/// transactions).
21///
22/// This is a wrapper around [`BestTransactions`] that also enforces a specific basefee.
23///
24/// This iterator guarantees that all transactions it returns satisfy both the base fee and blob
25/// fee!
26pub(crate) struct BestTransactionsWithFees<T: TransactionOrdering> {
27    pub(crate) best: BestTransactions<T>,
28    pub(crate) base_fee: u64,
29    pub(crate) base_fee_per_blob_gas: u64,
30}
31
32impl<T: TransactionOrdering> crate::traits::BestTransactions for BestTransactionsWithFees<T> {
33    fn mark_invalid(&mut self, tx: &Self::Item, kind: InvalidPoolTransactionError) {
34        BestTransactions::mark_invalid(&mut self.best, tx, kind)
35    }
36
37    fn no_updates(&mut self) {
38        self.best.no_updates()
39    }
40
41    fn skip_blobs(&mut self) {
42        self.set_skip_blobs(true)
43    }
44
45    fn set_skip_blobs(&mut self, skip_blobs: bool) {
46        self.best.set_skip_blobs(skip_blobs)
47    }
48}
49
50impl<T: TransactionOrdering> Iterator for BestTransactionsWithFees<T> {
51    type Item = Arc<ValidPoolTransaction<T::Transaction>>;
52
53    fn next(&mut self) -> Option<Self::Item> {
54        // find the next transaction that satisfies the base fee
55        loop {
56            let best = Iterator::next(&mut self.best)?;
57            // If both the base fee and blob fee (if applicable for EIP-4844) are satisfied, return
58            // the transaction
59            if best.transaction.max_fee_per_gas() >= self.base_fee as u128 &&
60                best.transaction
61                    .max_fee_per_blob_gas()
62                    .is_none_or(|fee| fee >= self.base_fee_per_blob_gas as u128)
63            {
64                return Some(best);
65            }
66            crate::traits::BestTransactions::mark_invalid(
67                self,
68                &best,
69                InvalidPoolTransactionError::Underpriced,
70            );
71        }
72    }
73}
74
75/// An iterator that returns transactions that can be executed on the current state (*best*
76/// transactions).
77///
78/// The [`PendingPool`](crate::pool::pending::PendingPool) contains transactions that *could* all
79/// be executed on the current state, but only yields transactions that are ready to be executed
80/// now. While it contains all gapless transactions of a sender, it _always_ only returns the
81/// transaction with the current on chain nonce.
82#[derive(Debug)]
83pub struct BestTransactions<T: TransactionOrdering> {
84    /// Contains a copy of _all_ transactions of the pending pool at the point in time this
85    /// iterator was created.
86    pub(crate) all: BTreeMap<TransactionId, PendingTransaction<T>>,
87    /// Transactions that can be executed right away: these have the expected nonce.
88    ///
89    /// Once an `independent` transaction with the nonce `N` is returned, it unlocks `N+1`, which
90    /// then can be moved from the `all` set to the `independent` set.
91    pub(crate) independent: BTreeSet<PendingTransaction<T>>,
92    /// There might be the case where a yielded transactions is invalid, this will track it.
93    pub(crate) invalid: HashSet<SenderId>,
94    /// Used to receive any new pending transactions that have been added to the pool after this
95    /// iterator was static filtered
96    ///
97    /// These new pending transactions are inserted into this iterator's pool before yielding the
98    /// next value
99    pub(crate) new_transaction_receiver: Option<Receiver<PendingTransaction<T>>>,
100    /// The priority value of most recently yielded transaction.
101    ///
102    /// This is required if new pending transactions are fed in while it yields new values.
103    pub(crate) last_priority: Option<Priority<T::PriorityValue>>,
104    /// Flag to control whether to skip blob transactions (EIP4844).
105    pub(crate) skip_blobs: bool,
106}
107
108impl<T: TransactionOrdering> BestTransactions<T> {
109    /// Mark the transaction and its descendants as invalid.
110    pub(crate) fn mark_invalid(
111        &mut self,
112        tx: &Arc<ValidPoolTransaction<T::Transaction>>,
113        _kind: InvalidPoolTransactionError,
114    ) {
115        self.invalid.insert(tx.sender_id());
116    }
117
118    /// Returns the ancestor the given transaction, the transaction with `nonce - 1`.
119    ///
120    /// Note: for a transaction with nonce higher than the current on chain nonce this will always
121    /// return an ancestor since all transactions in this pool are gapless.
122    pub(crate) fn ancestor(&self, id: &TransactionId) -> Option<&PendingTransaction<T>> {
123        self.all.get(&id.unchecked_ancestor()?)
124    }
125
126    /// Non-blocking read on the new pending transactions subscription channel
127    fn try_recv(&mut self) -> Option<PendingTransaction<T>> {
128        loop {
129            match self.new_transaction_receiver.as_mut()?.try_recv() {
130                Ok(tx) => {
131                    if let Some(last_priority) = &self.last_priority &&
132                        &tx.priority > last_priority
133                    {
134                        // we skip transactions if we already yielded a transaction with lower
135                        // priority
136                        return None
137                    }
138                    return Some(tx)
139                }
140                // note TryRecvError::Lagged can be returned here, which is an error that attempts
141                // to correct itself on consecutive try_recv() attempts
142
143                // the cost of ignoring this error is allowing old transactions to get
144                // overwritten after the chan buffer size is met
145                Err(TryRecvError::Lagged(_)) => {
146                    // Handle the case where the receiver lagged too far behind.
147                    // `num_skipped` indicates the number of messages that were skipped.
148                }
149
150                // this case is still better than the existing iterator behavior where no new
151                // pending txs are surfaced to consumers
152                Err(_) => return None,
153            }
154        }
155    }
156
157    /// Removes the currently best independent transaction from the independent set and the total
158    /// set.
159    fn pop_best(&mut self) -> Option<PendingTransaction<T>> {
160        self.independent.pop_last().inspect(|best| {
161            self.all.remove(best.transaction.id());
162        })
163    }
164
165    /// Checks for new transactions that have come into the `PendingPool` after this iterator was
166    /// created and inserts them
167    fn add_new_transactions(&mut self) {
168        while let Some(pending_tx) = self.try_recv() {
169            //  same logic as PendingPool::add_transaction/PendingPool::best_with_unlocked
170            let tx_id = *pending_tx.transaction.id();
171            if self.ancestor(&tx_id).is_none() {
172                self.independent.insert(pending_tx.clone());
173            }
174            self.all.insert(tx_id, pending_tx);
175        }
176    }
177}
178
179impl<T: TransactionOrdering> crate::traits::BestTransactions for BestTransactions<T> {
180    fn mark_invalid(&mut self, tx: &Self::Item, kind: InvalidPoolTransactionError) {
181        Self::mark_invalid(self, tx, kind)
182    }
183
184    fn no_updates(&mut self) {
185        self.new_transaction_receiver.take();
186        self.last_priority.take();
187    }
188
189    fn skip_blobs(&mut self) {
190        self.set_skip_blobs(true);
191    }
192
193    fn set_skip_blobs(&mut self, skip_blobs: bool) {
194        self.skip_blobs = skip_blobs;
195    }
196}
197
198impl<T: TransactionOrdering> Iterator for BestTransactions<T> {
199    type Item = Arc<ValidPoolTransaction<T::Transaction>>;
200
201    fn next(&mut self) -> Option<Self::Item> {
202        loop {
203            self.add_new_transactions();
204            // Remove the next independent tx with the highest priority
205            let best = self.pop_best()?;
206            let sender_id = best.transaction.sender_id();
207
208            // skip transactions for which sender was marked as invalid
209            if self.invalid.contains(&sender_id) {
210                debug!(
211                    target: "txpool",
212                    "[{:?}] skipping invalid transaction",
213                    best.transaction.hash()
214                );
215                continue
216            }
217
218            // Insert transactions that just got unlocked.
219            if let Some(unlocked) = self.all.get(&best.unlocks()) {
220                self.independent.insert(unlocked.clone());
221            }
222
223            if self.skip_blobs && best.transaction.transaction.is_eip4844() {
224                // blobs should be skipped, marking them as invalid will ensure that no dependent
225                // transactions are returned
226                self.mark_invalid(
227                    &best.transaction,
228                    InvalidPoolTransactionError::Eip4844(
229                        Eip4844PoolTransactionError::NoEip4844Blobs,
230                    ),
231                )
232            } else {
233                if self.new_transaction_receiver.is_some() {
234                    self.last_priority = Some(best.priority.clone())
235                }
236                return Some(best.transaction)
237            }
238        }
239    }
240}
241
/// A [`BestTransactions`](crate::traits::BestTransactions) adapter that only lets through the
/// transactions of the wrapped iterator that satisfy a predicate.
///
/// Transactions that are filtered out are marked as invalid via
/// [`BestTransactions::mark_invalid`](crate::traits::BestTransactions::mark_invalid).
pub struct BestTransactionFilter<I, P> {
    /// The iterator whose transactions are being filtered.
    pub(crate) best: I,
    /// Decides whether a transaction is yielded (`true`) or rejected (`false`).
    pub(crate) predicate: P,
}
251
252impl<I, P> BestTransactionFilter<I, P> {
253    /// Create a new [`BestTransactionFilter`] with the given predicate.
254    pub const fn new(best: I, predicate: P) -> Self {
255        Self { best, predicate }
256    }
257}
258
259impl<I, P> Iterator for BestTransactionFilter<I, P>
260where
261    I: crate::traits::BestTransactions,
262    P: FnMut(&<I as Iterator>::Item) -> bool,
263{
264    type Item = <I as Iterator>::Item;
265
266    fn next(&mut self) -> Option<Self::Item> {
267        loop {
268            let best = self.best.next()?;
269            if (self.predicate)(&best) {
270                return Some(best)
271            }
272            self.best.mark_invalid(
273                &best,
274                InvalidPoolTransactionError::Consensus(InvalidTransactionError::TxTypeNotSupported),
275            );
276        }
277    }
278}
279
280impl<I, P> crate::traits::BestTransactions for BestTransactionFilter<I, P>
281where
282    I: crate::traits::BestTransactions,
283    P: FnMut(&<I as Iterator>::Item) -> bool + Send,
284{
285    fn mark_invalid(&mut self, tx: &Self::Item, kind: InvalidPoolTransactionError) {
286        crate::traits::BestTransactions::mark_invalid(&mut self.best, tx, kind)
287    }
288
289    fn no_updates(&mut self) {
290        self.best.no_updates()
291    }
292
293    fn skip_blobs(&mut self) {
294        self.set_skip_blobs(true)
295    }
296
297    fn set_skip_blobs(&mut self, skip_blobs: bool) {
298        self.best.set_skip_blobs(skip_blobs)
299    }
300}
301
302impl<I: fmt::Debug, P> fmt::Debug for BestTransactionFilter<I, P> {
303    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
304        f.debug_struct("BestTransactionFilter").field("best", &self.best).finish()
305    }
306}
307
308/// Wrapper over [`crate::traits::BestTransactions`] that prioritizes transactions of certain
309/// senders capping total gas used by such transactions.
310#[derive(Debug)]
311pub struct BestTransactionsWithPrioritizedSenders<I: Iterator> {
312    /// Inner iterator
313    inner: I,
314    /// A set of senders which transactions should be prioritized
315    prioritized_senders: HashSet<Address>,
316    /// Maximum total gas limit of prioritized transactions
317    max_prioritized_gas: u64,
318    /// Buffer with transactions that are not being prioritized. Those will be the first to be
319    /// included after the prioritized transactions
320    buffer: VecDeque<I::Item>,
321    /// Tracker of total gas limit of prioritized transactions. Once it reaches
322    /// `max_prioritized_gas` no more transactions will be prioritized
323    prioritized_gas: u64,
324}
325
326impl<I: Iterator> BestTransactionsWithPrioritizedSenders<I> {
327    /// Constructs a new [`BestTransactionsWithPrioritizedSenders`].
328    pub fn new(prioritized_senders: HashSet<Address>, max_prioritized_gas: u64, inner: I) -> Self {
329        Self {
330            inner,
331            prioritized_senders,
332            max_prioritized_gas,
333            buffer: Default::default(),
334            prioritized_gas: Default::default(),
335        }
336    }
337}
338
339impl<I, T> Iterator for BestTransactionsWithPrioritizedSenders<I>
340where
341    I: crate::traits::BestTransactions<Item = Arc<ValidPoolTransaction<T>>>,
342    T: PoolTransaction,
343{
344    type Item = <I as Iterator>::Item;
345
346    fn next(&mut self) -> Option<Self::Item> {
347        // If we have space, try prioritizing transactions
348        if self.prioritized_gas < self.max_prioritized_gas {
349            for item in &mut self.inner {
350                if self.prioritized_senders.contains(&item.transaction.sender()) &&
351                    self.prioritized_gas + item.transaction.gas_limit() <=
352                        self.max_prioritized_gas
353                {
354                    self.prioritized_gas += item.transaction.gas_limit();
355                    return Some(item)
356                }
357                self.buffer.push_back(item);
358            }
359        }
360
361        if let Some(item) = self.buffer.pop_front() {
362            Some(item)
363        } else {
364            self.inner.next()
365        }
366    }
367}
368
369impl<I, T> crate::traits::BestTransactions for BestTransactionsWithPrioritizedSenders<I>
370where
371    I: crate::traits::BestTransactions<Item = Arc<ValidPoolTransaction<T>>>,
372    T: PoolTransaction,
373{
374    fn mark_invalid(&mut self, tx: &Self::Item, kind: InvalidPoolTransactionError) {
375        self.inner.mark_invalid(tx, kind)
376    }
377
378    fn no_updates(&mut self) {
379        self.inner.no_updates()
380    }
381
382    fn set_skip_blobs(&mut self, skip_blobs: bool) {
383        if skip_blobs {
384            self.buffer.retain(|tx| !tx.transaction.is_eip4844())
385        }
386        self.inner.set_skip_blobs(skip_blobs)
387    }
388}
389
390#[cfg(test)]
391mod tests {
392    use super::*;
393    use crate::{
394        pool::pending::PendingPool,
395        test_utils::{MockOrdering, MockTransaction, MockTransactionFactory},
396        BestTransactions, Priority,
397    };
398
399    #[test]
400    fn test_best_iter() {
401        let mut pool = PendingPool::new(MockOrdering::default());
402        let mut f = MockTransactionFactory::default();
403
404        let num_tx = 10;
405        // insert 10 gapless tx
406        let tx = MockTransaction::eip1559();
407        for nonce in 0..num_tx {
408            let tx = tx.clone().rng_hash().with_nonce(nonce);
409            let valid_tx = f.validated(tx);
410            pool.add_transaction(Arc::new(valid_tx), 0);
411        }
412
413        let mut best = pool.best();
414        assert_eq!(best.all.len(), num_tx as usize);
415        assert_eq!(best.independent.len(), 1);
416
417        // check tx are returned in order
418        for nonce in 0..num_tx {
419            assert_eq!(best.independent.len(), 1);
420            let tx = best.next().unwrap();
421            assert_eq!(tx.nonce(), nonce);
422        }
423    }
424
425    #[test]
426    fn test_best_iter_invalid() {
427        let mut pool = PendingPool::new(MockOrdering::default());
428        let mut f = MockTransactionFactory::default();
429
430        let num_tx = 10;
431        // insert 10 gapless tx
432        let tx = MockTransaction::eip1559();
433        for nonce in 0..num_tx {
434            let tx = tx.clone().rng_hash().with_nonce(nonce);
435            let valid_tx = f.validated(tx);
436            pool.add_transaction(Arc::new(valid_tx), 0);
437        }
438
439        let mut best = pool.best();
440
441        // mark the first tx as invalid
442        let invalid = best.independent.iter().next().unwrap();
443        best.mark_invalid(
444            &invalid.transaction.clone(),
445            InvalidPoolTransactionError::Consensus(InvalidTransactionError::TxTypeNotSupported),
446        );
447
448        // iterator is empty
449        assert!(best.next().is_none());
450    }
451
452    #[test]
453    fn test_best_transactions_iter_invalid() {
454        let mut pool = PendingPool::new(MockOrdering::default());
455        let mut f = MockTransactionFactory::default();
456
457        let num_tx = 10;
458        // insert 10 gapless tx
459        let tx = MockTransaction::eip1559();
460        for nonce in 0..num_tx {
461            let tx = tx.clone().rng_hash().with_nonce(nonce);
462            let valid_tx = f.validated(tx);
463            pool.add_transaction(Arc::new(valid_tx), 0);
464        }
465
466        let mut best: Box<
467            dyn crate::traits::BestTransactions<Item = Arc<ValidPoolTransaction<MockTransaction>>>,
468        > = Box::new(pool.best());
469
470        let tx = Iterator::next(&mut best).unwrap();
471        crate::traits::BestTransactions::mark_invalid(
472            &mut *best,
473            &tx,
474            InvalidPoolTransactionError::Consensus(InvalidTransactionError::TxTypeNotSupported),
475        );
476        assert!(Iterator::next(&mut best).is_none());
477    }
478
479    #[test]
480    fn test_best_with_fees_iter_base_fee_satisfied() {
481        let mut pool = PendingPool::new(MockOrdering::default());
482        let mut f = MockTransactionFactory::default();
483
484        let num_tx = 5;
485        let base_fee: u64 = 10;
486        let base_fee_per_blob_gas: u64 = 15;
487
488        // Insert transactions with a max_fee_per_gas greater than or equal to the base fee
489        // Without blob fee
490        for nonce in 0..num_tx {
491            let tx = MockTransaction::eip1559()
492                .rng_hash()
493                .with_nonce(nonce)
494                .with_max_fee(base_fee as u128 + 5);
495            let valid_tx = f.validated(tx);
496            pool.add_transaction(Arc::new(valid_tx), 0);
497        }
498
499        let mut best = pool.best_with_basefee_and_blobfee(base_fee, base_fee_per_blob_gas);
500
501        for nonce in 0..num_tx {
502            let tx = best.next().expect("Transaction should be returned");
503            assert_eq!(tx.nonce(), nonce);
504            assert!(tx.transaction.max_fee_per_gas() >= base_fee as u128);
505        }
506    }
507
508    #[test]
509    fn test_best_with_fees_iter_base_fee_violated() {
510        let mut pool = PendingPool::new(MockOrdering::default());
511        let mut f = MockTransactionFactory::default();
512
513        let num_tx = 5;
514        let base_fee: u64 = 20;
515        let base_fee_per_blob_gas: u64 = 15;
516
517        // Insert transactions with a max_fee_per_gas less than the base fee
518        for nonce in 0..num_tx {
519            let tx = MockTransaction::eip1559()
520                .rng_hash()
521                .with_nonce(nonce)
522                .with_max_fee(base_fee as u128 - 5);
523            let valid_tx = f.validated(tx);
524            pool.add_transaction(Arc::new(valid_tx), 0);
525        }
526
527        let mut best = pool.best_with_basefee_and_blobfee(base_fee, base_fee_per_blob_gas);
528
529        // No transaction should be returned since all violate the base fee
530        assert!(best.next().is_none());
531    }
532
533    #[test]
534    fn test_best_with_fees_iter_blob_fee_satisfied() {
535        let mut pool = PendingPool::new(MockOrdering::default());
536        let mut f = MockTransactionFactory::default();
537
538        let num_tx = 5;
539        let base_fee: u64 = 10;
540        let base_fee_per_blob_gas: u64 = 20;
541
542        // Insert transactions with a max_fee_per_blob_gas greater than or equal to the base fee per
543        // blob gas
544        for nonce in 0..num_tx {
545            let tx = MockTransaction::eip4844()
546                .rng_hash()
547                .with_nonce(nonce)
548                .with_max_fee(base_fee as u128 + 5)
549                .with_blob_fee(base_fee_per_blob_gas as u128 + 5);
550            let valid_tx = f.validated(tx);
551            pool.add_transaction(Arc::new(valid_tx), 0);
552        }
553
554        let mut best = pool.best_with_basefee_and_blobfee(base_fee, base_fee_per_blob_gas);
555
556        // All transactions should be returned in order since they satisfy both base fee and blob
557        // fee
558        for nonce in 0..num_tx {
559            let tx = best.next().expect("Transaction should be returned");
560            assert_eq!(tx.nonce(), nonce);
561            assert!(tx.transaction.max_fee_per_gas() >= base_fee as u128);
562            assert!(
563                tx.transaction.max_fee_per_blob_gas().unwrap() >= base_fee_per_blob_gas as u128
564            );
565        }
566
567        // No more transactions should be returned
568        assert!(best.next().is_none());
569    }
570
571    #[test]
572    fn test_best_with_fees_iter_blob_fee_violated() {
573        let mut pool = PendingPool::new(MockOrdering::default());
574        let mut f = MockTransactionFactory::default();
575
576        let num_tx = 5;
577        let base_fee: u64 = 10;
578        let base_fee_per_blob_gas: u64 = 20;
579
580        // Insert transactions with a max_fee_per_blob_gas less than the base fee per blob gas
581        for nonce in 0..num_tx {
582            let tx = MockTransaction::eip4844()
583                .rng_hash()
584                .with_nonce(nonce)
585                .with_max_fee(base_fee as u128 + 5)
586                .with_blob_fee(base_fee_per_blob_gas as u128 - 5);
587            let valid_tx = f.validated(tx);
588            pool.add_transaction(Arc::new(valid_tx), 0);
589        }
590
591        let mut best = pool.best_with_basefee_and_blobfee(base_fee, base_fee_per_blob_gas);
592
593        // No transaction should be returned since all violate the blob fee
594        assert!(best.next().is_none());
595    }
596
597    #[test]
598    fn test_best_with_fees_iter_mixed_fees() {
599        let mut pool = PendingPool::new(MockOrdering::default());
600        let mut f = MockTransactionFactory::default();
601
602        let base_fee: u64 = 10;
603        let base_fee_per_blob_gas: u64 = 20;
604
605        // Insert transactions with varying max_fee_per_gas and max_fee_per_blob_gas
606        let tx1 =
607            MockTransaction::eip1559().rng_hash().with_nonce(0).with_max_fee(base_fee as u128 + 5);
608        let tx2 = MockTransaction::eip4844()
609            .rng_hash()
610            .with_nonce(1)
611            .with_max_fee(base_fee as u128 + 5)
612            .with_blob_fee(base_fee_per_blob_gas as u128 + 5);
613        let tx3 = MockTransaction::eip4844()
614            .rng_hash()
615            .with_nonce(2)
616            .with_max_fee(base_fee as u128 + 5)
617            .with_blob_fee(base_fee_per_blob_gas as u128 - 5);
618        let tx4 =
619            MockTransaction::eip1559().rng_hash().with_nonce(3).with_max_fee(base_fee as u128 - 5);
620
621        pool.add_transaction(Arc::new(f.validated(tx1.clone())), 0);
622        pool.add_transaction(Arc::new(f.validated(tx2.clone())), 0);
623        pool.add_transaction(Arc::new(f.validated(tx3)), 0);
624        pool.add_transaction(Arc::new(f.validated(tx4)), 0);
625
626        let mut best = pool.best_with_basefee_and_blobfee(base_fee, base_fee_per_blob_gas);
627
628        let expected_order = vec![tx1, tx2];
629        for expected_tx in expected_order {
630            let tx = best.next().expect("Transaction should be returned");
631            assert_eq!(tx.transaction, expected_tx);
632        }
633
634        // No more transactions should be returned
635        assert!(best.next().is_none());
636    }
637
638    #[test]
639    fn test_best_add_transaction_with_next_nonce() {
640        let mut pool = PendingPool::new(MockOrdering::default());
641        let mut f = MockTransactionFactory::default();
642
643        // Add 5 transactions with increasing nonces to the pool
644        let num_tx = 5;
645        let tx = MockTransaction::eip1559();
646        for nonce in 0..num_tx {
647            let tx = tx.clone().rng_hash().with_nonce(nonce);
648            let valid_tx = f.validated(tx);
649            pool.add_transaction(Arc::new(valid_tx), 0);
650        }
651
652        // Create a BestTransactions iterator from the pool
653        let mut best = pool.best();
654
655        // Use a broadcast channel for transaction updates
656        let (tx_sender, tx_receiver) =
657            tokio::sync::broadcast::channel::<PendingTransaction<MockOrdering>>(1000);
658        best.new_transaction_receiver = Some(tx_receiver);
659
660        // Create a new transaction with nonce 5 and validate it
661        let new_tx = MockTransaction::eip1559().rng_hash().with_nonce(5);
662        let valid_new_tx = f.validated(new_tx);
663
664        // Send the new transaction through the broadcast channel
665        let pending_tx = PendingTransaction {
666            submission_id: 10,
667            transaction: Arc::new(valid_new_tx.clone()),
668            priority: Priority::Value(1000),
669        };
670        tx_sender.send(pending_tx.clone()).unwrap();
671
672        // Add new transactions to the iterator
673        best.add_new_transactions();
674
675        // Verify that the new transaction has been added to the 'all' map
676        assert_eq!(best.all.len(), 6);
677        assert!(best.all.contains_key(valid_new_tx.id()));
678
679        // Verify that the new transaction has been added to the 'independent' set
680        assert_eq!(best.independent.len(), 2);
681        assert!(best.independent.contains(&pending_tx));
682    }
683
684    #[test]
685    fn test_best_add_transaction_with_ancestor() {
686        // Initialize a new PendingPool with default MockOrdering and MockTransactionFactory
687        let mut pool = PendingPool::new(MockOrdering::default());
688        let mut f = MockTransactionFactory::default();
689
690        // Add 5 transactions with increasing nonces to the pool
691        let num_tx = 5;
692        let tx = MockTransaction::eip1559();
693        for nonce in 0..num_tx {
694            let tx = tx.clone().rng_hash().with_nonce(nonce);
695            let valid_tx = f.validated(tx);
696            pool.add_transaction(Arc::new(valid_tx), 0);
697        }
698
699        // Create a BestTransactions iterator from the pool
700        let mut best = pool.best();
701
702        // Use a broadcast channel for transaction updates
703        let (tx_sender, tx_receiver) =
704            tokio::sync::broadcast::channel::<PendingTransaction<MockOrdering>>(1000);
705        best.new_transaction_receiver = Some(tx_receiver);
706
707        // Create a new transaction with nonce 5 and validate it
708        let base_tx1 = MockTransaction::eip1559().rng_hash().with_nonce(5);
709        let valid_new_tx1 = f.validated(base_tx1.clone());
710
711        // Send the new transaction through the broadcast channel
712        let pending_tx1 = PendingTransaction {
713            submission_id: 10,
714            transaction: Arc::new(valid_new_tx1.clone()),
715            priority: Priority::Value(1000),
716        };
717        tx_sender.send(pending_tx1.clone()).unwrap();
718
719        // Add new transactions to the iterator
720        best.add_new_transactions();
721
722        // Verify that the new transaction has been added to the 'all' map
723        assert_eq!(best.all.len(), 6);
724        assert!(best.all.contains_key(valid_new_tx1.id()));
725
726        // Verify that the new transaction has been added to the 'independent' set
727        assert_eq!(best.independent.len(), 2);
728        assert!(best.independent.contains(&pending_tx1));
729
730        // Attempt to add a new transaction with a different nonce (not a duplicate)
731        let base_tx2 = base_tx1.with_nonce(6);
732        let valid_new_tx2 = f.validated(base_tx2);
733
734        // Send the new transaction through the broadcast channel
735        let pending_tx2 = PendingTransaction {
736            submission_id: 11, // Different submission ID
737            transaction: Arc::new(valid_new_tx2.clone()),
738            priority: Priority::Value(1000),
739        };
740        tx_sender.send(pending_tx2.clone()).unwrap();
741
742        // Add new transactions to the iterator
743        best.add_new_transactions();
744
745        // Verify that the new transaction has been added to 'all'
746        assert_eq!(best.all.len(), 7);
747        assert!(best.all.contains_key(valid_new_tx2.id()));
748
749        // Verify that the new transaction has not been added to the 'independent' set
750        assert_eq!(best.independent.len(), 2);
751        assert!(!best.independent.contains(&pending_tx2));
752    }
753
754    #[test]
755    fn test_best_transactions_filter_trait_object() {
756        // Initialize a new PendingPool with default MockOrdering and MockTransactionFactory
757        let mut pool = PendingPool::new(MockOrdering::default());
758        let mut f = MockTransactionFactory::default();
759
760        // Add 5 transactions with increasing nonces to the pool
761        let num_tx = 5;
762        let tx = MockTransaction::eip1559();
763        for nonce in 0..num_tx {
764            let tx = tx.clone().rng_hash().with_nonce(nonce);
765            let valid_tx = f.validated(tx);
766            pool.add_transaction(Arc::new(valid_tx), 0);
767        }
768
769        // Create a trait object of BestTransactions iterator from the pool
770        let best: Box<dyn crate::traits::BestTransactions<Item = _>> = Box::new(pool.best());
771
772        // Create a filter that only returns transactions with even nonces
773        let filter =
774            BestTransactionFilter::new(best, |tx: &Arc<ValidPoolTransaction<MockTransaction>>| {
775                tx.nonce().is_multiple_of(2)
776            });
777
778        // Verify that the filter only returns transactions with even nonces
779        for tx in filter {
780            assert_eq!(tx.nonce() % 2, 0);
781        }
782    }
783
#[test]
fn test_best_transactions_prioritized_senders() {
    let mut pending = PendingPool::new(MockOrdering::default());
    let mut factory = MockTransactionFactory::default();

    // Five plain transactions priced at 10, 20, 30, 40 and 50.
    for step in 0..5 {
        let plain = MockTransaction::eip1559().with_gas_price((step + 1) * 10);
        pending.add_transaction(Arc::new(factory.validated(plain)), 0);
    }

    // A cheap transaction (gas price 5) whose sender is prioritized. Its gas limit of 200 is the
    // same value handed to the prioritizing iterator below.
    let favored = MockTransaction::eip1559().with_gas_price(5).with_gas_limit(200);
    pending.add_transaction(Arc::new(factory.validated(favored.clone())), 0);

    // An even cheaper transaction (gas price 3) whose sender is also in the prioritized set; the
    // assertions below expect it to come last because the gas limit stops it from being
    // prioritized.
    let runner_up = MockTransaction::eip1559().with_gas_price(3);
    pending.add_transaction(Arc::new(factory.validated(runner_up.clone())), 0);

    let vip_senders = HashSet::from([favored.sender(), runner_up.sender()]);
    let mut yielded =
        BestTransactionsWithPrioritizedSenders::new(vip_senders, 200, pending.best()).into_iter();

    // The prioritized transaction leads despite carrying a lower fee than the plain ones.
    let head = yielded.next().unwrap();
    assert_eq!(head.max_fee_per_gas(), 5);
    assert_eq!(head.sender(), favored.sender());

    // The plain transactions follow from the most expensive (50) down to the cheapest (10).
    for multiplier in (1..=5).rev() {
        assert_eq!(yielded.next().unwrap().max_fee_per_gas(), multiplier * 10);
    }

    // The second prioritized sender's transaction is yielded last, i.e. it was not prioritized.
    let tail = yielded.next().unwrap();
    assert_eq!(tail.max_fee_per_gas(), 3);
    assert_eq!(tail.sender(), runner_up.sender());
}
828
#[test]
fn test_best_with_fees_iter_no_blob_fee_required() {
    // Only the base fee matters in this scenario: the blob fee threshold is zero and no blob
    // transactions are inserted.
    let base_fee: u64 = 10;
    let base_fee_per_blob_gas: u64 = 0;
    let max_fee = u128::from(base_fee) + 5;

    let mut pending = PendingPool::new(MockOrdering::default());
    let mut factory = MockTransactionFactory::default();

    // Every inserted transaction offers a max fee that exceeds the base fee by 5.
    for nonce in 0..5 {
        let candidate =
            MockTransaction::eip1559().rng_hash().with_nonce(nonce).with_max_fee(max_fee);
        pending.add_transaction(Arc::new(factory.validated(candidate)), 0);
    }

    let mut best = pending.best_with_basefee_and_blobfee(base_fee, base_fee_per_blob_gas);

    // Each transaction must come back, with nonces appearing in insertion order.
    for expected_nonce in 0..5 {
        let yielded = best.next().expect("Transaction should be returned");
        assert_eq!(yielded.nonce(), expected_nonce);
    }

    // After those five, the iterator is exhausted.
    assert!(best.next().is_none());
}
859
#[test]
fn test_best_with_fees_iter_mix_of_blob_and_non_blob_transactions() {
    // One regular and one blob transaction, both priced above the enforced thresholds.
    let base_fee: u64 = 10;
    let base_fee_per_blob_gas: u64 = 15;
    let max_fee = u128::from(base_fee) + 5;
    let max_blob_fee = u128::from(base_fee_per_blob_gas) + 5;

    let mut pending = PendingPool::new(MockOrdering::default());
    let mut factory = MockTransactionFactory::default();

    // Regular EIP-1559 transaction whose max fee is above the base fee.
    let regular = MockTransaction::eip1559().rng_hash().with_nonce(0).with_max_fee(max_fee);
    pending.add_transaction(Arc::new(factory.validated(regular.clone())), 0);

    // Blob transaction whose max fee and blob fee are both above their thresholds.
    let blob = MockTransaction::eip4844()
        .rng_hash()
        .with_nonce(1)
        .with_max_fee(max_fee)
        .with_blob_fee(max_blob_fee);
    pending.add_transaction(Arc::new(factory.validated(blob.clone())), 0);

    let mut best = pending.best_with_basefee_and_blobfee(base_fee, base_fee_per_blob_gas);

    // The regular transaction is expected first, then the blob transaction.
    let yielded = best.next().expect("Transaction should be returned");
    assert_eq!(yielded.transaction, regular);

    let yielded = best.next().expect("Transaction should be returned");
    assert_eq!(yielded.transaction, blob);

    // Nothing else may follow.
    assert!(best.next().is_none());
}
894
#[test]
fn test_best_transactions_with_skipping_blobs() {
    // After `skip_blobs` is called, the iterator must not yield the blob transaction.
    let mut pending = PendingPool::new(MockOrdering::default());
    let mut factory = MockTransactionFactory::default();

    // Insert the blob transaction first ...
    let blob = MockTransaction::eip4844().rng_hash().with_nonce(0).with_blob_fee(100);
    pending.add_transaction(Arc::new(factory.validated(blob)), 0);

    // ... followed by a regular transaction, kept around for the comparison below.
    let regular = MockTransaction::eip1559().rng_hash().with_nonce(1).with_max_fee(200);
    pending.add_transaction(Arc::new(factory.validated(regular.clone())), 0);

    let mut best = pending.best();
    best.skip_blobs();

    // The regular transaction is the sole item produced.
    let yielded = best.next().expect("Transaction should be returned");
    assert_eq!(yielded.transaction, regular);

    // The blob transaction never shows up.
    assert!(best.next().is_none());
}
921
#[test]
fn test_best_transactions_no_updates() {
    // `no_updates` must drop the iterator's `new_transaction_receiver`.
    let mut pending = PendingPool::new(MockOrdering::default());
    let mut factory = MockTransactionFactory::default();

    // A single transaction is enough to obtain an iterator from the pool.
    let seed = MockTransaction::eip1559().rng_hash().with_nonce(0).with_max_fee(100);
    pending.add_transaction(Arc::new(factory.validated(seed)), 0);

    let mut best = pending.best();

    // Install a receiver explicitly; the sender half is kept alive but never used.
    let (_update_tx, update_rx) =
        tokio::sync::broadcast::channel::<PendingTransaction<MockOrdering>>(1000);
    best.new_transaction_receiver = Some(update_rx);
    assert!(best.new_transaction_receiver.is_some());

    // Opting out of updates has to clear the receiver again.
    best.no_updates();
    assert!(best.new_transaction_receiver.is_none());
}
950
#[test]
fn test_best_update_transaction_priority() {
    let mut pending = PendingPool::new(MockOrdering::default());
    let mut factory = MockTransactionFactory::default();

    // Populate the pool with clones of one template transaction carrying nonces 0 through 4.
    let template = MockTransaction::eip1559();
    let num_tx = 5;
    for nonce in 0..num_tx {
        let validated = factory.validated(template.clone().rng_hash().with_nonce(nonce));
        pending.add_transaction(Arc::new(validated), 0);
    }

    let mut best = pending.best();

    // Wire a broadcast channel into the iterator so a transaction can be announced to it after
    // iteration has started.
    let (update_tx, update_rx) =
        tokio::sync::broadcast::channel::<PendingTransaction<MockOrdering>>(1000);
    best.new_transaction_receiver = Some(update_rx);

    // Pull one transaction before any update is sent.
    let first = best.next().unwrap();
    let first_sender = first.sender_id();

    // Validate a separately built nonce-0 transaction and announce it with the maximum possible
    // priority. The assertions below rely on its sender id differing from `first`'s.
    let newcomer = factory.validated(MockTransaction::eip1559().with_nonce(0));
    let newcomer_sender = newcomer.sender_id();
    update_tx
        .send(PendingTransaction {
            submission_id: 10,
            transaction: Arc::new(newcomer),
            priority: Priority::Value(u128::MAX),
        })
        .unwrap();

    // Since a lower-priority transaction was already yielded, the announced one must be skipped:
    // everything still produced belongs to the first transaction's sender.
    for tx in best {
        assert_eq!(tx.sender_id(), first_sender);
        assert_ne!(tx.sender_id(), newcomer_sender);
    }
}
994}