reth_transaction_pool/pool/
best.rs

1use crate::{
2    error::{Eip4844PoolTransactionError, InvalidPoolTransactionError},
3    identifier::{SenderId, TransactionId},
4    pool::pending::PendingTransaction,
5    PoolTransaction, Priority, TransactionOrdering, ValidPoolTransaction,
6};
7use alloy_consensus::Transaction;
8use alloy_eips::Typed2718;
9use alloy_primitives::Address;
10use core::fmt;
11use reth_primitives_traits::transaction::error::InvalidTransactionError;
12use std::{
13    collections::{BTreeMap, BTreeSet, HashSet, VecDeque},
14    sync::Arc,
15};
16use tokio::sync::broadcast::{error::TryRecvError, Receiver};
17use tracing::debug;
18
/// Upper bound on the number of newly received pending transactions that
/// `BestTransactions::add_new_transactions` pulls from the update channel in a single call.
const MAX_NEW_TRANSACTIONS_PER_BATCH: usize = 16;
20
/// An iterator that returns transactions that can be executed on the current state (*best*
/// transactions).
///
/// This is a wrapper around [`BestTransactions`] that also enforces a specific base fee and blob
/// fee.
///
/// This iterator guarantees that all transactions it returns satisfy both the base fee and blob
/// fee! Candidates that fail either check are reported to the wrapped iterator as underpriced
/// and are not yielded.
pub(crate) struct BestTransactionsWithFees<T: TransactionOrdering> {
    /// The wrapped iterator that supplies the candidate transactions.
    pub(crate) best: BestTransactions<T>,
    /// Minimum value a transaction's `max_fee_per_gas` must reach in order to be yielded.
    pub(crate) base_fee: u64,
    /// Minimum value a transaction's `max_fee_per_blob_gas` must reach in order to be yielded.
    ///
    /// Only checked for transactions that actually report a blob fee.
    pub(crate) base_fee_per_blob_gas: u64,
}
33
34impl<T: TransactionOrdering> crate::traits::BestTransactions for BestTransactionsWithFees<T> {
35    fn mark_invalid(&mut self, tx: &Self::Item, kind: &InvalidPoolTransactionError) {
36        BestTransactions::mark_invalid(&mut self.best, tx, kind)
37    }
38
39    fn no_updates(&mut self) {
40        self.best.no_updates()
41    }
42
43    fn skip_blobs(&mut self) {
44        self.set_skip_blobs(true)
45    }
46
47    fn set_skip_blobs(&mut self, skip_blobs: bool) {
48        self.best.set_skip_blobs(skip_blobs)
49    }
50}
51
52impl<T: TransactionOrdering> Iterator for BestTransactionsWithFees<T> {
53    type Item = Arc<ValidPoolTransaction<T::Transaction>>;
54
55    fn next(&mut self) -> Option<Self::Item> {
56        // find the next transaction that satisfies the base fee
57        loop {
58            let best = Iterator::next(&mut self.best)?;
59            // If both the base fee and blob fee (if applicable for EIP-4844) are satisfied, return
60            // the transaction
61            if best.transaction.max_fee_per_gas() >= self.base_fee as u128 &&
62                best.transaction
63                    .max_fee_per_blob_gas()
64                    .is_none_or(|fee| fee >= self.base_fee_per_blob_gas as u128)
65            {
66                return Some(best);
67            }
68            crate::traits::BestTransactions::mark_invalid(
69                self,
70                &best,
71                &InvalidPoolTransactionError::Underpriced,
72            );
73        }
74    }
75}
76
/// An iterator that returns transactions that can be executed on the current state (*best*
/// transactions).
///
/// The [`PendingPool`](crate::pool::pending::PendingPool) contains transactions that *could* all
/// be executed on the current state, but only yields transactions that are ready to be executed
/// now. While it contains all gapless transactions of a sender, it _always_ only returns the
/// transaction with the current on chain nonce.
#[derive(Debug)]
pub struct BestTransactions<T: TransactionOrdering> {
    /// Contains a copy of _all_ transactions of the pending pool at the point in time this
    /// iterator was created.
    pub(crate) all: BTreeMap<TransactionId, PendingTransaction<T>>,
    /// Transactions that can be executed right away: these have the expected nonce.
    ///
    /// Once an `independent` transaction with the nonce `N` is returned, it unlocks `N+1`, which
    /// then can be moved from the `all` set to the `independent` set.
    pub(crate) independent: BTreeSet<PendingTransaction<T>>,
    /// Senders for which a transaction was marked as invalid.
    ///
    /// Transactions of these senders are skipped instead of being yielded.
    pub(crate) invalid: HashSet<SenderId>,
    /// Used to receive any new pending transactions that have been added to the pool after this
    /// iterator was created.
    ///
    /// These new pending transactions are inserted into this iterator's pool before yielding the
    /// next value. `None` means that no updates are received.
    pub(crate) new_transaction_receiver: Option<Receiver<PendingTransaction<T>>>,
    /// The priority value of the most recently yielded transaction.
    ///
    /// This is required if new pending transactions are fed in while it yields new values, and is
    /// only recorded while `new_transaction_receiver` is set.
    pub(crate) last_priority: Option<Priority<T::PriorityValue>>,
    /// Flag to control whether to skip blob transactions (EIP4844).
    pub(crate) skip_blobs: bool,
}
109
110impl<T: TransactionOrdering> BestTransactions<T> {
111    /// Mark the transaction and its descendants as invalid.
112    pub(crate) fn mark_invalid(
113        &mut self,
114        tx: &Arc<ValidPoolTransaction<T::Transaction>>,
115        _kind: &InvalidPoolTransactionError,
116    ) {
117        self.invalid.insert(tx.sender_id());
118    }
119
120    /// Returns the ancestor the given transaction, the transaction with `nonce - 1`.
121    ///
122    /// Note: for a transaction with nonce higher than the current on chain nonce this will always
123    /// return an ancestor since all transactions in this pool are gapless.
124    pub(crate) fn ancestor(&self, id: &TransactionId) -> Option<&PendingTransaction<T>> {
125        self.all.get(&id.unchecked_ancestor()?)
126    }
127
128    /// Non-blocking read on the new pending transactions subscription channel
129    fn try_recv(&mut self) -> Option<IncomingTransaction<T>> {
130        loop {
131            match self.new_transaction_receiver.as_mut()?.try_recv() {
132                Ok(tx) => {
133                    if let Some(last_priority) = &self.last_priority &&
134                        &tx.priority > last_priority
135                    {
136                        // we skip transactions if we already yielded a transaction with lower
137                        // priority
138                        return Some(IncomingTransaction::Stash(tx))
139                    }
140                    return Some(IncomingTransaction::Process(tx))
141                }
142                // note TryRecvError::Lagged can be returned here, which is an error that attempts
143                // to correct itself on consecutive try_recv() attempts
144
145                // the cost of ignoring this error is allowing old transactions to get
146                // overwritten after the chan buffer size is met
147                Err(TryRecvError::Lagged(_)) => {
148                    // Handle the case where the receiver lagged too far behind.
149                    // `num_skipped` indicates the number of messages that were skipped.
150                }
151
152                // this case is still better than the existing iterator behavior where no new
153                // pending txs are surfaced to consumers
154                Err(_) => return None,
155            }
156        }
157    }
158
159    /// Removes the currently best independent transaction from the independent set and the total
160    /// set.
161    fn pop_best(&mut self) -> Option<PendingTransaction<T>> {
162        self.independent.pop_last().inspect(|best| {
163            self.all.remove(best.transaction.id());
164        })
165    }
166
167    /// Checks for new transactions that have come into the `PendingPool` after this iterator was
168    /// created and inserts them
169    fn add_new_transactions(&mut self) {
170        for _ in 0..MAX_NEW_TRANSACTIONS_PER_BATCH {
171            if let Some(pending_tx) = self.try_recv() {
172                //  same logic as PendingPool::add_transaction/PendingPool::best_with_unlocked
173
174                match pending_tx {
175                    IncomingTransaction::Process(tx) => {
176                        let tx_id = *tx.transaction.id();
177                        if self.ancestor(&tx_id).is_none() {
178                            self.independent.insert(tx.clone());
179                        }
180                        self.all.insert(tx_id, tx);
181                    }
182                    IncomingTransaction::Stash(tx) => {
183                        let tx_id = *tx.transaction.id();
184                        self.all.insert(tx_id, tx);
185                    }
186                }
187            } else {
188                break;
189            }
190        }
191    }
192
193    /// Returns the next best transaction and its priority value.
194    #[allow(clippy::type_complexity)]
195    pub fn next_tx_and_priority(
196        &mut self,
197    ) -> Option<(Arc<ValidPoolTransaction<T::Transaction>>, Priority<T::PriorityValue>)> {
198        loop {
199            self.add_new_transactions();
200            // Remove the next independent tx with the highest priority
201            let best = self.pop_best()?;
202            let sender_id = best.transaction.sender_id();
203
204            // skip transactions for which sender was marked as invalid
205            if self.invalid.contains(&sender_id) {
206                debug!(
207                    target: "txpool",
208                    "[{:?}] skipping invalid transaction",
209                    best.transaction.hash()
210                );
211                continue
212            }
213
214            // Insert transactions that just got unlocked.
215            if let Some(unlocked) = self.all.get(&best.unlocks()) {
216                self.independent.insert(unlocked.clone());
217            }
218
219            if self.skip_blobs && best.transaction.transaction.is_eip4844() {
220                // blobs should be skipped, marking them as invalid will ensure that no dependent
221                // transactions are returned
222                self.mark_invalid(
223                    &best.transaction,
224                    &InvalidPoolTransactionError::Eip4844(
225                        Eip4844PoolTransactionError::NoEip4844Blobs,
226                    ),
227                )
228            } else {
229                if self.new_transaction_receiver.is_some() {
230                    self.last_priority = Some(best.priority.clone())
231                }
232                return Some((best.transaction, best.priority))
233            }
234        }
235    }
236}
237
/// Result of attempting to receive a new transaction from the channel during iteration.
///
/// This enum determines how a newly received transaction should be handled based on its priority
/// relative to transactions already yielded by the iterator. It is produced by
/// `BestTransactions::try_recv` and consumed by `BestTransactions::add_new_transactions`.
enum IncomingTransaction<T: TransactionOrdering> {
    /// Process the transaction normally: add it to the `all` map and, if it has no ancestor in
    /// `all`, also to the `independent` set.
    ///
    /// This variant is used when the transaction's priority is lower than or equal to the last
    /// yielded transaction (or nothing has been yielded yet), meaning it can be safely processed
    /// without breaking the descending priority order.
    Process(PendingTransaction<T>),

