Skip to main content

reth_transaction_pool/pool/
best.rs

1use crate::{
2    error::{Eip4844PoolTransactionError, InvalidPoolTransactionError},
3    identifier::{SenderId, TransactionId},
4    pool::pending::PendingTransaction,
5    PoolTransaction, Priority, TransactionOrdering, ValidPoolTransaction,
6};
7use alloy_consensus::Transaction;
8use alloy_primitives::map::AddressSet;
9use core::fmt;
10use imbl::OrdMap;
11use reth_primitives_traits::transaction::error::InvalidTransactionError;
12use rustc_hash::FxHashSet;
13use std::{
14    collections::{BTreeSet, VecDeque},
15    sync::Arc,
16};
17use tokio::sync::broadcast::{error::TryRecvError, Receiver};
18use tracing::debug;
19
/// Upper bound on how many newly received pending transactions are pulled from the update
/// channel in a single `add_new_transactions` pass.
20const MAX_NEW_TRANSACTIONS_PER_BATCH: usize = 16;
21
22/// An iterator that returns transactions that can be executed on the current state (*best*
23/// transactions).
24///
25/// This is a wrapper around [`BestTransactions`] that also enforces a specific basefee.
26///
27/// This iterator guarantees that all transactions it returns satisfy both the base fee and blob
28/// fee!
29pub(crate) struct BestTransactionsWithFees<T: TransactionOrdering> {
30    pub(crate) best: BestTransactions<T>,
31    pub(crate) base_fee: u64,
32    pub(crate) base_fee_per_blob_gas: u64,
33}
34
35impl<T: TransactionOrdering> crate::traits::BestTransactions for BestTransactionsWithFees<T> {
36    fn mark_invalid(&mut self, tx: &Self::Item, kind: InvalidPoolTransactionError) {
37        BestTransactions::mark_invalid(&mut self.best, tx, kind)
38    }
39
40    fn no_updates(&mut self) {
41        self.best.no_updates()
42    }
43
44    fn skip_blobs(&mut self) {
45        self.set_skip_blobs(true)
46    }
47
48    fn set_skip_blobs(&mut self, skip_blobs: bool) {
49        self.best.set_skip_blobs(skip_blobs)
50    }
51}
52
53impl<T: TransactionOrdering> Iterator for BestTransactionsWithFees<T> {
54    type Item = Arc<ValidPoolTransaction<T::Transaction>>;
55
56    fn next(&mut self) -> Option<Self::Item> {
57        // find the next transaction that satisfies the base fee
58        loop {
59            let best = Iterator::next(&mut self.best)?;
60            // If both the base fee and blob fee (if applicable for EIP-4844) are satisfied, return
61            // the transaction
62            if best.transaction.max_fee_per_gas() >= self.base_fee as u128 &&
63                best.transaction
64                    .max_fee_per_blob_gas()
65                    .is_none_or(|fee| fee >= self.base_fee_per_blob_gas as u128)
66            {
67                return Some(best);
68            }
69            crate::traits::BestTransactions::mark_invalid(
70                self,
71                &best,
72                InvalidPoolTransactionError::Underpriced,
73            );
74        }
75    }
76
77    fn size_hint(&self) -> (usize, Option<usize>) {
78        let (_, upper) = self.best.size_hint();
79        (0, upper)
80    }
81}
82
83/// An iterator that returns transactions that can be executed on the current state (*best*
84/// transactions).
85///
86/// The [`PendingPool`](crate::pool::pending::PendingPool) contains transactions that *could* all
87/// be executed on the current state, but only yields transactions that are ready to be executed
88/// now. While it contains all gapless transactions of a sender, it _always_ only returns the
89/// transaction with the current on chain nonce.
90#[derive(Debug)]
91pub struct BestTransactions<T: TransactionOrdering> {
92    /// Contains a copy of _all_ transactions of the pending pool at the point in time this
93    /// iterator was created.
94    pub(crate) all: OrdMap<TransactionId, PendingTransaction<T>>,
95    /// Transactions that can be executed right away: these have the expected nonce.
96    ///
97    /// Once an `independent` transaction with the nonce `N` is returned, it unlocks `N+1`, which
98    /// then can be moved from the `all` set to the `independent` set.
99    pub(crate) independent: BTreeSet<PendingTransaction<T>>,
100    /// There might be the case where a yielded transactions is invalid, this will track it.
101    pub(crate) invalid: FxHashSet<SenderId>,
102    /// Used to receive any new pending transactions that have been added to the pool after this
103    /// iterator was static filtered
104    ///
105    /// These new pending transactions are inserted into this iterator's pool before yielding the
106    /// next value
107    pub(crate) new_transaction_receiver: Option<Receiver<PendingTransaction<T>>>,
108    /// The priority value of most recently yielded transaction.
109    ///
110    /// This is required if new pending transactions are fed in while it yields new values.
111    pub(crate) last_priority: Option<Priority<T::PriorityValue>>,
112    /// Flag to control whether to skip blob transactions (EIP4844).
113    pub(crate) skip_blobs: bool,
114}
115
116impl<T: TransactionOrdering> BestTransactions<T> {
117    /// Mark the transaction and its descendants as invalid.
118    pub(crate) fn mark_invalid(
119        &mut self,
120        tx: &Arc<ValidPoolTransaction<T::Transaction>>,
121        _kind: InvalidPoolTransactionError,
122    ) {
123        self.invalid.insert(tx.sender_id());
124    }
125
126    /// Returns the ancestor the given transaction, the transaction with `nonce - 1`.
127    ///
128    /// Note: for a transaction with nonce higher than the current on chain nonce this will always
129    /// return an ancestor since all transactions in this pool are gapless.
130    pub(crate) fn ancestor(&self, id: &TransactionId) -> Option<&PendingTransaction<T>> {
131        self.all.get(&id.unchecked_ancestor()?)
132    }
133
134    /// Non-blocking read on the new pending transactions subscription channel
135    fn try_recv(&mut self) -> Option<IncomingTransaction<T>> {
136        loop {
137            match self.new_transaction_receiver.as_mut()?.try_recv() {
138                Ok(tx) => {
139                    if let Some(last_priority) = &self.last_priority &&
140                        &tx.priority > last_priority
141                    {
142                        // we skip transactions if we already yielded a transaction with lower
143                        // priority
144                        return Some(IncomingTransaction::Stash(tx))
145                    }
146                    return Some(IncomingTransaction::Process(tx))
147                }
148                // note TryRecvError::Lagged can be returned here, which is an error that attempts
149                // to correct itself on consecutive try_recv() attempts
150
151                // the cost of ignoring this error is allowing old transactions to get
152                // overwritten after the chan buffer size is met
153                Err(TryRecvError::Lagged(_)) => {
154                    // Handle the case where the receiver lagged too far behind.
155                    // `num_skipped` indicates the number of messages that were skipped.
156                }
157
158                // this case is still better than the existing iterator behavior where no new
159                // pending txs are surfaced to consumers
160                Err(_) => return None,
161            }
162        }
163    }
164
165    /// Removes the currently best independent transaction from the independent set and the total
166    /// set.
167    fn pop_best(&mut self) -> Option<PendingTransaction<T>> {
168        self.independent.pop_last().inspect(|best| {
169            self.all.remove(best.transaction.id());
170        })
171    }
172
173    /// Checks for new transactions that have come into the `PendingPool` after this iterator was
174    /// created and inserts them
175    fn add_new_transactions(&mut self) {
176        for _ in 0..MAX_NEW_TRANSACTIONS_PER_BATCH {
177            if let Some(pending_tx) = self.try_recv() {
178                //  same logic as PendingPool::add_transaction/PendingPool::best_with_unlocked
179
180                match pending_tx {
181                    IncomingTransaction::Process(tx) => {
182                        let tx_id = *tx.transaction.id();
183                        if self.ancestor(&tx_id).is_none() {
184                            self.independent.insert(tx.clone());
185                        }
186                        self.all.insert(tx_id, tx);
187                    }
188                    IncomingTransaction::Stash(tx) => {
189                        let tx_id = *tx.transaction.id();
190                        self.all.insert(tx_id, tx);
191                    }
192                }
193            } else {
194                break;
195            }
196        }
197    }
198
199    /// Returns the next best transaction and its priority value.
200    #[expect(clippy::type_complexity)]
201    pub fn next_tx_and_priority(
202        &mut self,
203    ) -> Option<(Arc<ValidPoolTransaction<T::Transaction>>, Priority<T::PriorityValue>)> {
204        loop {
205            self.add_new_transactions();
206            // Remove the next independent tx with the highest priority
207            let best = self.pop_best()?;
208            let sender_id = best.transaction.sender_id();
209
210            // skip transactions for which sender was marked as invalid
211            if self.invalid.contains(&sender_id) {
212                debug!(
213                    target: "txpool",
214                    "[{:?}] skipping invalid transaction",
215                    best.transaction.hash()
216                );
217                continue
218            }
219
220            // Insert transactions that just got unlocked.
221            if let Some(unlocked) = self.all.get(&best.unlocks()) {
222                self.independent.insert(unlocked.clone());
223            }
224
225            if self.skip_blobs && best.transaction.is_eip4844() {
226                // blobs should be skipped, marking them as invalid will ensure that no dependent
227                // transactions are returned
228                self.mark_invalid(
229                    &best.transaction,
230                    InvalidPoolTransactionError::Eip4844(
231                        Eip4844PoolTransactionError::NoEip4844Blobs,
232                    ),
233                )
234            } else {
235                if self.new_transaction_receiver.is_some() {
236                    self.last_priority = Some(best.priority.clone())
237                }
238                return Some((best.transaction, best.priority))
239            }
240        }
241    }
242}
243
244/// Result of attempting to receive a new transaction from the channel during iteration.
245///
246/// This enum determines how a newly received transaction should be handled based on its priority
247/// relative to transactions already yielded by the iterator.
248enum IncomingTransaction<T: TransactionOrdering> {
249    /// Process the transaction normally: add to both `all` map and potentially to `independent`
250    /// set (if it has no ancestor).
251    ///
252    /// This variant is used when the transaction's priority is lower than or equal to the last
253    /// yielded transaction, meaning it can be safely processed without breaking the descending
254    /// priority order.
255    Process(PendingTransaction<T>),
256
257    /// Stash the transaction: add only to the `all` map, but NOT to the `independent` set.
258    ///
259    /// This variant is used when the transaction has a higher priority than the last yielded
260    /// transaction. We cannot yield it immediately (to maintain strict priority ordering), but we
261    /// must still track it so that:
262    /// - Its descendants can find it via `ancestor()` lookups
263    /// - We prevent those descendants from being incorrectly promoted to `independent`
264    ///
265    /// Without stashing, if a child of this transaction arrives later, it would fail to find its
266    /// parent in `all`, be marked as `independent`, and be yielded out of order (before its
267    /// parent), causing nonce gaps.
268    Stash(PendingTransaction<T>),
269}
270
271impl<T: TransactionOrdering> crate::traits::BestTransactions for BestTransactions<T> {
272    fn mark_invalid(&mut self, tx: &Self::Item, kind: InvalidPoolTransactionError) {
273        Self::mark_invalid(self, tx, kind)
274    }
275
276    fn no_updates(&mut self) {
277        self.new_transaction_receiver.take();
278        self.last_priority.take();
279    }
280
281    fn skip_blobs(&mut self) {
282        self.set_skip_blobs(true);
283    }
284
285    fn set_skip_blobs(&mut self, skip_blobs: bool) {
286        self.skip_blobs = skip_blobs;
287    }
288}
289
290impl<T: TransactionOrdering> Iterator for BestTransactions<T> {
291    type Item = Arc<ValidPoolTransaction<T::Transaction>>;
292
293    fn next(&mut self) -> Option<Self::Item> {
294        self.next_tx_and_priority().map(|(tx, _)| tx)
295    }
296
297    fn size_hint(&self) -> (usize, Option<usize>) {
298        (0, self.new_transaction_receiver.is_none().then_some(self.all.len()))
299    }
300}
301
/// A [`BestTransactions`](crate::traits::BestTransactions) implementation that only yields the
/// transactions of the wrapped iterator for which the predicate holds.
///
/// Filtered out transactions are marked as invalid:
/// [`BestTransactions::mark_invalid`](crate::traits::BestTransactions::mark_invalid).
pub struct BestTransactionFilter<I, P> {
    /// The wrapped iterator.
    pub(crate) best: I,
    /// Decides whether a transaction is yielded (`true`) or filtered out (`false`).
    pub(crate) predicate: P,
}

