Skip to main content

reth_transaction_pool/pool/
best.rs

1use crate::{
2    error::{Eip4844PoolTransactionError, InvalidPoolTransactionError},
3    identifier::{SenderId, TransactionId},
4    pool::pending::PendingTransaction,
5    PoolTransaction, Priority, TransactionOrdering, ValidPoolTransaction,
6};
7use alloy_consensus::Transaction;
8use alloy_primitives::map::AddressSet;
9use core::fmt;
10use reth_primitives_traits::transaction::error::InvalidTransactionError;
11use std::{
12    collections::{BTreeMap, BTreeSet, HashSet, VecDeque},
13    sync::Arc,
14};
15use tokio::sync::broadcast::{error::TryRecvError, Receiver};
16use tracing::debug;
17
/// Upper bound on how many newly received pending transactions are pulled from the subscription
/// channel in a single `add_new_transactions` pass.
const MAX_NEW_TRANSACTIONS_PER_BATCH: usize = 16;
19
/// An iterator that returns transactions that can be executed on the current state (*best*
/// transactions).
///
/// This is a wrapper around [`BestTransactions`] that also enforces a specific base fee and blob
/// base fee.
///
/// This iterator guarantees that all transactions it returns satisfy both the base fee and the
/// blob fee. Transactions that do not are marked as invalid on the wrapped iterator instead of
/// being yielded.
pub(crate) struct BestTransactionsWithFees<T: TransactionOrdering> {
    /// The wrapped iterator that supplies the candidate transactions.
    pub(crate) best: BestTransactions<T>,
    /// Minimum value a transaction's `max_fee_per_gas` must reach to be yielded.
    pub(crate) base_fee: u64,
    /// Minimum value a transaction's `max_fee_per_blob_gas` must reach to be yielded. Only
    /// checked for transactions that report a blob fee.
    pub(crate) base_fee_per_blob_gas: u64,
}
32
33impl<T: TransactionOrdering> crate::traits::BestTransactions for BestTransactionsWithFees<T> {
34    fn mark_invalid(&mut self, tx: &Self::Item, kind: &InvalidPoolTransactionError) {
35        BestTransactions::mark_invalid(&mut self.best, tx, kind)
36    }
37
38    fn no_updates(&mut self) {
39        self.best.no_updates()
40    }
41
42    fn skip_blobs(&mut self) {
43        self.set_skip_blobs(true)
44    }
45
46    fn set_skip_blobs(&mut self, skip_blobs: bool) {
47        self.best.set_skip_blobs(skip_blobs)
48    }
49}
50
51impl<T: TransactionOrdering> Iterator for BestTransactionsWithFees<T> {
52    type Item = Arc<ValidPoolTransaction<T::Transaction>>;
53
54    fn next(&mut self) -> Option<Self::Item> {
55        // find the next transaction that satisfies the base fee
56        loop {
57            let best = Iterator::next(&mut self.best)?;
58            // If both the base fee and blob fee (if applicable for EIP-4844) are satisfied, return
59            // the transaction
60            if best.transaction.max_fee_per_gas() >= self.base_fee as u128 &&
61                best.transaction
62                    .max_fee_per_blob_gas()
63                    .is_none_or(|fee| fee >= self.base_fee_per_blob_gas as u128)
64            {
65                return Some(best);
66            }
67            crate::traits::BestTransactions::mark_invalid(
68                self,
69                &best,
70                &InvalidPoolTransactionError::Underpriced,
71            );
72        }
73    }
74}
75
/// An iterator that returns transactions that can be executed on the current state (*best*
/// transactions).
///
/// The [`PendingPool`](crate::pool::pending::PendingPool) contains transactions that *could* all
/// be executed on the current state, but only yields transactions that are ready to be executed
/// now. While it contains all gapless transactions of a sender, it _always_ only returns the
/// transaction with the current on chain nonce.
#[derive(Debug)]
pub struct BestTransactions<T: TransactionOrdering> {
    /// Contains a copy of _all_ transactions of the pending pool at the point in time this
    /// iterator was created.
    pub(crate) all: BTreeMap<TransactionId, PendingTransaction<T>>,
    /// Transactions that can be executed right away: these have the expected nonce.
    ///
    /// Once an `independent` transaction with the nonce `N` is returned, it unlocks `N+1`, which
    /// is then copied from the `all` map into the `independent` set.
    pub(crate) independent: BTreeSet<PendingTransaction<T>>,
    /// Senders for which a yielded transaction was reported as invalid.
    ///
    /// Remaining transactions of these senders are skipped instead of being yielded.
    pub(crate) invalid: HashSet<SenderId>,
    /// Used to receive any new pending transactions that have been added to the pool after this
    /// iterator was created.
    ///
    /// These new pending transactions are inserted into this iterator's pool before yielding the
    /// next value. `None` means no updates are received.
    pub(crate) new_transaction_receiver: Option<Receiver<PendingTransaction<T>>>,
    /// The priority value of the most recently yielded transaction.
    ///
    /// This is required if new pending transactions are fed in while it yields new values, and is
    /// only updated while `new_transaction_receiver` is set.
    pub(crate) last_priority: Option<Priority<T::PriorityValue>>,
    /// Flag to control whether to skip blob transactions (EIP4844).
    pub(crate) skip_blobs: bool,
}
108
109impl<T: TransactionOrdering> BestTransactions<T> {
110    /// Mark the transaction and its descendants as invalid.
111    pub(crate) fn mark_invalid(
112        &mut self,
113        tx: &Arc<ValidPoolTransaction<T::Transaction>>,
114        _kind: &InvalidPoolTransactionError,
115    ) {
116        self.invalid.insert(tx.sender_id());
117    }
118
119    /// Returns the ancestor the given transaction, the transaction with `nonce - 1`.
120    ///
121    /// Note: for a transaction with nonce higher than the current on chain nonce this will always
122    /// return an ancestor since all transactions in this pool are gapless.
123    pub(crate) fn ancestor(&self, id: &TransactionId) -> Option<&PendingTransaction<T>> {
124        self.all.get(&id.unchecked_ancestor()?)
125    }
126
127    /// Non-blocking read on the new pending transactions subscription channel
128    fn try_recv(&mut self) -> Option<IncomingTransaction<T>> {
129        loop {
130            match self.new_transaction_receiver.as_mut()?.try_recv() {
131                Ok(tx) => {
132                    if let Some(last_priority) = &self.last_priority &&
133                        &tx.priority > last_priority
134                    {
135                        // we skip transactions if we already yielded a transaction with lower
136                        // priority
137                        return Some(IncomingTransaction::Stash(tx))
138                    }
139                    return Some(IncomingTransaction::Process(tx))
140                }
141                // note TryRecvError::Lagged can be returned here, which is an error that attempts
142                // to correct itself on consecutive try_recv() attempts
143
144                // the cost of ignoring this error is allowing old transactions to get
145                // overwritten after the chan buffer size is met
146                Err(TryRecvError::Lagged(_)) => {
147                    // Handle the case where the receiver lagged too far behind.
148                    // `num_skipped` indicates the number of messages that were skipped.
149                }
150
151                // this case is still better than the existing iterator behavior where no new
152                // pending txs are surfaced to consumers
153                Err(_) => return None,
154            }
155        }
156    }
157
158    /// Removes the currently best independent transaction from the independent set and the total
159    /// set.
160    fn pop_best(&mut self) -> Option<PendingTransaction<T>> {
161        self.independent.pop_last().inspect(|best| {
162            self.all.remove(best.transaction.id());
163        })
164    }
165
166    /// Checks for new transactions that have come into the `PendingPool` after this iterator was
167    /// created and inserts them
168    fn add_new_transactions(&mut self) {
169        for _ in 0..MAX_NEW_TRANSACTIONS_PER_BATCH {
170            if let Some(pending_tx) = self.try_recv() {
171                //  same logic as PendingPool::add_transaction/PendingPool::best_with_unlocked
172
173                match pending_tx {
174                    IncomingTransaction::Process(tx) => {
175                        let tx_id = *tx.transaction.id();
176                        if self.ancestor(&tx_id).is_none() {
177                            self.independent.insert(tx.clone());
178                        }
179                        self.all.insert(tx_id, tx);
180                    }
181                    IncomingTransaction::Stash(tx) => {
182                        let tx_id = *tx.transaction.id();
183                        self.all.insert(tx_id, tx);
184                    }
185                }
186            } else {
187                break;
188            }
189        }
190    }
191
192    /// Returns the next best transaction and its priority value.
193    #[allow(clippy::type_complexity)]
194    pub fn next_tx_and_priority(
195        &mut self,
196    ) -> Option<(Arc<ValidPoolTransaction<T::Transaction>>, Priority<T::PriorityValue>)> {
197        loop {
198            self.add_new_transactions();
199            // Remove the next independent tx with the highest priority
200            let best = self.pop_best()?;
201            let sender_id = best.transaction.sender_id();
202
203            // skip transactions for which sender was marked as invalid
204            if self.invalid.contains(&sender_id) {
205                debug!(
206                    target: "txpool",
207                    "[{:?}] skipping invalid transaction",
208                    best.transaction.hash()
209                );
210                continue
211            }
212
213            // Insert transactions that just got unlocked.
214            if let Some(unlocked) = self.all.get(&best.unlocks()) {
215                self.independent.insert(unlocked.clone());
216            }
217
218            if self.skip_blobs && best.transaction.is_eip4844() {
219                // blobs should be skipped, marking them as invalid will ensure that no dependent
220                // transactions are returned
221                self.mark_invalid(
222                    &best.transaction,
223                    &InvalidPoolTransactionError::Eip4844(
224                        Eip4844PoolTransactionError::NoEip4844Blobs,
225                    ),
226                )
227            } else {
228                if self.new_transaction_receiver.is_some() {
229                    self.last_priority = Some(best.priority.clone())
230                }
231                return Some((best.transaction, best.priority))
232            }
233        }
234    }
235}
236
/// Result of attempting to receive a new transaction from the channel during iteration.
///
/// This enum determines how a newly received transaction should be handled based on its priority
/// relative to transactions already yielded by the iterator. It is produced by `try_recv` and
/// consumed by `add_new_transactions`.
enum IncomingTransaction<T: TransactionOrdering> {
    /// Process the transaction normally: add to the `all` map and, if it has no ancestor, also to
    /// the `independent` set.
    ///
    /// This variant is used when the transaction's priority is lower than or equal to the last
    /// yielded transaction (or nothing has been yielded yet), meaning it can be safely processed
    /// without breaking the descending priority order.
    Process(PendingTransaction<T>),