    /// Stash the transaction: add only to the `all` map, but NOT to the `independent` set.
    ///
    /// This variant is used when the transaction has a higher priority than the last yielded
    /// transaction. We cannot yield it immediately (to maintain strict priority ordering), but we
    /// must still track it so that:
    /// - Its descendants can find it via `ancestor()` lookups
    /// - We prevent those descendants from being incorrectly promoted to `independent`
    ///
    /// Without stashing, if a child of this transaction arrives later, it would fail to find its
    /// parent in `all`, be marked as `independent`, and be yielded out of order (before its
    /// parent), causing nonce gaps.
    Stash(PendingTransaction<T>),
}
264
265impl<T: TransactionOrdering> crate::traits::BestTransactions for BestTransactions<T> {
266    fn mark_invalid(&mut self, tx: &Self::Item, kind: &InvalidPoolTransactionError) {
267        Self::mark_invalid(self, tx, kind)
268    }
269
270    fn no_updates(&mut self) {
271        self.new_transaction_receiver.take();
272        self.last_priority.take();
273    }
274
275    fn skip_blobs(&mut self) {
276        self.set_skip_blobs(true);
277    }
278
279    fn set_skip_blobs(&mut self, skip_blobs: bool) {
280        self.skip_blobs = skip_blobs;
281    }
282}
283
284impl<T: TransactionOrdering> Iterator for BestTransactions<T> {
285    type Item = Arc<ValidPoolTransaction<T::Transaction>>;
286
287    fn next(&mut self) -> Option<Self::Item> {
288        self.next_tx_and_priority().map(|(tx, _)| tx)
289    }
290}
291
/// A [`BestTransactions`](crate::traits::BestTransactions) implementation that filters the
/// transactions of the wrapped iterator with a predicate.
///
/// Filtered-out transactions are marked as invalid:
/// [`BestTransactions::mark_invalid`](crate::traits::BestTransactions::mark_invalid).
pub struct BestTransactionFilter<I, P> {
    /// The wrapped iterator that supplies the candidate transactions.
    pub(crate) best: I,
    /// Decides whether a candidate is yielded (`true`) or rejected (`false`).
    pub(crate) predicate: P,
}