impl<I, P> BestTransactionFilter<I, P> {
    /// Builds a [`BestTransactionFilter`] from an iterator and the predicate to apply to it.
    pub const fn new(best: I, predicate: P) -> Self {
        BestTransactionFilter { best, predicate }
    }
}
318
319impl<I, P> Iterator for BestTransactionFilter<I, P>
320where
321    I: crate::traits::BestTransactions,
322    P: FnMut(&<I as Iterator>::Item) -> bool,
323{
324    type Item = <I as Iterator>::Item;
325
326    fn next(&mut self) -> Option<Self::Item> {
327        loop {
328            let best = self.best.next()?;
329            if (self.predicate)(&best) {
330                return Some(best)
331            }
332            self.best.mark_invalid(
333                &best,
334                InvalidPoolTransactionError::Consensus(InvalidTransactionError::TxTypeNotSupported),
335            );
336        }
337    }
338
339    fn size_hint(&self) -> (usize, Option<usize>) {
340        let (_, upper) = self.best.size_hint();
341        (0, upper)
342    }
343}
344
345impl<I, P> crate::traits::BestTransactions for BestTransactionFilter<I, P>
346where
347    I: crate::traits::BestTransactions,
348    P: FnMut(&<I as Iterator>::Item) -> bool + Send,
349{
350    fn mark_invalid(&mut self, tx: &Self::Item, kind: InvalidPoolTransactionError) {
351        crate::traits::BestTransactions::mark_invalid(&mut self.best, tx, kind)
352    }
353
354    fn no_updates(&mut self) {
355        self.best.no_updates()
356    }
357
358    fn skip_blobs(&mut self) {
359        self.set_skip_blobs(true)
360    }
361
362    fn set_skip_blobs(&mut self, skip_blobs: bool) {
363        self.best.set_skip_blobs(skip_blobs)
364    }
365}
366
367impl<I: fmt::Debug, P> fmt::Debug for BestTransactionFilter<I, P> {
368    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
369        f.debug_struct("BestTransactionFilter").field("best", &self.best).finish()
370    }
371}
372
373/// Wrapper over [`crate::traits::BestTransactions`] that prioritizes transactions of certain
374/// senders capping total gas used by such transactions.
375#[derive(Debug)]
376pub struct BestTransactionsWithPrioritizedSenders<I: Iterator> {
377    /// Inner iterator
378    inner: I,
379    /// A set of senders which transactions should be prioritized
380    prioritized_senders: AddressSet,
381    /// Maximum total gas limit of prioritized transactions
382    max_prioritized_gas: u64,
383    /// Buffer with transactions that are not being prioritized. Those will be the first to be
384    /// included after the prioritized transactions
385    buffer: VecDeque<I::Item>,
386    /// Tracker of total gas limit of prioritized transactions. Once it reaches
387    /// `max_prioritized_gas` no more transactions will be prioritized
388    prioritized_gas: u64,
389}
390
391impl<I: Iterator> BestTransactionsWithPrioritizedSenders<I> {
392    /// Constructs a new [`BestTransactionsWithPrioritizedSenders`].
393    pub fn new(prioritized_senders: AddressSet, max_prioritized_gas: u64, inner: I) -> Self {
394        Self {
395            inner,
396            prioritized_senders,
397            max_prioritized_gas,
398            buffer: Default::default(),
399            prioritized_gas: Default::default(),
400        }
401    }
402}
403
404impl<I, T> Iterator for BestTransactionsWithPrioritizedSenders<I>
405where
406    I: crate::traits::BestTransactions<Item = Arc<ValidPoolTransaction<T>>>,
407    T: PoolTransaction,
408{
409    type Item = <I as Iterator>::Item;
410
411    fn next(&mut self) -> Option<Self::Item> {
412        // If we have space, try prioritizing transactions
413        if self.prioritized_gas < self.max_prioritized_gas {
414            for item in &mut self.inner {
415                if self.prioritized_senders.contains(&item.transaction.sender()) &&
416                    self.prioritized_gas + item.transaction.gas_limit() <=
417                        self.max_prioritized_gas
418                {
419                    self.prioritized_gas += item.transaction.gas_limit();
420                    return Some(item)
421                }
422                self.buffer.push_back(item);
423            }
424        }
425
426        if let Some(item) = self.buffer.pop_front() {
427            Some(item)
428        } else {
429            self.inner.next()
430        }
431    }
432
433    fn size_hint(&self) -> (usize, Option<usize>) {
434        let buffered = self.buffer.len();
435        let (inner_lower, inner_upper) = self.inner.size_hint();
436
437        (
438            buffered.saturating_add(inner_lower),
439            inner_upper.and_then(|upper| upper.checked_add(buffered)),
440        )
441    }
442}
443
444impl<I, T> crate::traits::BestTransactions for BestTransactionsWithPrioritizedSenders<I>
445where
446    I: crate::traits::BestTransactions<Item = Arc<ValidPoolTransaction<T>>>,
447    T: PoolTransaction,
448{
449    fn mark_invalid(&mut self, tx: &Self::Item, kind: InvalidPoolTransactionError) {
450        self.inner.mark_invalid(tx, kind)
451    }
452
453    fn no_updates(&mut self) {
454        self.inner.no_updates()
455    }
456
457    fn set_skip_blobs(&mut self, skip_blobs: bool) {
458        if skip_blobs {
459            self.buffer.retain(|tx| !tx.transaction.is_eip4844())
460        }
461        self.inner.set_skip_blobs(skip_blobs)
462    }
463}
464
465#[cfg(test)]
466mod tests {
467    use super::*;
468    use crate::{
469        pool::pending::PendingPool,
470        test_utils::{MockOrdering, MockTransaction, MockTransactionFactory},
471        BestTransactions, Priority,
472    };
473
474    #[test]
475    fn test_best_iter() {
476        let mut pool = PendingPool::new(MockOrdering::default());
477        let mut f = MockTransactionFactory::default();
478
479        let num_tx = 10;
480        // insert 10 gapless tx
481        let tx = MockTransaction::eip1559();
482        for nonce in 0..num_tx {
483            let tx = tx.clone().rng_hash().with_nonce(nonce);
484            let valid_tx = f.validated(tx);
485            pool.add_transaction(Arc::new(valid_tx), 0);
486        }
487
488        let mut best = pool.best();
489        assert_eq!(best.all.len(), num_tx as usize);
490        assert_eq!(best.independent.len(), 1);
491
492        // check tx are returned in order
493        for nonce in 0..num_tx {
494            assert_eq!(best.independent.len(), 1);
495            let tx = best.next().unwrap();
496            assert_eq!(tx.nonce(), nonce);
497        }
498    }
499
500    #[test]
501    fn test_best_transactions_size_hint() {
502        let mut pool = PendingPool::new(MockOrdering::default());
503        let mut f = MockTransactionFactory::default();
504
505        for nonce in 0..3 {
506            let tx = MockTransaction::eip1559().rng_hash().with_nonce(nonce);
507            pool.add_transaction(Arc::new(f.validated(tx)), 0);
508        }
509
510        let mut best = pool.best();
511        assert_eq!(best.size_hint(), (0, None));
512
513        best.no_updates();
514        assert_eq!(best.size_hint(), (0, Some(3)));
515
516        assert_eq!(best.next().unwrap().nonce(), 0);
517        assert_eq!(best.size_hint(), (0, Some(2)));
518    }
519
520    #[test]
521    fn test_best_transactions_with_fees_size_hint() {
522        let mut pool = PendingPool::new(MockOrdering::default());
523        let mut f = MockTransactionFactory::default();
524
525        for nonce in 0..3 {
526            let tx = MockTransaction::eip1559().rng_hash().with_nonce(nonce).with_max_fee(100);
527            pool.add_transaction(Arc::new(f.validated(tx)), 0);
528        }
529
530        let mut best = pool.best_with_basefee_and_blobfee(10, 0);
531        best.no_updates();
532
533        assert_eq!(best.size_hint(), (0, Some(3)));
534        assert_eq!(best.next().unwrap().nonce(), 0);
535        assert_eq!(best.size_hint(), (0, Some(2)));
536    }
537
538    #[test]
539    fn test_best_transaction_filter_size_hint() {
540        let mut pool = PendingPool::new(MockOrdering::default());
541        let mut f = MockTransactionFactory::default();
542
543        for nonce in 0..3 {
544            let tx = MockTransaction::eip1559().rng_hash().with_nonce(nonce);
545            pool.add_transaction(Arc::new(f.validated(tx)), 0);
546        }
547
548        let best = pool.best().without_updates();
549        let mut filter =
550            BestTransactionFilter::new(best, |_: &Arc<ValidPoolTransaction<MockTransaction>>| {
551                false
552            });
553
554        assert_eq!(filter.size_hint(), (0, Some(3)));
555        assert!(filter.next().is_none());
556        assert_eq!(filter.size_hint(), (0, Some(0)));
557    }
558
559    #[test]
560    fn test_best_transactions_with_prioritized_senders_size_hint() {
561        let mut pool = PendingPool::new(MockOrdering::default());
562        let mut f = MockTransactionFactory::default();
563
564        for gas_price in 0..5 {
565            let tx = MockTransaction::eip1559().with_gas_price((gas_price + 1) * 10);
566            pool.add_transaction(Arc::new(f.validated(tx)), 0);
567        }
568
569        let prioritized_tx = MockTransaction::eip1559().with_gas_price(5).with_gas_limit(200);
570        let prioritized_sender = prioritized_tx.sender();
571        pool.add_transaction(Arc::new(f.validated(prioritized_tx)), 0);
572
573        let mut best = BestTransactionsWithPrioritizedSenders::new(
574            AddressSet::from_iter([prioritized_sender]),
575            200,
576            pool.best().without_updates(),
577        );
578
579        assert_eq!(best.size_hint(), (0, Some(6)));
580        assert_eq!(best.next().unwrap().sender(), prioritized_sender);
581        assert_eq!(best.size_hint(), (5, Some(5)));
582    }
583
584    #[test]
585    fn test_best_iter_invalid() {
586        let mut pool = PendingPool::new(MockOrdering::default());
587        let mut f = MockTransactionFactory::default();
588
589        let num_tx = 10;
590        // insert 10 gapless tx
591        let tx = MockTransaction::eip1559();
592        for nonce in 0..num_tx {
593            let tx = tx.clone().rng_hash().with_nonce(nonce);
594            let valid_tx = f.validated(tx);
595            pool.add_transaction(Arc::new(valid_tx), 0);
596        }
597
598        let mut best = pool.best();
599
600        // mark the first tx as invalid
601        let invalid = best.independent.iter().next().unwrap();
602        best.mark_invalid(
603            &invalid.transaction.clone(),
604            InvalidPoolTransactionError::Consensus(InvalidTransactionError::TxTypeNotSupported),
605        );
606
607        // iterator is empty
608        assert!(best.next().is_none());
609    }
610
611    #[test]
612    fn test_best_transactions_iter_invalid() {
613        let mut pool = PendingPool::new(MockOrdering::default());
614        let mut f = MockTransactionFactory::default();
615
616        let num_tx = 10;
617        // insert 10 gapless tx
618        let tx = MockTransaction::eip1559();
619        for nonce in 0..num_tx {
620            let tx = tx.clone().rng_hash().with_nonce(nonce);
621            let valid_tx = f.validated(tx);
622            pool.add_transaction(Arc::new(valid_tx), 0);
623        }
624
625        let mut best: Box<
626            dyn crate::traits::BestTransactions<Item = Arc<ValidPoolTransaction<MockTransaction>>>,
627        > = Box::new(pool.best());
628
629        let tx = Iterator::next(&mut best).unwrap();
630        crate::traits::BestTransactions::mark_invalid(
631            &mut *best,
632            &tx,
633            InvalidPoolTransactionError::Consensus(InvalidTransactionError::TxTypeNotSupported),
634        );
635        assert!(Iterator::next(&mut best).is_none());
636    }
637
638    #[test]
639    fn test_best_with_fees_iter_base_fee_satisfied() {
640        let mut pool = PendingPool::new(MockOrdering::default());
641        let mut f = MockTransactionFactory::default();
642
643        let num_tx = 5;
644        let base_fee: u64 = 10;
645        let base_fee_per_blob_gas: u64 = 15;
646
647        // Insert transactions with a max_fee_per_gas greater than or equal to the base fee
648        // Without blob fee
649        for nonce in 0..num_tx {
650            let tx = MockTransaction::eip1559()
651                .rng_hash()
652                .with_nonce(nonce)
653                .with_max_fee(base_fee as u128 + 5);
654            let valid_tx = f.validated(tx);
655            pool.add_transaction(Arc::new(valid_tx), 0);
656        }
657
658        let mut best = pool.best_with_basefee_and_blobfee(base_fee, base_fee_per_blob_gas);
659
660        for nonce in 0..num_tx {
661            let tx = best.next().expect("Transaction should be returned");
662            assert_eq!(tx.nonce(), nonce);
663            assert!(tx.transaction.max_fee_per_gas() >= base_fee as u128);
664        }
665    }
666
667    #[test]
668    fn test_best_with_fees_iter_base_fee_violated() {
669        let mut pool = PendingPool::new(MockOrdering::default());
670        let mut f = MockTransactionFactory::default();
671
672        let num_tx = 5;
673        let base_fee: u64 = 20;
674        let base_fee_per_blob_gas: u64 = 15;
675
676        // Insert transactions with a max_fee_per_gas less than the base fee
677        for nonce in 0..num_tx {
678            let tx = MockTransaction::eip1559()
679                .rng_hash()
680                .with_nonce(nonce)
681                .with_max_fee(base_fee as u128 - 5);
682            let valid_tx = f.validated(tx);
683            pool.add_transaction(Arc::new(valid_tx), 0);
684        }
685
686        let mut best = pool.best_with_basefee_and_blobfee(base_fee, base_fee_per_blob_gas);
687
688        // No transaction should be returned since all violate the base fee
689        assert!(best.next().is_none());
690    }
691
692    #[test]
693    fn test_best_with_fees_iter_blob_fee_satisfied() {
694        let mut pool = PendingPool::new(MockOrdering::default());
695        let mut f = MockTransactionFactory::default();
696
697        let num_tx = 5;
698        let base_fee: u64 = 10;
699        let base_fee_per_blob_gas: u64 = 20;
700
701        // Insert transactions with a max_fee_per_blob_gas greater than or equal to the base fee per
702        // blob gas
703        for nonce in 0..num_tx {
704            let tx = MockTransaction::eip4844()
705                .rng_hash()
706                .with_nonce(nonce)
707                .with_max_fee(base_fee as u128 + 5)
708                .with_blob_fee(base_fee_per_blob_gas as u128 + 5);
709            let valid_tx = f.validated(tx);
710            pool.add_transaction(Arc::new(valid_tx), 0);
711        }
712
713        let mut best = pool.best_with_basefee_and_blobfee(base_fee, base_fee_per_blob_gas);
714
715        // All transactions should be returned in order since they satisfy both base fee and blob
716        // fee
717        for nonce in 0..num_tx {
718            let tx = best.next().expect("Transaction should be returned");
719            assert_eq!(tx.nonce(), nonce);
720            assert!(tx.transaction.max_fee_per_gas() >= base_fee as u128);
721            assert!(
722                tx.transaction.max_fee_per_blob_gas().unwrap() >= base_fee_per_blob_gas as u128
723            );
724        }
725
726        // No more transactions should be returned
727        assert!(best.next().is_none());
728    }
729
730    #[test]
731    fn test_best_with_fees_iter_blob_fee_violated() {
732        let mut pool = PendingPool::new(MockOrdering::default());
733        let mut f = MockTransactionFactory::default();
734
735        let num_tx = 5;
736        let base_fee: u64 = 10;
737        let base_fee_per_blob_gas: u64 = 20;
738
739        // Insert transactions with a max_fee_per_blob_gas less than the base fee per blob gas
740        for nonce in 0..num_tx {
741            let tx = MockTransaction::eip4844()
742                .rng_hash()
743                .with_nonce(nonce)
744                .with_max_fee(base_fee as u128 + 5)
745                .with_blob_fee(base_fee_per_blob_gas as u128 - 5);
746            let valid_tx = f.validated(tx);
747            pool.add_transaction(Arc::new(valid_tx), 0);
748        }
749
750        let mut best = pool.best_with_basefee_and_blobfee(base_fee, base_fee_per_blob_gas);
751
752        // No transaction should be returned since all violate the blob fee
753        assert!(best.next().is_none());
754    }
755
756    #[test]
757    fn test_best_with_fees_iter_mixed_fees() {
758        let mut pool = PendingPool::new(MockOrdering::default());
759        let mut f = MockTransactionFactory::default();
760
761        let base_fee: u64 = 10;
762        let base_fee_per_blob_gas: u64 = 20;
763
764        // Insert transactions with varying max_fee_per_gas and max_fee_per_blob_gas
765        let tx1 =
766            MockTransaction::eip1559().rng_hash().with_nonce(0).with_max_fee(base_fee as u128 + 5);
767        let tx2 = MockTransaction::eip4844()
768            .rng_hash()
769            .with_nonce(1)
770            .with_max_fee(base_fee as u128 + 5)
771            .with_blob_fee(base_fee_per_blob_gas as u128 + 5);
772        let tx3 = MockTransaction::eip4844()
773            .rng_hash()
774            .with_nonce(2)
775            .with_max_fee(base_fee as u128 + 5)
776            .with_blob_fee(base_fee_per_blob_gas as u128 - 5);
777        let tx4 =
778            MockTransaction::eip1559().rng_hash().with_nonce(3).with_max_fee(base_fee as u128 - 5);
779
780        pool.add_transaction(Arc::new(f.validated(tx1.clone())), 0);
781        pool.add_transaction(Arc::new(f.validated(tx2.clone())), 0);
782        pool.add_transaction(Arc::new(f.validated(tx3)), 0);
783        pool.add_transaction(Arc::new(f.validated(tx4)), 0);
784
785        let mut best = pool.best_with_basefee_and_blobfee(base_fee, base_fee_per_blob_gas);
786
787        let expected_order = vec![tx1, tx2];
788        for expected_tx in expected_order {
789            let tx = best.next().expect("Transaction should be returned");
790            assert_eq!(tx.transaction, expected_tx);
791        }
792
793        // No more transactions should be returned
794        assert!(best.next().is_none());
795    }
796
#[test]
fn test_best_add_transaction_with_next_nonce() {
    let mut f = MockTransactionFactory::default();
    let mut pool = PendingPool::new(MockOrdering::default());

    // Seed the pool with five transactions cloned from one template, nonces 0 through 4.
    let template = MockTransaction::eip1559();
    for nonce in 0..5 {
        let seeded = f.validated(template.clone().rng_hash().with_nonce(nonce));
        pool.add_transaction(Arc::new(seeded), 0);
    }

    // Take the best-transactions iterator and attach a broadcast receiver to it so that
    // additional transactions can be fed in afterwards.
    let mut best = pool.best();
    let (updates, receiver) =
        tokio::sync::broadcast::channel::<PendingTransaction<MockOrdering>>(1000);
    best.new_transaction_receiver = Some(receiver);

    // Validate one more transaction (nonce 5) and broadcast it as a pending transaction.
    let incoming = Arc::new(f.validated(MockTransaction::eip1559().rng_hash().with_nonce(5)));
    let pending = PendingTransaction {
        submission_id: 10,
        transaction: Arc::clone(&incoming),
        priority: Priority::Value(1000),
    };
    updates.send(pending.clone()).unwrap();

    // Let the iterator pick up whatever was broadcast.
    best.add_new_transactions();

    // The `all` map must now hold six entries, the broadcast transaction among them.
    assert_eq!(best.all.len(), 6);
    assert!(best.all.contains_key(incoming.id()));

    // The broadcast transaction must also be part of the `independent` set.
    assert_eq!(best.independent.len(), 2);
    assert!(best.independent.contains(&pending));
}
842
#[test]
fn test_best_add_transaction_with_ancestor() {
    // Fresh pool plus the factory used to validate mock transactions.
    let mut f = MockTransactionFactory::default();
    let mut pool = PendingPool::new(MockOrdering::default());

    // Seed the pool with five transactions cloned from one template, nonces 0 through 4.
    let template = MockTransaction::eip1559();
    for nonce in 0..5 {
        let seeded = f.validated(template.clone().rng_hash().with_nonce(nonce));
        pool.add_transaction(Arc::new(seeded), 0);
    }

    // Take the best-transactions iterator and attach a broadcast receiver to it.
    let mut best = pool.best();
    let (updates, receiver) =
        tokio::sync::broadcast::channel::<PendingTransaction<MockOrdering>>(1000);
    best.new_transaction_receiver = Some(receiver);

    // First broadcast: a validated transaction with nonce 5.
    let first_tx = MockTransaction::eip1559().rng_hash().with_nonce(5);
    let first_valid = Arc::new(f.validated(first_tx.clone()));
    let first_pending = PendingTransaction {
        submission_id: 10,
        transaction: Arc::clone(&first_valid),
        priority: Priority::Value(1000),
    };
    updates.send(first_pending.clone()).unwrap();
    best.add_new_transactions();

    // It must show up in `all` ...
    assert_eq!(best.all.len(), 6);
    assert!(best.all.contains_key(first_valid.id()));

    // ... and in the `independent` set.
    assert_eq!(best.independent.len(), 2);
    assert!(best.independent.contains(&first_pending));

    // Second broadcast: the same mock transaction bumped to nonce 6, sent under a different
    // submission id.
    let second_valid = Arc::new(f.validated(first_tx.with_nonce(6)));
    let second_pending = PendingTransaction {
        submission_id: 11,
        transaction: Arc::clone(&second_valid),
        priority: Priority::Value(1000),
    };
    updates.send(second_pending.clone()).unwrap();
    best.add_new_transactions();

    // It must be tracked in `all` ...
    assert_eq!(best.all.len(), 7);
    assert!(best.all.contains_key(second_valid.id()));

    // ... but it must not have entered the `independent` set.
    assert_eq!(best.independent.len(), 2);
    assert!(!best.independent.contains(&second_pending));
}
912
#[test]
fn test_best_transactions_filter_trait_object() {
    // Fresh pool plus the factory used to validate mock transactions.
    let mut f = MockTransactionFactory::default();
    let mut pool = PendingPool::new(MockOrdering::default());

    // Seed the pool with five transactions cloned from one template, nonces 0 through 4.
    let template = MockTransaction::eip1559();
    for nonce in 0..5 {
        let seeded = f.validated(template.clone().rng_hash().with_nonce(nonce));
        pool.add_transaction(Arc::new(seeded), 0);
    }

    // Erase the concrete iterator type behind a boxed trait object.
    let boxed: Box<dyn crate::traits::BestTransactions<Item = _>> = Box::new(pool.best());

    // Predicate that accepts only transactions whose nonce is even.
    let has_even_nonce =
        |tx: &Arc<ValidPoolTransaction<MockTransaction>>| tx.nonce().is_multiple_of(2);

    // Everything the filtered iterator yields must carry an even nonce.
    BestTransactionFilter::new(boxed, has_even_nonce).for_each(|tx| {
        assert_eq!(tx.nonce() % 2, 0);
    });
}
942
#[test]
fn test_best_transactions_prioritized_senders() {
    let mut f = MockTransactionFactory::default();
    let mut pool = PendingPool::new(MockOrdering::default());

    // Five plain transactions with gas prices 10, 20, 30, 40 and 50.
    for step in 1..=5 {
        let plain = f.validated(MockTransaction::eip1559().with_gas_price(step * 10));
        pool.add_transaction(Arc::new(plain), 0);
    }

    // A transaction with gas price 5 and gas limit 200 whose sender gets prioritized below.
    let prioritized_tx = MockTransaction::eip1559().with_gas_price(5).with_gas_limit(200);
    pool.add_transaction(Arc::new(f.validated(prioritized_tx.clone())), 0);

    // A transaction with gas price 3 whose sender is prioritized as well; the assertions at the
    // end expect it to be yielded last regardless, because of the gas limit.
    let prioritized_tx2 = MockTransaction::eip1559().with_gas_price(3);
    pool.add_transaction(Arc::new(f.validated(prioritized_tx2.clone())), 0);

    let prioritized_senders: AddressSet =
        [prioritized_tx.sender(), prioritized_tx2.sender()].into_iter().collect();
    let mut ordered =
        BestTransactionsWithPrioritizedSenders::new(prioritized_senders, 200, pool.best());

    // The first prioritized transaction comes out at the top ...
    let top_of_block_tx = ordered.next().unwrap();
    assert_eq!(top_of_block_tx.max_fee_per_gas(), 5);
    assert_eq!(top_of_block_tx.sender(), prioritized_tx.sender());

    // ... followed by the plain transactions from the highest gas price down to the lowest.
    for step in (1..=5).rev() {
        assert_eq!(ordered.next().unwrap().max_fee_per_gas(), step * 10);
    }

    // The second prioritized sender's transaction was not moved to the front and is yielded
    // only after all of the plain ones.
    let top_of_block_tx2 = ordered.next().unwrap();
    assert_eq!(top_of_block_tx2.max_fee_per_gas(), 3);
    assert_eq!(top_of_block_tx2.sender(), prioritized_tx2.sender());
}
987
#[test]
fn test_best_with_fees_iter_no_blob_fee_required() {
    // Only non-blob transactions are involved here, so just the base fee check matters.
    let mut f = MockTransactionFactory::default();
    let mut pool = PendingPool::new(MockOrdering::default());

    let base_fee: u64 = 10;
    // A blob base fee of zero, i.e. no blob fee requirement.
    let base_fee_per_blob_gas: u64 = 0;

    // Every inserted transaction offers a max fee of five above the base fee.
    let max_fee = u128::from(base_fee) + 5;
    for nonce in 0..5 {
        let candidate =
            MockTransaction::eip1559().rng_hash().with_nonce(nonce).with_max_fee(max_fee);
        pool.add_transaction(Arc::new(f.validated(candidate)), 0);
    }

    let mut best = pool.best_with_basefee_and_blobfee(base_fee, base_fee_per_blob_gas);

    // All five transactions come back, with nonces 0 through 4 in that order.
    for expected_nonce in 0..5 {
        let yielded = best.next().expect("Transaction should be returned");
        assert_eq!(yielded.nonce(), expected_nonce);
    }

    // After that the iterator is exhausted.
    assert!(best.next().is_none());
}
1018
#[test]
fn test_best_with_fees_iter_mix_of_blob_and_non_blob_transactions() {
    // One non-blob and one blob transaction, both offering enough fees to be yielded.
    let mut f = MockTransactionFactory::default();
    let mut pool = PendingPool::new(MockOrdering::default());

    let base_fee: u64 = 10;
    let base_fee_per_blob_gas: u64 = 15;
    let max_fee = u128::from(base_fee) + 5;
    let blob_fee = u128::from(base_fee_per_blob_gas) + 5;

    // Non-blob transaction whose max fee exceeds the base fee.
    let tx_non_blob = MockTransaction::eip1559().rng_hash().with_nonce(0).with_max_fee(max_fee);
    pool.add_transaction(Arc::new(f.validated(tx_non_blob.clone())), 0);

    // Blob transaction whose max fee and blob fee exceed the respective base fees.
    let tx_blob = MockTransaction::eip4844()
        .rng_hash()
        .with_nonce(1)
        .with_max_fee(max_fee)
        .with_blob_fee(blob_fee);
    pool.add_transaction(Arc::new(f.validated(tx_blob.clone())), 0);

    let mut best = pool.best_with_basefee_and_blobfee(base_fee, base_fee_per_blob_gas);

    // The non-blob transaction is expected first, the blob transaction second.
    let first = best.next().expect("Transaction should be returned");
    assert_eq!(first.transaction, tx_non_blob);

    let second = best.next().expect("Transaction should be returned");
    assert_eq!(second.transaction, tx_blob);

    // After that the iterator is exhausted.
    assert!(best.next().is_none());
}
1053
#[test]
fn test_best_transactions_with_skipping_blobs() {
    // With `skip_blobs` enabled, the iterator must not yield the blob transaction.
    let mut f = MockTransactionFactory::default();
    let mut pool = PendingPool::new(MockOrdering::default());

    // One blob transaction ...
    let blob = MockTransaction::eip4844().rng_hash().with_nonce(0).with_blob_fee(100);
    pool.add_transaction(Arc::new(f.validated(blob)), 0);

    // ... and one non-blob transaction.
    let tx_non_blob = MockTransaction::eip1559().rng_hash().with_nonce(1).with_max_fee(200);
    pool.add_transaction(Arc::new(f.validated(tx_non_blob.clone())), 0);

    let mut best = pool.best();
    best.skip_blobs();

    // The single yielded transaction must be the non-blob one.
    let yielded = best.next().expect("Transaction should be returned");
    assert_eq!(yielded.transaction, tx_non_blob);

    // After that the iterator is exhausted.
    assert!(best.next().is_none());
}
1080
#[test]
fn test_best_transactions_no_updates() {
    // `no_updates` is expected to drop the iterator's `new_transaction_receiver`.
    let mut f = MockTransactionFactory::default();
    let mut pool = PendingPool::new(MockOrdering::default());

    // A single transaction is enough for this check.
    let only_tx = MockTransaction::eip1559().rng_hash().with_nonce(0).with_max_fee(100);
    pool.add_transaction(Arc::new(f.validated(only_tx)), 0);

    let mut best = pool.best();

    // Attach a broadcast receiver; the sending half only needs to stay alive.
    let (_updates, receiver) =
        tokio::sync::broadcast::channel::<PendingTransaction<MockOrdering>>(1000);
    best.new_transaction_receiver = Some(receiver);
    assert!(best.new_transaction_receiver.is_some());

    // After opting out of updates the receiver must be gone.
    best.no_updates();
    assert!(best.new_transaction_receiver.is_none());
}
1109
#[test]
fn test_best_update_transaction_priority() {
    let mut f = MockTransactionFactory::default();
    let mut pool = PendingPool::new(MockOrdering::default());

    // Seed the pool with five transactions cloned from one template, nonces 0 through 4.
    let template = MockTransaction::eip1559();
    for nonce in 0..5 {
        let seeded = f.validated(template.clone().rng_hash().with_nonce(nonce));
        pool.add_transaction(Arc::new(seeded), 0);
    }

    // Take the best-transactions iterator and attach a broadcast receiver to it.
    let mut best = pool.best();
    let (updates, receiver) =
        tokio::sync::broadcast::channel::<PendingTransaction<MockOrdering>>(1000);
    best.new_transaction_receiver = Some(receiver);

    // Yield one transaction before anything is broadcast.
    let first = best.next().unwrap();
    let yielded_sender = first.sender_id();

    // Build a separate nonce-0 transaction and broadcast it with the maximum priority value.
    let competitor = Arc::new(f.validated(MockTransaction::eip1559().with_nonce(0)));
    let pending = PendingTransaction {
        submission_id: 10,
        transaction: Arc::clone(&competitor),
        priority: Priority::Value(u128::MAX),
    };
    updates.send(pending).unwrap();

    // Everything still yielded must belong to the sender of the first yielded transaction and
    // never to the sender of the broadcast one.
    let skipped_sender = competitor.sender_id();
    for remaining in best {
        assert_eq!(remaining.sender_id(), yielded_sender);
        assert_ne!(remaining.sender_id(), skipped_sender);
    }
}
1153
/// Mirrors the shape of the Hive "Blob Transaction Ordering, Multiple Clients" scenario.
///
/// Sender A submits transactions carrying 5 blobs each, sender B submits transactions carrying
/// 1 blob each. Walking the iterator once with a limit of 6 blobs must end with a full block
/// built from exactly two transactions (5 + 1 blobs).
#[test]
fn test_blob_transaction_ordering_multiple_clients_shape() {
    let mut f = MockTransactionFactory::default();
    let mut pool = PendingPool::new(MockOrdering::default());

    let base_fee: u64 = 10;
    let base_fee_per_blob_gas: u64 = 1;
    let max_blob_count: u64 = 6;
    // Both senders use the same value for the max fee and the priority fee.
    let fee_cap = base_fee as u128 + 20;

    // Sender A: five transactions with 5 blob hashes each and a blob fee of 120.
    let sender_a = MockTransaction::eip4844()
        .with_blob_hashes(5)
        .with_max_fee(fee_cap)
        .with_priority_fee(fee_cap)
        .with_blob_fee(120);
    for nonce in 0..5u64 {
        let queued = f.validated(sender_a.clone().rng_hash().with_nonce(nonce));
        pool.add_transaction(Arc::new(queued), 0);
    }

    // Sender B: five transactions with 1 blob hash each and a blob fee of 100.
    let sender_b = MockTransaction::eip4844()
        .with_blob_hashes(1)
        .with_max_fee(fee_cap)
        .with_priority_fee(fee_cap)
        .with_blob_fee(100);
    for nonce in 0..5u64 {
        let queued = f.validated(sender_b.clone().rng_hash().with_nonce(nonce));
        pool.add_transaction(Arc::new(queued), 0);
    }

    let mut best = pool.best_with_basefee_and_blobfee(base_fee, base_fee_per_blob_gas);
    let mut block_blob_count = 0u64;
    let mut included_txs = 0u64;

    // `best` is mutated inside the loop body, hence `while let` rather than `for`.
    while let Some(candidate) = best.next() {
        // Transactions without blob hashes are neither counted nor included.
        let Some(hashes) = candidate.transaction.blob_versioned_hashes() else {
            continue;
        };
        let blobs_after = block_blob_count + hashes.len() as u64;

        // A candidate that would overflow the blob limit is reported back as invalid.
        if blobs_after > max_blob_count {
            crate::traits::BestTransactions::mark_invalid(
                &mut best,
                &candidate,
                InvalidPoolTransactionError::Eip4844(
                    Eip4844PoolTransactionError::TooManyEip4844Blobs {
                        have: blobs_after,
                        permitted: max_blob_count,
                    },
                ),
            );
            continue;
        }

        block_blob_count = blobs_after;
        included_txs += 1;

        // Once the block is full, stop asking for blob transactions and finish.
        if block_blob_count == max_blob_count {
            best.skip_blobs();
            break;
        }
    }

    assert_eq!(
        block_blob_count, max_blob_count,
        "expected a full blob block (5+1 blobs across senders)"
    );
    assert_eq!(included_txs, 2, "expected one 5-blob tx and one 1-blob tx in the block");
}
1225}