    /// Stash the transaction: add only to the `all` map, but NOT to the `independent` set.
    ///
    /// This variant is used when the transaction has a higher priority than the last yielded
    /// transaction. We cannot yield it immediately (to maintain strict priority ordering), but we
    /// must still track it so that:
    /// - Its descendants can find it via `ancestor()` lookups
    /// - We prevent those descendants from being incorrectly promoted to `independent`
    ///
    /// Without stashing, if a child of this transaction arrives later, it would fail to find its
    /// parent in `all`, be marked as `independent`, and be yielded out of order (before its
    /// parent), causing nonce gaps.
    Stash(PendingTransaction<T>),
}
263
264impl<T: TransactionOrdering> crate::traits::BestTransactions for BestTransactions<T> {
265    fn mark_invalid(&mut self, tx: &Self::Item, kind: &InvalidPoolTransactionError) {
266        Self::mark_invalid(self, tx, kind)
267    }
268
269    fn no_updates(&mut self) {
270        self.new_transaction_receiver.take();
271        self.last_priority.take();
272    }
273
274    fn skip_blobs(&mut self) {
275        self.set_skip_blobs(true);
276    }
277
278    fn set_skip_blobs(&mut self, skip_blobs: bool) {
279        self.skip_blobs = skip_blobs;
280    }
281}
282
283impl<T: TransactionOrdering> Iterator for BestTransactions<T> {
284    type Item = Arc<ValidPoolTransaction<T::Transaction>>;
285
286    fn next(&mut self) -> Option<Self::Item> {
287        self.next_tx_and_priority().map(|(tx, _)| tx)
288    }
289}
290
/// A [`BestTransactions`](crate::traits::BestTransactions) implementation that filters the
/// transactions of the wrapped iterator with a predicate.
///
/// Filtered out transactions are marked as invalid:
/// [`BestTransactions::mark_invalid`](crate::traits::BestTransactions::mark_invalid).
pub struct BestTransactionFilter<I, P> {
    /// The wrapped iterator that supplies the candidate transactions.
    pub(crate) best: I,
    /// Decides whether a candidate is yielded (`true`) or filtered out (`false`).
    pub(crate) predicate: P,
}
300
301impl<I, P> BestTransactionFilter<I, P> {
302    /// Create a new [`BestTransactionFilter`] with the given predicate.
303    pub const fn new(best: I, predicate: P) -> Self {
304        Self { best, predicate }
305    }
306}
307
308impl<I, P> Iterator for BestTransactionFilter<I, P>
309where
310    I: crate::traits::BestTransactions,
311    P: FnMut(&<I as Iterator>::Item) -> bool,
312{
313    type Item = <I as Iterator>::Item;
314
315    fn next(&mut self) -> Option<Self::Item> {
316        loop {
317            let best = self.best.next()?;
318            if (self.predicate)(&best) {
319                return Some(best)
320            }
321            self.best.mark_invalid(
322                &best,
323                &InvalidPoolTransactionError::Consensus(
324                    InvalidTransactionError::TxTypeNotSupported,
325                ),
326            );
327        }
328    }
329}
330
331impl<I, P> crate::traits::BestTransactions for BestTransactionFilter<I, P>
332where
333    I: crate::traits::BestTransactions,
334    P: FnMut(&<I as Iterator>::Item) -> bool + Send,
335{
336    fn mark_invalid(&mut self, tx: &Self::Item, kind: &InvalidPoolTransactionError) {
337        crate::traits::BestTransactions::mark_invalid(&mut self.best, tx, kind)
338    }
339
340    fn no_updates(&mut self) {
341        self.best.no_updates()
342    }
343
344    fn skip_blobs(&mut self) {
345        self.set_skip_blobs(true)
346    }
347
348    fn set_skip_blobs(&mut self, skip_blobs: bool) {
349        self.best.set_skip_blobs(skip_blobs)
350    }
351}
352
353impl<I: fmt::Debug, P> fmt::Debug for BestTransactionFilter<I, P> {
354    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
355        f.debug_struct("BestTransactionFilter").field("best", &self.best).finish()
356    }
357}
358
/// Wrapper over [`crate::traits::BestTransactions`] that prioritizes transactions of certain
/// senders, capping the total gas limit of the transactions prioritized this way.
#[derive(Debug)]
pub struct BestTransactionsWithPrioritizedSenders<I: Iterator> {
    /// Inner iterator
    inner: I,
    /// The set of senders whose transactions should be prioritized
    prioritized_senders: AddressSet,
    /// Maximum total gas limit of prioritized transactions
    max_prioritized_gas: u64,
    /// Buffer with transactions that were pulled from the inner iterator but not prioritized.
    /// Those will be the first to be included after the prioritized transactions, in the order
    /// they were pulled
    buffer: VecDeque<I::Item>,
    /// Tracker of total gas limit of prioritized transactions. Once it reaches
    /// `max_prioritized_gas` no more transactions will be prioritized
    prioritized_gas: u64,
}
376
377impl<I: Iterator> BestTransactionsWithPrioritizedSenders<I> {
378    /// Constructs a new [`BestTransactionsWithPrioritizedSenders`].
379    pub fn new(prioritized_senders: AddressSet, max_prioritized_gas: u64, inner: I) -> Self {
380        Self {
381            inner,
382            prioritized_senders,
383            max_prioritized_gas,
384            buffer: Default::default(),
385            prioritized_gas: Default::default(),
386        }
387    }
388}
389
390impl<I, T> Iterator for BestTransactionsWithPrioritizedSenders<I>
391where
392    I: crate::traits::BestTransactions<Item = Arc<ValidPoolTransaction<T>>>,
393    T: PoolTransaction,
394{
395    type Item = <I as Iterator>::Item;
396
397    fn next(&mut self) -> Option<Self::Item> {
398        // If we have space, try prioritizing transactions
399        if self.prioritized_gas < self.max_prioritized_gas {
400            for item in &mut self.inner {
401                if self.prioritized_senders.contains(&item.transaction.sender()) &&
402                    self.prioritized_gas + item.transaction.gas_limit() <=
403                        self.max_prioritized_gas
404                {
405                    self.prioritized_gas += item.transaction.gas_limit();
406                    return Some(item)
407                }
408                self.buffer.push_back(item);
409            }
410        }
411
412        if let Some(item) = self.buffer.pop_front() {
413            Some(item)
414        } else {
415            self.inner.next()
416        }
417    }
418}
419
420impl<I, T> crate::traits::BestTransactions for BestTransactionsWithPrioritizedSenders<I>
421where
422    I: crate::traits::BestTransactions<Item = Arc<ValidPoolTransaction<T>>>,
423    T: PoolTransaction,
424{
425    fn mark_invalid(&mut self, tx: &Self::Item, kind: &InvalidPoolTransactionError) {
426        self.inner.mark_invalid(tx, kind)
427    }
428
429    fn no_updates(&mut self) {
430        self.inner.no_updates()
431    }
432
433    fn set_skip_blobs(&mut self, skip_blobs: bool) {
434        if skip_blobs {
435            self.buffer.retain(|tx| !tx.transaction.is_eip4844())
436        }
437        self.inner.set_skip_blobs(skip_blobs)
438    }
439}
440
441#[cfg(test)]
442mod tests {
443    use super::*;
444    use crate::{
445        pool::pending::PendingPool,
446        test_utils::{MockOrdering, MockTransaction, MockTransactionFactory},
447        BestTransactions, Priority,
448    };
449
450    #[test]
451    fn test_best_iter() {
452        let mut pool = PendingPool::new(MockOrdering::default());
453        let mut f = MockTransactionFactory::default();
454
455        let num_tx = 10;
456        // insert 10 gapless tx
457        let tx = MockTransaction::eip1559();
458        for nonce in 0..num_tx {
459            let tx = tx.clone().rng_hash().with_nonce(nonce);
460            let valid_tx = f.validated(tx);
461            pool.add_transaction(Arc::new(valid_tx), 0);
462        }
463
464        let mut best = pool.best();
465        assert_eq!(best.all.len(), num_tx as usize);
466        assert_eq!(best.independent.len(), 1);
467
468        // check tx are returned in order
469        for nonce in 0..num_tx {
470            assert_eq!(best.independent.len(), 1);
471            let tx = best.next().unwrap();
472            assert_eq!(tx.nonce(), nonce);
473        }
474    }
475
476    #[test]
477    fn test_best_iter_invalid() {
478        let mut pool = PendingPool::new(MockOrdering::default());
479        let mut f = MockTransactionFactory::default();
480
481        let num_tx = 10;
482        // insert 10 gapless tx
483        let tx = MockTransaction::eip1559();
484        for nonce in 0..num_tx {
485            let tx = tx.clone().rng_hash().with_nonce(nonce);
486            let valid_tx = f.validated(tx);
487            pool.add_transaction(Arc::new(valid_tx), 0);
488        }
489
490        let mut best = pool.best();
491
492        // mark the first tx as invalid
493        let invalid = best.independent.iter().next().unwrap();
494        best.mark_invalid(
495            &invalid.transaction.clone(),
496            &InvalidPoolTransactionError::Consensus(InvalidTransactionError::TxTypeNotSupported),
497        );
498
499        // iterator is empty
500        assert!(best.next().is_none());
501    }
502
503    #[test]
504    fn test_best_transactions_iter_invalid() {
505        let mut pool = PendingPool::new(MockOrdering::default());
506        let mut f = MockTransactionFactory::default();
507
508        let num_tx = 10;
509        // insert 10 gapless tx
510        let tx = MockTransaction::eip1559();
511        for nonce in 0..num_tx {
512            let tx = tx.clone().rng_hash().with_nonce(nonce);
513            let valid_tx = f.validated(tx);
514            pool.add_transaction(Arc::new(valid_tx), 0);
515        }
516
517        let mut best: Box<
518            dyn crate::traits::BestTransactions<Item = Arc<ValidPoolTransaction<MockTransaction>>>,
519        > = Box::new(pool.best());
520
521        let tx = Iterator::next(&mut best).unwrap();
522        crate::traits::BestTransactions::mark_invalid(
523            &mut *best,
524            &tx,
525            &InvalidPoolTransactionError::Consensus(InvalidTransactionError::TxTypeNotSupported),
526        );
527        assert!(Iterator::next(&mut best).is_none());
528    }
529
530    #[test]
531    fn test_best_with_fees_iter_base_fee_satisfied() {
532        let mut pool = PendingPool::new(MockOrdering::default());
533        let mut f = MockTransactionFactory::default();
534
535        let num_tx = 5;
536        let base_fee: u64 = 10;
537        let base_fee_per_blob_gas: u64 = 15;
538
539        // Insert transactions with a max_fee_per_gas greater than or equal to the base fee
540        // Without blob fee
541        for nonce in 0..num_tx {
542            let tx = MockTransaction::eip1559()
543                .rng_hash()
544                .with_nonce(nonce)
545                .with_max_fee(base_fee as u128 + 5);
546            let valid_tx = f.validated(tx);
547            pool.add_transaction(Arc::new(valid_tx), 0);
548        }
549
550        let mut best = pool.best_with_basefee_and_blobfee(base_fee, base_fee_per_blob_gas);
551
552        for nonce in 0..num_tx {
553            let tx = best.next().expect("Transaction should be returned");
554            assert_eq!(tx.nonce(), nonce);
555            assert!(tx.transaction.max_fee_per_gas() >= base_fee as u128);
556        }
557    }
558
559    #[test]
560    fn test_best_with_fees_iter_base_fee_violated() {
561        let mut pool = PendingPool::new(MockOrdering::default());
562        let mut f = MockTransactionFactory::default();
563
564        let num_tx = 5;
565        let base_fee: u64 = 20;
566        let base_fee_per_blob_gas: u64 = 15;
567
568        // Insert transactions with a max_fee_per_gas less than the base fee
569        for nonce in 0..num_tx {
570            let tx = MockTransaction::eip1559()
571                .rng_hash()
572                .with_nonce(nonce)
573                .with_max_fee(base_fee as u128 - 5);
574            let valid_tx = f.validated(tx);
575            pool.add_transaction(Arc::new(valid_tx), 0);
576        }
577
578        let mut best = pool.best_with_basefee_and_blobfee(base_fee, base_fee_per_blob_gas);
579
580        // No transaction should be returned since all violate the base fee
581        assert!(best.next().is_none());
582    }
583
584    #[test]
585    fn test_best_with_fees_iter_blob_fee_satisfied() {
586        let mut pool = PendingPool::new(MockOrdering::default());
587        let mut f = MockTransactionFactory::default();
588
589        let num_tx = 5;
590        let base_fee: u64 = 10;
591        let base_fee_per_blob_gas: u64 = 20;
592
593        // Insert transactions with a max_fee_per_blob_gas greater than or equal to the base fee per
594        // blob gas
595        for nonce in 0..num_tx {
596            let tx = MockTransaction::eip4844()
597                .rng_hash()
598                .with_nonce(nonce)
599                .with_max_fee(base_fee as u128 + 5)
600                .with_blob_fee(base_fee_per_blob_gas as u128 + 5);
601            let valid_tx = f.validated(tx);
602            pool.add_transaction(Arc::new(valid_tx), 0);
603        }
604
605        let mut best = pool.best_with_basefee_and_blobfee(base_fee, base_fee_per_blob_gas);
606
607        // All transactions should be returned in order since they satisfy both base fee and blob
608        // fee
609        for nonce in 0..num_tx {
610            let tx = best.next().expect("Transaction should be returned");
611            assert_eq!(tx.nonce(), nonce);
612            assert!(tx.transaction.max_fee_per_gas() >= base_fee as u128);
613            assert!(
614                tx.transaction.max_fee_per_blob_gas().unwrap() >= base_fee_per_blob_gas as u128
615            );
616        }
617
618        // No more transactions should be returned
619        assert!(best.next().is_none());
620    }
621
622    #[test]
623    fn test_best_with_fees_iter_blob_fee_violated() {
624        let mut pool = PendingPool::new(MockOrdering::default());
625        let mut f = MockTransactionFactory::default();
626
627        let num_tx = 5;
628        let base_fee: u64 = 10;
629        let base_fee_per_blob_gas: u64 = 20;
630
631        // Insert transactions with a max_fee_per_blob_gas less than the base fee per blob gas
632        for nonce in 0..num_tx {
633            let tx = MockTransaction::eip4844()
634                .rng_hash()
635                .with_nonce(nonce)
636                .with_max_fee(base_fee as u128 + 5)
637                .with_blob_fee(base_fee_per_blob_gas as u128 - 5);
638            let valid_tx = f.validated(tx);
639            pool.add_transaction(Arc::new(valid_tx), 0);
640        }
641
642        let mut best = pool.best_with_basefee_and_blobfee(base_fee, base_fee_per_blob_gas);
643
644        // No transaction should be returned since all violate the blob fee
645        assert!(best.next().is_none());
646    }
647
648    #[test]
649    fn test_best_with_fees_iter_mixed_fees() {
650        let mut pool = PendingPool::new(MockOrdering::default());
651        let mut f = MockTransactionFactory::default();
652
653        let base_fee: u64 = 10;
654        let base_fee_per_blob_gas: u64 = 20;
655
656        // Insert transactions with varying max_fee_per_gas and max_fee_per_blob_gas
657        let tx1 =
658            MockTransaction::eip1559().rng_hash().with_nonce(0).with_max_fee(base_fee as u128 + 5);
659        let tx2 = MockTransaction::eip4844()
660            .rng_hash()
661            .with_nonce(1)
662            .with_max_fee(base_fee as u128 + 5)
663            .with_blob_fee(base_fee_per_blob_gas as u128 + 5);
664        let tx3 = MockTransaction::eip4844()
665            .rng_hash()
666            .with_nonce(2)
667            .with_max_fee(base_fee as u128 + 5)
668            .with_blob_fee(base_fee_per_blob_gas as u128 - 5);
669        let tx4 =
670            MockTransaction::eip1559().rng_hash().with_nonce(3).with_max_fee(base_fee as u128 - 5);
671
672        pool.add_transaction(Arc::new(f.validated(tx1.clone())), 0);
673        pool.add_transaction(Arc::new(f.validated(tx2.clone())), 0);
674        pool.add_transaction(Arc::new(f.validated(tx3)), 0);
675        pool.add_transaction(Arc::new(f.validated(tx4)), 0);
676
677        let mut best = pool.best_with_basefee_and_blobfee(base_fee, base_fee_per_blob_gas);
678
679        let expected_order = vec![tx1, tx2];
680        for expected_tx in expected_order {
681            let tx = best.next().expect("Transaction should be returned");
682            assert_eq!(tx.transaction, expected_tx);
683        }
684
685        // No more transactions should be returned
686        assert!(best.next().is_none());
687    }
688
689    #[test]
690    fn test_best_add_transaction_with_next_nonce() {
691        let mut pool = PendingPool::new(MockOrdering::default());
692        let mut f = MockTransactionFactory::default();
693
694        // Add 5 transactions with increasing nonces to the pool
695        let num_tx = 5;
696        let tx = MockTransaction::eip1559();
697        for nonce in 0..num_tx {
698            let tx = tx.clone().rng_hash().with_nonce(nonce);
699            let valid_tx = f.validated(tx);
700            pool.add_transaction(Arc::new(valid_tx), 0);
701        }
702
703        // Create a BestTransactions iterator from the pool
704        let mut best = pool.best();
705
706        // Use a broadcast channel for transaction updates
707        let (tx_sender, tx_receiver) =
708            tokio::sync::broadcast::channel::<PendingTransaction<MockOrdering>>(1000);
709        best.new_transaction_receiver = Some(tx_receiver);
710
711        // Create a new transaction with nonce 5 and validate it
712        let new_tx = MockTransaction::eip1559().rng_hash().with_nonce(5);
713        let valid_new_tx = f.validated(new_tx);
714
715        // Send the new transaction through the broadcast channel
716        let pending_tx = PendingTransaction {
717            submission_id: 10,
718            transaction: Arc::new(valid_new_tx.clone()),
719            priority: Priority::Value(1000),
720        };
721        tx_sender.send(pending_tx.clone()).unwrap();
722
723        // Add new transactions to the iterator
724        best.add_new_transactions();
725
726        // Verify that the new transaction has been added to the 'all' map
727        assert_eq!(best.all.len(), 6);
728        assert!(best.all.contains_key(valid_new_tx.id()));
729
730        // Verify that the new transaction has been added to the 'independent' set
731        assert_eq!(best.independent.len(), 2);
732        assert!(best.independent.contains(&pending_tx));
733    }
734
735    #[test]
736    fn test_best_add_transaction_with_ancestor() {
737        // Initialize a new PendingPool with default MockOrdering and MockTransactionFactory
738        let mut pool = PendingPool::new(MockOrdering::default());
739        let mut f = MockTransactionFactory::default();
740
741        // Add 5 transactions with increasing nonces to the pool
742        let num_tx = 5;
743        let tx = MockTransaction::eip1559();
744        for nonce in 0..num_tx {
745            let tx = tx.clone().rng_hash().with_nonce(nonce);
746            let valid_tx = f.validated(tx);
747            pool.add_transaction(Arc::new(valid_tx), 0);
748        }
749
750        // Create a BestTransactions iterator from the pool
751        let mut best = pool.best();
752
753        // Use a broadcast channel for transaction updates
754        let (tx_sender, tx_receiver) =
755            tokio::sync::broadcast::channel::<PendingTransaction<MockOrdering>>(1000);
756        best.new_transaction_receiver = Some(tx_receiver);
757
758        // Create a new transaction with nonce 5 and validate it
759        let base_tx1 = MockTransaction::eip1559().rng_hash().with_nonce(5);
760        let valid_new_tx1 = f.validated(base_tx1.clone());
761
762        // Send the new transaction through the broadcast channel
763        let pending_tx1 = PendingTransaction {
764            submission_id: 10,
765            transaction: Arc::new(valid_new_tx1.clone()),
766            priority: Priority::Value(1000),
767        };
768        tx_sender.send(pending_tx1.clone()).unwrap();
769
770        // Add new transactions to the iterator
771        best.add_new_transactions();
772
773        // Verify that the new transaction has been added to the 'all' map
774        assert_eq!(best.all.len(), 6);
775        assert!(best.all.contains_key(valid_new_tx1.id()));
776
777        // Verify that the new transaction has been added to the 'independent' set
778        assert_eq!(best.independent.len(), 2);
779        assert!(best.independent.contains(&pending_tx1));
780
781        // Attempt to add a new transaction with a different nonce (not a duplicate)
782        let base_tx2 = base_tx1.with_nonce(6);
783        let valid_new_tx2 = f.validated(base_tx2);
784
785        // Send the new transaction through the broadcast channel
786        let pending_tx2 = PendingTransaction {
787            submission_id: 11, // Different submission ID
788            transaction: Arc::new(valid_new_tx2.clone()),
789            priority: Priority::Value(1000),
790        };
791        tx_sender.send(pending_tx2.clone()).unwrap();
792
793        // Add new transactions to the iterator
794        best.add_new_transactions();
795
796        // Verify that the new transaction has been added to 'all'
797        assert_eq!(best.all.len(), 7);
798        assert!(best.all.contains_key(valid_new_tx2.id()));
799
800        // Verify that the new transaction has not been added to the 'independent' set
801        assert_eq!(best.independent.len(), 2);
802        assert!(!best.independent.contains(&pending_tx2));
803    }
804
/// Checks that `BestTransactionFilter` can wrap a boxed `dyn BestTransactions` iterator and
/// that every transaction it yields satisfies the predicate.
#[test]
fn test_best_transactions_filter_trait_object() {
    let mut pool = PendingPool::new(MockOrdering::default());
    let mut f = MockTransactionFactory::default();

    // Fill the pool with nonces 0..5, all cloned from one template transaction.
    let template = MockTransaction::eip1559();
    for nonce in 0..5 {
        let tx = template.clone().rng_hash().with_nonce(nonce);
        pool.add_transaction(Arc::new(f.validated(tx)), 0);
    }

    // Erase the concrete iterator type: the filter must work on the trait object.
    let best: Box<dyn crate::traits::BestTransactions<Item = _>> = Box::new(pool.best());

    // Keep only transactions with an even nonce.
    let filter =
        BestTransactionFilter::new(best, |tx: &Arc<ValidPoolTransaction<MockTransaction>>| {
            tx.nonce().is_multiple_of(2)
        });

    // Count what comes out so that the per-item assertion cannot pass vacuously on an empty
    // iterator.
    let mut yielded = 0usize;
    for tx in filter {
        assert_eq!(tx.nonce() % 2, 0);
        yielded += 1;
    }

    // The pool is non-empty and contains an even nonce (0), so at least one transaction is
    // expected to make it through the filter.
    assert!(yielded > 0, "filter yielded no transactions, the even-nonce check never ran");
}
834
#[test]
fn test_best_transactions_prioritized_senders() {
    let mut pool = PendingPool::new(MockOrdering::default());
    let mut f = MockTransactionFactory::default();

    // Five plain transactions from different senders, priced at 10, 20, 30, 40 and 50.
    for step in 1..=5 {
        let tx = MockTransaction::eip1559().with_gas_price(step * 10);
        pool.add_transaction(Arc::new(f.validated(tx)), 0);
    }

    // A transaction priced at 5 with a gas limit of 200; its sender is going to be prioritized.
    let prioritized_tx = MockTransaction::eip1559().with_gas_price(5).with_gas_limit(200);
    let prioritized_sender = prioritized_tx.sender();
    pool.add_transaction(Arc::new(f.validated(prioritized_tx)), 0);

    // A transaction priced at 3 whose sender is also in the prioritized set, but which should
    // not be prioritized because of the gas limit.
    let prioritized_tx2 = MockTransaction::eip1559().with_gas_price(3);
    let prioritized_sender2 = prioritized_tx2.sender();
    pool.add_transaction(Arc::new(f.validated(prioritized_tx2)), 0);

    let best = BestTransactionsWithPrioritizedSenders::new(
        AddressSet::from_iter([prioritized_sender, prioritized_sender2]),
        200,
        pool.best(),
    );
    let mut iter = best.into_iter();

    // The first prioritized sender's transaction comes out first.
    let leading = iter.next().unwrap();
    assert_eq!(leading.max_fee_per_gas(), 5);
    assert_eq!(leading.sender(), prioritized_sender);

    // The plain transactions follow in descending order of gas price.
    for step in (1..=5).rev() {
        assert_eq!(iter.next().unwrap().max_fee_per_gas(), step * 10);
    }

    // Due to the gas limit, the second prioritized sender's transaction was not moved to the
    // front and is returned after the plain ones.
    let trailing = iter.next().unwrap();
    assert_eq!(trailing.max_fee_per_gas(), 3);
    assert_eq!(trailing.sender(), prioritized_sender2);
}
879
#[test]
fn test_best_with_fees_iter_no_blob_fee_required() {
    // Non-blob transactions only: the base fee is checked, and the blob fee floor is zero.
    let base_fee: u64 = 10;
    let base_fee_per_blob_gas: u64 = 0;
    let max_fee = u128::from(base_fee) + 5;

    let mut pool = PendingPool::new(MockOrdering::default());
    let mut f = MockTransactionFactory::default();

    // Every inserted transaction offers a max fee above the base fee.
    for nonce in 0..5u64 {
        let tx = MockTransaction::eip1559().rng_hash().with_nonce(nonce).with_max_fee(max_fee);
        pool.add_transaction(Arc::new(f.validated(tx)), 0);
    }

    let mut best = pool.best_with_basefee_and_blobfee(base_fee, base_fee_per_blob_gas);

    // All five come back, in nonce order.
    for expected_nonce in 0..5u64 {
        let yielded = best.next().expect("Transaction should be returned");
        assert_eq!(yielded.nonce(), expected_nonce);
    }

    // Nothing is left afterwards.
    assert!(best.next().is_none());
}
910
#[test]
fn test_best_with_fees_iter_mix_of_blob_and_non_blob_transactions() {
    // One non-blob and one blob transaction, both priced above the required fees.
    let base_fee: u64 = 10;
    let base_fee_per_blob_gas: u64 = 15;
    let max_fee = u128::from(base_fee) + 5;
    let max_blob_fee = u128::from(base_fee_per_blob_gas) + 5;

    let mut pool = PendingPool::new(MockOrdering::default());
    let mut f = MockTransactionFactory::default();

    // Non-blob transaction that satisfies the base fee.
    let plain = MockTransaction::eip1559().rng_hash().with_nonce(0).with_max_fee(max_fee);
    pool.add_transaction(Arc::new(f.validated(plain.clone())), 0);

    // Blob transaction that satisfies both the base fee and the blob fee.
    let blob = MockTransaction::eip4844()
        .rng_hash()
        .with_nonce(1)
        .with_max_fee(max_fee)
        .with_blob_fee(max_blob_fee);
    pool.add_transaction(Arc::new(f.validated(blob.clone())), 0);

    let mut best = pool.best_with_basefee_and_blobfee(base_fee, base_fee_per_blob_gas);

    // Both are returned: the non-blob transaction first, then the blob transaction.
    for expected in [plain, blob] {
        let yielded = best.next().expect("Transaction should be returned");
        assert_eq!(yielded.transaction, expected);
    }

    // Nothing is left afterwards.
    assert!(best.next().is_none());
}
945
#[test]
fn test_best_transactions_with_skipping_blobs() {
    // After `skip_blobs`, the iterator must not hand out blob transactions.
    let mut pool = PendingPool::new(MockOrdering::default());
    let mut f = MockTransactionFactory::default();

    // The blob transaction that is expected to be skipped.
    let blob = MockTransaction::eip4844().rng_hash().with_nonce(0).with_blob_fee(100);
    pool.add_transaction(Arc::new(f.validated(blob)), 0);

    // The non-blob transaction that is expected to be returned.
    let plain = MockTransaction::eip1559().rng_hash().with_nonce(1).with_max_fee(200);
    pool.add_transaction(Arc::new(f.validated(plain.clone())), 0);

    let mut best = pool.best();
    best.skip_blobs();

    // The non-blob transaction is the only thing the iterator yields.
    let yielded = best.next().expect("Transaction should be returned");
    assert_eq!(yielded.transaction, plain);
    assert!(best.next().is_none());
}
972
#[test]
fn test_best_transactions_no_updates() {
    // `no_updates` must leave the iterator without a `new_transaction_receiver`.
    let mut pool = PendingPool::new(MockOrdering::default());
    let mut f = MockTransactionFactory::default();

    // Seed the pool with a single transaction.
    let seed = MockTransaction::eip1559().rng_hash().with_nonce(0).with_max_fee(100);
    pool.add_transaction(Arc::new(f.validated(seed)), 0);

    let mut best = pool.best();

    // Attach a broadcast receiver; the sending half stays bound until the end of the test.
    let (_keep_alive, updates) =
        tokio::sync::broadcast::channel::<PendingTransaction<MockOrdering>>(1000);
    best.new_transaction_receiver = Some(updates);

    // Precondition: the receiver is in place.
    assert!(best.new_transaction_receiver.is_some());

    best.no_updates();

    // Postcondition: the receiver is gone.
    assert!(best.new_transaction_receiver.is_none());
}
1001
#[test]
fn test_best_update_transaction_priority() {
    let mut pool = PendingPool::new(MockOrdering::default());
    let mut f = MockTransactionFactory::default();

    // Fill the pool with nonces 0..5, all cloned from one template transaction.
    let template = MockTransaction::eip1559();
    for nonce in 0..5 {
        let tx = template.clone().rng_hash().with_nonce(nonce);
        pool.add_transaction(Arc::new(f.validated(tx)), 0);
    }

    let mut best = pool.best();

    // Feed updates to the iterator through a broadcast channel.
    let (tx_sender, tx_receiver) =
        tokio::sync::broadcast::channel::<PendingTransaction<MockOrdering>>(1000);
    best.new_transaction_receiver = Some(tx_receiver);

    // Yield one tx, effectively locking in the highest priority.
    let first = best.next().unwrap();
    let locked_sender = first.sender_id();

    // A fresh nonce-0 transaction, announced below with the maximum possible priority.
    let late_tx = f.validated(MockTransaction::eip1559().with_nonce(0));
    let late_sender = late_tx.sender_id();

    tx_sender
        .send(PendingTransaction {
            submission_id: 10,
            transaction: Arc::new(late_tx),
            priority: Priority::Value(u128::MAX),
        })
        .unwrap();

    // The late, higher-priority tx must be skipped since a lower one was already yielded:
    // everything still returned belongs to the sender of `first`.
    for tx in best {
        let sender = tx.sender_id();
        assert_eq!(sender, locked_sender);
        assert_ne!(sender, late_sender);
    }
}
1045
/// Mirrors the shape of the "Blob Transaction Ordering, Multiple Clients" Hive scenario.
///
/// One sender submits transactions carrying 5 blobs each, another submits transactions carrying
/// 1 blob each. Building a single payload with a 6-blob limit should fill the block completely
/// by combining one transaction of each kind (5+1).
#[test]
fn test_blob_transaction_ordering_multiple_clients_shape() {
    let base_fee: u64 = 10;
    let base_fee_per_blob_gas: u64 = 1;
    let max_blob_count: u64 = 6;
    let fee_cap = base_fee as u128 + 20;

    let mut pool = PendingPool::new(MockOrdering::default());
    let mut f = MockTransactionFactory::default();

    // (blobs per transaction, blob fee) for sender A and sender B, five nonces each.
    for (blobs_per_tx, blob_fee) in [(5, 120), (1, 100)] {
        let template = MockTransaction::eip4844()
            .with_blob_hashes(blobs_per_tx)
            .with_max_fee(fee_cap)
            .with_priority_fee(fee_cap)
            .with_blob_fee(blob_fee);
        for nonce in 0..5u64 {
            let tx = template.clone().rng_hash().with_nonce(nonce);
            pool.add_transaction(Arc::new(f.validated(tx)), 0);
        }
    }

    let mut best = pool.best_with_basefee_and_blobfee(base_fee, base_fee_per_blob_gas);
    let mut block_blob_count = 0u64;
    let mut included_txs = 0u64;

    // `best` is mutated inside the loop, so it cannot be consumed by a `for` loop.
    while let Some(tx) = best.next() {
        // Transactions without blob hashes are ignored by this accounting.
        let Some(blob_hashes) = tx.transaction.blob_versioned_hashes() else { continue };
        let blobs_after_tx = block_blob_count + blob_hashes.len() as u64;

        // Would overflow the block: report the transaction as invalid and move on.
        if blobs_after_tx > max_blob_count {
            crate::traits::BestTransactions::mark_invalid(
                &mut best,
                &tx,
                &InvalidPoolTransactionError::Eip4844(
                    Eip4844PoolTransactionError::TooManyEip4844Blobs {
                        have: blobs_after_tx,
                        permitted: max_blob_count,
                    },
                ),
            );
            continue;
        }

        block_blob_count = blobs_after_tx;
        included_txs += 1;

        // Block is full: no further blob transactions are wanted.
        if block_blob_count == max_blob_count {
            best.skip_blobs();
            break;
        }
    }

    assert_eq!(
        block_blob_count, max_blob_count,
        "expected a full blob block (5+1 blobs across senders)"
    );
    assert_eq!(included_txs, 2, "expected one 5-blob tx and one 1-blob tx in the block");
}
1117}