impl<I, P> BestTransactionFilter<I, P> {
    /// Wraps `best` in a new [`BestTransactionFilter`] that applies `predicate` to its items.
    pub const fn new(best: I, predicate: P) -> Self {
        BestTransactionFilter { best, predicate }
    }
}
308
309impl<I, P> Iterator for BestTransactionFilter<I, P>
310where
311    I: crate::traits::BestTransactions,
312    P: FnMut(&<I as Iterator>::Item) -> bool,
313{
314    type Item = <I as Iterator>::Item;
315
316    fn next(&mut self) -> Option<Self::Item> {
317        loop {
318            let best = self.best.next()?;
319            if (self.predicate)(&best) {
320                return Some(best)
321            }
322            self.best.mark_invalid(
323                &best,
324                &InvalidPoolTransactionError::Consensus(
325                    InvalidTransactionError::TxTypeNotSupported,
326                ),
327            );
328        }
329    }
330}
331
332impl<I, P> crate::traits::BestTransactions for BestTransactionFilter<I, P>
333where
334    I: crate::traits::BestTransactions,
335    P: FnMut(&<I as Iterator>::Item) -> bool + Send,
336{
337    fn mark_invalid(&mut self, tx: &Self::Item, kind: &InvalidPoolTransactionError) {
338        crate::traits::BestTransactions::mark_invalid(&mut self.best, tx, kind)
339    }
340
341    fn no_updates(&mut self) {
342        self.best.no_updates()
343    }
344
345    fn skip_blobs(&mut self) {
346        self.set_skip_blobs(true)
347    }
348
349    fn set_skip_blobs(&mut self, skip_blobs: bool) {
350        self.best.set_skip_blobs(skip_blobs)
351    }
352}
353
354impl<I: fmt::Debug, P> fmt::Debug for BestTransactionFilter<I, P> {
355    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
356        f.debug_struct("BestTransactionFilter").field("best", &self.best).finish()
357    }
358}
359
/// Wrapper over [`crate::traits::BestTransactions`] that prioritizes transactions of certain
/// senders, capping the total gas limit of such transactions.
#[derive(Debug)]
pub struct BestTransactionsWithPrioritizedSenders<I: Iterator> {
    /// Inner iterator
    inner: I,
    /// The set of senders whose transactions should be prioritized
    prioritized_senders: HashSet<Address>,
    /// Maximum total gas limit of prioritized transactions
    max_prioritized_gas: u64,
    /// Buffer with transactions that are not being prioritized. Those will be the first to be
    /// included after the prioritized transactions
    buffer: VecDeque<I::Item>,
    /// Tracker of total gas limit of prioritized transactions. Once it reaches
    /// `max_prioritized_gas` no more transactions will be prioritized
    prioritized_gas: u64,
}
377
378impl<I: Iterator> BestTransactionsWithPrioritizedSenders<I> {
379    /// Constructs a new [`BestTransactionsWithPrioritizedSenders`].
380    pub fn new(prioritized_senders: HashSet<Address>, max_prioritized_gas: u64, inner: I) -> Self {
381        Self {
382            inner,
383            prioritized_senders,
384            max_prioritized_gas,
385            buffer: Default::default(),
386            prioritized_gas: Default::default(),
387        }
388    }
389}
390
391impl<I, T> Iterator for BestTransactionsWithPrioritizedSenders<I>
392where
393    I: crate::traits::BestTransactions<Item = Arc<ValidPoolTransaction<T>>>,
394    T: PoolTransaction,
395{
396    type Item = <I as Iterator>::Item;
397
398    fn next(&mut self) -> Option<Self::Item> {
399        // If we have space, try prioritizing transactions
400        if self.prioritized_gas < self.max_prioritized_gas {
401            for item in &mut self.inner {
402                if self.prioritized_senders.contains(&item.transaction.sender()) &&
403                    self.prioritized_gas + item.transaction.gas_limit() <=
404                        self.max_prioritized_gas
405                {
406                    self.prioritized_gas += item.transaction.gas_limit();
407                    return Some(item)
408                }
409                self.buffer.push_back(item);
410            }
411        }
412
413        if let Some(item) = self.buffer.pop_front() {
414            Some(item)
415        } else {
416            self.inner.next()
417        }
418    }
419}
420
421impl<I, T> crate::traits::BestTransactions for BestTransactionsWithPrioritizedSenders<I>
422where
423    I: crate::traits::BestTransactions<Item = Arc<ValidPoolTransaction<T>>>,
424    T: PoolTransaction,
425{
426    fn mark_invalid(&mut self, tx: &Self::Item, kind: &InvalidPoolTransactionError) {
427        self.inner.mark_invalid(tx, kind)
428    }
429
430    fn no_updates(&mut self) {
431        self.inner.no_updates()
432    }
433
434    fn set_skip_blobs(&mut self, skip_blobs: bool) {
435        if skip_blobs {
436            self.buffer.retain(|tx| !tx.transaction.is_eip4844())
437        }
438        self.inner.set_skip_blobs(skip_blobs)
439    }
440}
441
442#[cfg(test)]
443mod tests {
444    use super::*;
445    use crate::{
446        pool::pending::PendingPool,
447        test_utils::{MockOrdering, MockTransaction, MockTransactionFactory},
448        BestTransactions, Priority,
449    };
450
451    #[test]
452    fn test_best_iter() {
453        let mut pool = PendingPool::new(MockOrdering::default());
454        let mut f = MockTransactionFactory::default();
455
456        let num_tx = 10;
457        // insert 10 gapless tx
458        let tx = MockTransaction::eip1559();
459        for nonce in 0..num_tx {
460            let tx = tx.clone().rng_hash().with_nonce(nonce);
461            let valid_tx = f.validated(tx);
462            pool.add_transaction(Arc::new(valid_tx), 0);
463        }
464
465        let mut best = pool.best();
466        assert_eq!(best.all.len(), num_tx as usize);
467        assert_eq!(best.independent.len(), 1);
468
469        // check tx are returned in order
470        for nonce in 0..num_tx {
471            assert_eq!(best.independent.len(), 1);
472            let tx = best.next().unwrap();
473            assert_eq!(tx.nonce(), nonce);
474        }
475    }
476
477    #[test]
478    fn test_best_iter_invalid() {
479        let mut pool = PendingPool::new(MockOrdering::default());
480        let mut f = MockTransactionFactory::default();
481
482        let num_tx = 10;
483        // insert 10 gapless tx
484        let tx = MockTransaction::eip1559();
485        for nonce in 0..num_tx {
486            let tx = tx.clone().rng_hash().with_nonce(nonce);
487            let valid_tx = f.validated(tx);
488            pool.add_transaction(Arc::new(valid_tx), 0);
489        }
490
491        let mut best = pool.best();
492
493        // mark the first tx as invalid
494        let invalid = best.independent.iter().next().unwrap();
495        best.mark_invalid(
496            &invalid.transaction.clone(),
497            &InvalidPoolTransactionError::Consensus(InvalidTransactionError::TxTypeNotSupported),
498        );
499
500        // iterator is empty
501        assert!(best.next().is_none());
502    }
503
504    #[test]
505    fn test_best_transactions_iter_invalid() {
506        let mut pool = PendingPool::new(MockOrdering::default());
507        let mut f = MockTransactionFactory::default();
508
509        let num_tx = 10;
510        // insert 10 gapless tx
511        let tx = MockTransaction::eip1559();
512        for nonce in 0..num_tx {
513            let tx = tx.clone().rng_hash().with_nonce(nonce);
514            let valid_tx = f.validated(tx);
515            pool.add_transaction(Arc::new(valid_tx), 0);
516        }
517
518        let mut best: Box<
519            dyn crate::traits::BestTransactions<Item = Arc<ValidPoolTransaction<MockTransaction>>>,
520        > = Box::new(pool.best());
521
522        let tx = Iterator::next(&mut best).unwrap();
523        crate::traits::BestTransactions::mark_invalid(
524            &mut *best,
525            &tx,
526            &InvalidPoolTransactionError::Consensus(InvalidTransactionError::TxTypeNotSupported),
527        );
528        assert!(Iterator::next(&mut best).is_none());
529    }
530
531    #[test]
532    fn test_best_with_fees_iter_base_fee_satisfied() {
533        let mut pool = PendingPool::new(MockOrdering::default());
534        let mut f = MockTransactionFactory::default();
535
536        let num_tx = 5;
537        let base_fee: u64 = 10;
538        let base_fee_per_blob_gas: u64 = 15;
539
540        // Insert transactions with a max_fee_per_gas greater than or equal to the base fee
541        // Without blob fee
542        for nonce in 0..num_tx {
543            let tx = MockTransaction::eip1559()
544                .rng_hash()
545                .with_nonce(nonce)
546                .with_max_fee(base_fee as u128 + 5);
547            let valid_tx = f.validated(tx);
548            pool.add_transaction(Arc::new(valid_tx), 0);
549        }
550
551        let mut best = pool.best_with_basefee_and_blobfee(base_fee, base_fee_per_blob_gas);
552
553        for nonce in 0..num_tx {
554            let tx = best.next().expect("Transaction should be returned");
555            assert_eq!(tx.nonce(), nonce);
556            assert!(tx.transaction.max_fee_per_gas() >= base_fee as u128);
557        }
558    }
559
560    #[test]
561    fn test_best_with_fees_iter_base_fee_violated() {
562        let mut pool = PendingPool::new(MockOrdering::default());
563        let mut f = MockTransactionFactory::default();
564
565        let num_tx = 5;
566        let base_fee: u64 = 20;
567        let base_fee_per_blob_gas: u64 = 15;
568
569        // Insert transactions with a max_fee_per_gas less than the base fee
570        for nonce in 0..num_tx {
571            let tx = MockTransaction::eip1559()
572                .rng_hash()
573                .with_nonce(nonce)
574                .with_max_fee(base_fee as u128 - 5);
575            let valid_tx = f.validated(tx);
576            pool.add_transaction(Arc::new(valid_tx), 0);
577        }
578
579        let mut best = pool.best_with_basefee_and_blobfee(base_fee, base_fee_per_blob_gas);
580
581        // No transaction should be returned since all violate the base fee
582        assert!(best.next().is_none());
583    }
584
585    #[test]
586    fn test_best_with_fees_iter_blob_fee_satisfied() {
587        let mut pool = PendingPool::new(MockOrdering::default());
588        let mut f = MockTransactionFactory::default();
589
590        let num_tx = 5;
591        let base_fee: u64 = 10;
592        let base_fee_per_blob_gas: u64 = 20;
593
594        // Insert transactions with a max_fee_per_blob_gas greater than or equal to the base fee per
595        // blob gas
596        for nonce in 0..num_tx {
597            let tx = MockTransaction::eip4844()
598                .rng_hash()
599                .with_nonce(nonce)
600                .with_max_fee(base_fee as u128 + 5)
601                .with_blob_fee(base_fee_per_blob_gas as u128 + 5);
602            let valid_tx = f.validated(tx);
603            pool.add_transaction(Arc::new(valid_tx), 0);
604        }
605
606        let mut best = pool.best_with_basefee_and_blobfee(base_fee, base_fee_per_blob_gas);
607
608        // All transactions should be returned in order since they satisfy both base fee and blob
609        // fee
610        for nonce in 0..num_tx {
611            let tx = best.next().expect("Transaction should be returned");
612            assert_eq!(tx.nonce(), nonce);
613            assert!(tx.transaction.max_fee_per_gas() >= base_fee as u128);
614            assert!(
615                tx.transaction.max_fee_per_blob_gas().unwrap() >= base_fee_per_blob_gas as u128
616            );
617        }
618
619        // No more transactions should be returned
620        assert!(best.next().is_none());
621    }
622
623    #[test]
624    fn test_best_with_fees_iter_blob_fee_violated() {
625        let mut pool = PendingPool::new(MockOrdering::default());
626        let mut f = MockTransactionFactory::default();
627
628        let num_tx = 5;
629        let base_fee: u64 = 10;
630        let base_fee_per_blob_gas: u64 = 20;
631
632        // Insert transactions with a max_fee_per_blob_gas less than the base fee per blob gas
633        for nonce in 0..num_tx {
634            let tx = MockTransaction::eip4844()
635                .rng_hash()
636                .with_nonce(nonce)
637                .with_max_fee(base_fee as u128 + 5)
638                .with_blob_fee(base_fee_per_blob_gas as u128 - 5);
639            let valid_tx = f.validated(tx);
640            pool.add_transaction(Arc::new(valid_tx), 0);
641        }
642
643        let mut best = pool.best_with_basefee_and_blobfee(base_fee, base_fee_per_blob_gas);
644
645        // No transaction should be returned since all violate the blob fee
646        assert!(best.next().is_none());
647    }
648
649    #[test]
650    fn test_best_with_fees_iter_mixed_fees() {
651        let mut pool = PendingPool::new(MockOrdering::default());
652        let mut f = MockTransactionFactory::default();
653
654        let base_fee: u64 = 10;
655        let base_fee_per_blob_gas: u64 = 20;
656
657        // Insert transactions with varying max_fee_per_gas and max_fee_per_blob_gas
658        let tx1 =
659            MockTransaction::eip1559().rng_hash().with_nonce(0).with_max_fee(base_fee as u128 + 5);
660        let tx2 = MockTransaction::eip4844()
661            .rng_hash()
662            .with_nonce(1)
663            .with_max_fee(base_fee as u128 + 5)
664            .with_blob_fee(base_fee_per_blob_gas as u128 + 5);
665        let tx3 = MockTransaction::eip4844()
666            .rng_hash()
667            .with_nonce(2)
668            .with_max_fee(base_fee as u128 + 5)
669            .with_blob_fee(base_fee_per_blob_gas as u128 - 5);
670        let tx4 =
671            MockTransaction::eip1559().rng_hash().with_nonce(3).with_max_fee(base_fee as u128 - 5);
672
673        pool.add_transaction(Arc::new(f.validated(tx1.clone())), 0);
674        pool.add_transaction(Arc::new(f.validated(tx2.clone())), 0);
675        pool.add_transaction(Arc::new(f.validated(tx3)), 0);
676        pool.add_transaction(Arc::new(f.validated(tx4)), 0);
677
678        let mut best = pool.best_with_basefee_and_blobfee(base_fee, base_fee_per_blob_gas);
679
680        let expected_order = vec![tx1, tx2];
681        for expected_tx in expected_order {
682            let tx = best.next().expect("Transaction should be returned");
683            assert_eq!(tx.transaction, expected_tx);
684        }
685
686        // No more transactions should be returned
687        assert!(best.next().is_none());
688    }
689
690    #[test]
691    fn test_best_add_transaction_with_next_nonce() {
692        let mut pool = PendingPool::new(MockOrdering::default());
693        let mut f = MockTransactionFactory::default();
694
695        // Add 5 transactions with increasing nonces to the pool
696        let num_tx = 5;
697        let tx = MockTransaction::eip1559();
698        for nonce in 0..num_tx {
699            let tx = tx.clone().rng_hash().with_nonce(nonce);
700            let valid_tx = f.validated(tx);
701            pool.add_transaction(Arc::new(valid_tx), 0);
702        }
703
704        // Create a BestTransactions iterator from the pool
705        let mut best = pool.best();
706
707        // Use a broadcast channel for transaction updates
708        let (tx_sender, tx_receiver) =
709            tokio::sync::broadcast::channel::<PendingTransaction<MockOrdering>>(1000);
710        best.new_transaction_receiver = Some(tx_receiver);
711
712        // Create a new transaction with nonce 5 and validate it
713        let new_tx = MockTransaction::eip1559().rng_hash().with_nonce(5);
714        let valid_new_tx = f.validated(new_tx);
715
716        // Send the new transaction through the broadcast channel
717        let pending_tx = PendingTransaction {
718            submission_id: 10,
719            transaction: Arc::new(valid_new_tx.clone()),
720            priority: Priority::Value(1000),
721        };
722        tx_sender.send(pending_tx.clone()).unwrap();
723
724        // Add new transactions to the iterator
725        best.add_new_transactions();
726
727        // Verify that the new transaction has been added to the 'all' map
728        assert_eq!(best.all.len(), 6);
729        assert!(best.all.contains_key(valid_new_tx.id()));
730
731        // Verify that the new transaction has been added to the 'independent' set
732        assert_eq!(best.independent.len(), 2);
733        assert!(best.independent.contains(&pending_tx));
734    }
735
736    #[test]
737    fn test_best_add_transaction_with_ancestor() {
738        // Initialize a new PendingPool with default MockOrdering and MockTransactionFactory
739        let mut pool = PendingPool::new(MockOrdering::default());
740        let mut f = MockTransactionFactory::default();
741
742        // Add 5 transactions with increasing nonces to the pool
743        let num_tx = 5;
744        let tx = MockTransaction::eip1559();
745        for nonce in 0..num_tx {
746            let tx = tx.clone().rng_hash().with_nonce(nonce);
747            let valid_tx = f.validated(tx);
748            pool.add_transaction(Arc::new(valid_tx), 0);
749        }
750
751        // Create a BestTransactions iterator from the pool
752        let mut best = pool.best();
753
754        // Use a broadcast channel for transaction updates
755        let (tx_sender, tx_receiver) =
756            tokio::sync::broadcast::channel::<PendingTransaction<MockOrdering>>(1000);
757        best.new_transaction_receiver = Some(tx_receiver);
758
759        // Create a new transaction with nonce 5 and validate it
760        let base_tx1 = MockTransaction::eip1559().rng_hash().with_nonce(5);
761        let valid_new_tx1 = f.validated(base_tx1.clone());
762
763        // Send the new transaction through the broadcast channel
764        let pending_tx1 = PendingTransaction {
765            submission_id: 10,
766            transaction: Arc::new(valid_new_tx1.clone()),
767            priority: Priority::Value(1000),
768        };
769        tx_sender.send(pending_tx1.clone()).unwrap();
770
771        // Add new transactions to the iterator
772        best.add_new_transactions();
773
774        // Verify that the new transaction has been added to the 'all' map
775        assert_eq!(best.all.len(), 6);
776        assert!(best.all.contains_key(valid_new_tx1.id()));
777
778        // Verify that the new transaction has been added to the 'independent' set
779        assert_eq!(best.independent.len(), 2);
780        assert!(best.independent.contains(&pending_tx1));
781
782        // Attempt to add a new transaction with a different nonce (not a duplicate)
783        let base_tx2 = base_tx1.with_nonce(6);
784        let valid_new_tx2 = f.validated(base_tx2);
785
786        // Send the new transaction through the broadcast channel
787        let pending_tx2 = PendingTransaction {
788            submission_id: 11, // Different submission ID
789            transaction: Arc::new(valid_new_tx2.clone()),
790            priority: Priority::Value(1000),
791        };
792        tx_sender.send(pending_tx2.clone()).unwrap();
793
794        // Add new transactions to the iterator
795        best.add_new_transactions();
796
797        // Verify that the new transaction has been added to 'all'
798        assert_eq!(best.all.len(), 7);
799        assert!(best.all.contains_key(valid_new_tx2.id()));
800
801        // Verify that the new transaction has not been added to the 'independent' set
802        assert_eq!(best.independent.len(), 2);
803        assert!(!best.independent.contains(&pending_tx2));
804    }
805
806    #[test]
807    fn test_best_transactions_filter_trait_object() {
808        // Initialize a new PendingPool with default MockOrdering and MockTransactionFactory
809        let mut pool = PendingPool::new(MockOrdering::default());
810        let mut f = MockTransactionFactory::default();
811
812        // Add 5 transactions with increasing nonces to the pool
813        let num_tx = 5;
814        let tx = MockTransaction::eip1559();
815        for nonce in 0..num_tx {
816            let tx = tx.clone().rng_hash().with_nonce(nonce);
817            let valid_tx = f.validated(tx);
818            pool.add_transaction(Arc::new(valid_tx), 0);
819        }
820
821        // Create a trait object of BestTransactions iterator from the pool
822        let best: Box<dyn crate::traits::BestTransactions<Item = _>> = Box::new(pool.best());
823
824        // Create a filter that only returns transactions with even nonces
825        let filter =
826            BestTransactionFilter::new(best, |tx: &Arc<ValidPoolTransaction<MockTransaction>>| {
827                tx.nonce().is_multiple_of(2)
828            });
829
830        // Verify that the filter only returns transactions with even nonces
831        for tx in filter {
832            assert_eq!(tx.nonce() % 2, 0);
833        }
834    }
835
836    #[test]
837    fn test_best_transactions_prioritized_senders() {
838        let mut pool = PendingPool::new(MockOrdering::default());
839        let mut f = MockTransactionFactory::default();
840
841        // Add 5 plain transactions from different senders with increasing gas price
842        for gas_price in 0..5 {
843            let tx = MockTransaction::eip1559().with_gas_price((gas_price + 1) * 10);
844            let valid_tx = f.validated(tx);
845            pool.add_transaction(Arc::new(valid_tx), 0);
846        }
847
848        // Add another transaction with 5 gas price that's going to be prioritized by sender
849        let prioritized_tx = MockTransaction::eip1559().with_gas_price(5).with_gas_limit(200);
850        let valid_prioritized_tx = f.validated(prioritized_tx.clone());
851        pool.add_transaction(Arc::new(valid_prioritized_tx), 0);
852
853        // Add another transaction with 3 gas price that should not be prioritized by sender because
854        // of gas limit.
855        let prioritized_tx2 = MockTransaction::eip1559().with_gas_price(3);
856        let valid_prioritized_tx2 = f.validated(prioritized_tx2.clone());
857        pool.add_transaction(Arc::new(valid_prioritized_tx2), 0);
858
859        let prioritized_senders =
860            HashSet::from([prioritized_tx.sender(), prioritized_tx2.sender()]);
861        let best =
862            BestTransactionsWithPrioritizedSenders::new(prioritized_senders, 200, pool.best());
863
864        // Verify that the prioritized transaction is returned first
865        // and the rest are returned in the reverse order of gas price
866        let mut iter = best.into_iter();
867        let top_of_block_tx = iter.next().unwrap();
868        assert_eq!(top_of_block_tx.max_fee_per_gas(), 5);
869        assert_eq!(top_of_block_tx.sender(), prioritized_tx.sender());
870        for gas_price in (0..5).rev() {
871            assert_eq!(iter.next().unwrap().max_fee_per_gas(), (gas_price + 1) * 10);
872        }
873
874        // Due to the gas limit, the transaction from second-prioritized sender was not
875        // prioritized.
876        let top_of_block_tx2 = iter.next().unwrap();
877        assert_eq!(top_of_block_tx2.max_fee_per_gas(), 3);
878        assert_eq!(top_of_block_tx2.sender(), prioritized_tx2.sender());
879    }
880
881    #[test]
882    fn test_best_with_fees_iter_no_blob_fee_required() {
883        // Tests transactions without blob fees where base fees are checked.
884        let mut pool = PendingPool::new(MockOrdering::default());
885        let mut f = MockTransactionFactory::default();
886
887        let base_fee: u64 = 10;
888        let base_fee_per_blob_gas: u64 = 0; // No blob fee requirement
889
890        // Insert transactions with max_fee_per_gas above the base fee
891        for nonce in 0..5 {
892            let tx = MockTransaction::eip1559()
893                .rng_hash()
894                .with_nonce(nonce)
895                .with_max_fee(base_fee as u128 + 5);
896            let valid_tx = f.validated(tx);
897            pool.add_transaction(Arc::new(valid_tx), 0);
898        }
899
900        let mut best = pool.best_with_basefee_and_blobfee(base_fee, base_fee_per_blob_gas);
901
902        // All transactions should be returned as no blob fee requirement is imposed
903        for nonce in 0..5 {
904            let tx = best.next().expect("Transaction should be returned");
905            assert_eq!(tx.nonce(), nonce);
906        }
907
908        // Ensure no more transactions are left
909        assert!(best.next().is_none());
910    }
911
912    #[test]
913    fn test_best_with_fees_iter_mix_of_blob_and_non_blob_transactions() {
914        // Tests mixed scenarios with both blob and non-blob transactions.
915        let mut pool = PendingPool::new(MockOrdering::default());
916        let mut f = MockTransactionFactory::default();
917
918        let base_fee: u64 = 10;
919        let base_fee_per_blob_gas: u64 = 15;
920
921        // Add a non-blob transaction that satisfies the base fee
922        let tx_non_blob =
923            MockTransaction::eip1559().rng_hash().with_nonce(0).with_max_fee(base_fee as u128 + 5);
924        pool.add_transaction(Arc::new(f.validated(tx_non_blob.clone())), 0);
925
926        // Add a blob transaction that satisfies both base fee and blob fee
927        let tx_blob = MockTransaction::eip4844()
928            .rng_hash()
929            .with_nonce(1)
930            .with_max_fee(base_fee as u128 + 5)
931            .with_blob_fee(base_fee_per_blob_gas as u128 + 5);
932        pool.add_transaction(Arc::new(f.validated(tx_blob.clone())), 0);
933
934        let mut best = pool.best_with_basefee_and_blobfee(base_fee, base_fee_per_blob_gas);
935
936        // Verify both transactions are returned
937        let tx = best.next().expect("Transaction should be returned");
938        assert_eq!(tx.transaction, tx_non_blob);
939
940        let tx = best.next().expect("Transaction should be returned");
941        assert_eq!(tx.transaction, tx_blob);
942
943        // Ensure no more transactions are left
944        assert!(best.next().is_none());
945    }
946
947    #[test]
948    fn test_best_transactions_with_skipping_blobs() {
949        // Tests the skip_blobs functionality to ensure blob transactions are skipped.
950        let mut pool = PendingPool::new(MockOrdering::default());
951        let mut f = MockTransactionFactory::default();
952
953        // Add a blob transaction
954        let tx_blob = MockTransaction::eip4844().rng_hash().with_nonce(0).with_blob_fee(100);
955        let valid_blob_tx = f.validated(tx_blob);
956        pool.add_transaction(Arc::new(valid_blob_tx), 0);
957
958        // Add a non-blob transaction
959        let tx_non_blob = MockTransaction::eip1559().rng_hash().with_nonce(1).with_max_fee(200);
960        let valid_non_blob_tx = f.validated(tx_non_blob.clone());
961        pool.add_transaction(Arc::new(valid_non_blob_tx), 0);
962
963        let mut best = pool.best();
964        best.skip_blobs();
965
966        // Only the non-blob transaction should be returned
967        let tx = best.next().expect("Transaction should be returned");
968        assert_eq!(tx.transaction, tx_non_blob);
969
970        // Ensure no more transactions are left
971        assert!(best.next().is_none());
972    }
973
974    #[test]
975    fn test_best_transactions_no_updates() {
976        // Tests the no_updates functionality to ensure it properly clears the
977        // new_transaction_receiver.
978        let mut pool = PendingPool::new(MockOrdering::default());
979        let mut f = MockTransactionFactory::default();
980
981        // Add a transaction
982        let tx = MockTransaction::eip1559().rng_hash().with_nonce(0).with_max_fee(100);
983        let valid_tx = f.validated(tx);
984        pool.add_transaction(Arc::new(valid_tx), 0);
985
986        let mut best = pool.best();
987
988        // Use a broadcast channel for transaction updates
989        let (_tx_sender, tx_receiver) =
990            tokio::sync::broadcast::channel::<PendingTransaction<MockOrdering>>(1000);
991        best.new_transaction_receiver = Some(tx_receiver);
992
993        // Ensure receiver is set
994        assert!(best.new_transaction_receiver.is_some());
995
996        // Call no_updates to clear the receiver
997        best.no_updates();
998
999        // Ensure receiver is cleared
1000        assert!(best.new_transaction_receiver.is_none());
1001    }
1002
1003    #[test]
1004    fn test_best_update_transaction_priority() {
1005        let mut pool = PendingPool::new(MockOrdering::default());
1006        let mut f = MockTransactionFactory::default();
1007
1008        // Add 5 transactions with increasing nonces to the pool
1009        let num_tx = 5;
1010        let tx = MockTransaction::eip1559();
1011        for nonce in 0..num_tx {
1012            let tx = tx.clone().rng_hash().with_nonce(nonce);
1013            let valid_tx = f.validated(tx);
1014            pool.add_transaction(Arc::new(valid_tx), 0);
1015        }
1016
1017        // Create a BestTransactions iterator from the pool
1018        let mut best = pool.best();
1019
1020        // Use a broadcast channel for transaction updates
1021        let (tx_sender, tx_receiver) =
1022            tokio::sync::broadcast::channel::<PendingTransaction<MockOrdering>>(1000);
1023        best.new_transaction_receiver = Some(tx_receiver);
1024
1025        // yield one tx, effectively locking in the highest prio
1026        let first = best.next().unwrap();
1027
1028        // Create a new transaction with nonce 5 and validate it
1029        let new_higher_fee_tx = MockTransaction::eip1559().with_nonce(0);
1030        let valid_new_higher_fee_tx = f.validated(new_higher_fee_tx);
1031
1032        // Send the new transaction through the broadcast channel
1033        let pending_tx = PendingTransaction {
1034            submission_id: 10,
1035            transaction: Arc::new(valid_new_higher_fee_tx.clone()),
1036            priority: Priority::Value(u128::MAX),
1037        };
1038        tx_sender.send(pending_tx).unwrap();
1039
1040        // ensure that the higher prio tx is skipped since we yielded a lower one
1041        for tx in best {
1042            assert_eq!(tx.sender_id(), first.sender_id());
1043            assert_ne!(tx.sender_id(), valid_new_higher_fee_tx.sender_id());
1044        }
1045    }
1046}