Skip to main content

reth_transaction_pool/pool/
best.rs

1use crate::{
2    error::{Eip4844PoolTransactionError, InvalidPoolTransactionError},
3    identifier::{SenderId, TransactionId},
4    pool::pending::PendingTransaction,
5    PoolTransaction, Priority, TransactionOrdering, ValidPoolTransaction,
6};
7use alloy_consensus::Transaction;
8use alloy_primitives::map::AddressSet;
9use core::fmt;
10use imbl::OrdMap;
11use reth_primitives_traits::transaction::error::InvalidTransactionError;
12use std::{
13    collections::{BTreeSet, HashSet, VecDeque},
14    sync::Arc,
15};
16use tokio::sync::broadcast::{error::TryRecvError, Receiver};
17use tracing::debug;
18
/// Upper bound on the number of newly broadcast pending transactions that a single call to
/// `BestTransactions::add_new_transactions` pulls from the update channel.
const MAX_NEW_TRANSACTIONS_PER_BATCH: usize = 16;
20
/// An iterator that returns transactions that can be executed on the current state (*best*
/// transactions).
///
/// This is a wrapper around [`BestTransactions`] that also enforces a specific base fee and blob
/// fee.
///
/// Every transaction yielded by this iterator has a `max_fee_per_gas` of at least `base_fee` and,
/// if it reports a `max_fee_per_blob_gas`, a blob fee of at least `base_fee_per_blob_gas`.
pub(crate) struct BestTransactionsWithFees<T: TransactionOrdering> {
    /// The wrapped iterator that supplies the candidate transactions.
    pub(crate) best: BestTransactions<T>,
    /// Minimum `max_fee_per_gas` a transaction must offer in order to be yielded.
    pub(crate) base_fee: u64,
    /// Minimum `max_fee_per_blob_gas` a transaction must offer in order to be yielded. Not
    /// checked for transactions that report no blob fee.
    pub(crate) base_fee_per_blob_gas: u64,
}
33
34impl<T: TransactionOrdering> crate::traits::BestTransactions for BestTransactionsWithFees<T> {
35    fn mark_invalid(&mut self, tx: &Self::Item, kind: InvalidPoolTransactionError) {
36        BestTransactions::mark_invalid(&mut self.best, tx, kind)
37    }
38
39    fn no_updates(&mut self) {
40        self.best.no_updates()
41    }
42
43    fn skip_blobs(&mut self) {
44        self.set_skip_blobs(true)
45    }
46
47    fn set_skip_blobs(&mut self, skip_blobs: bool) {
48        self.best.set_skip_blobs(skip_blobs)
49    }
50}
51
52impl<T: TransactionOrdering> Iterator for BestTransactionsWithFees<T> {
53    type Item = Arc<ValidPoolTransaction<T::Transaction>>;
54
55    fn next(&mut self) -> Option<Self::Item> {
56        // find the next transaction that satisfies the base fee
57        loop {
58            let best = Iterator::next(&mut self.best)?;
59            // If both the base fee and blob fee (if applicable for EIP-4844) are satisfied, return
60            // the transaction
61            if best.transaction.max_fee_per_gas() >= self.base_fee as u128 &&
62                best.transaction
63                    .max_fee_per_blob_gas()
64                    .is_none_or(|fee| fee >= self.base_fee_per_blob_gas as u128)
65            {
66                return Some(best);
67            }
68            crate::traits::BestTransactions::mark_invalid(
69                self,
70                &best,
71                InvalidPoolTransactionError::Underpriced,
72            );
73        }
74    }
75}
76
/// An iterator that returns transactions that can be executed on the current state (*best*
/// transactions).
///
/// The [`PendingPool`](crate::pool::pending::PendingPool) contains transactions that *could* all
/// be executed on the current state, but only yields transactions that are ready to be executed
/// now. While it contains all gapless transactions of a sender, it _always_ only returns the
/// transaction with the current on chain nonce.
#[derive(Debug)]
pub struct BestTransactions<T: TransactionOrdering> {
    /// Contains a copy of _all_ transactions of the pending pool at the point in time this
    /// iterator was created.
    pub(crate) all: OrdMap<TransactionId, PendingTransaction<T>>,
    /// Transactions that can be executed right away: these have the expected nonce.
    ///
    /// Once an `independent` transaction with the nonce `N` is returned, it unlocks `N+1`, which
    /// is then looked up in the `all` map and added to the `independent` set.
    pub(crate) independent: BTreeSet<PendingTransaction<T>>,
    /// Senders of transactions that were marked as invalid. Transactions of these senders are
    /// skipped instead of being yielded.
    pub(crate) invalid: HashSet<SenderId>,
    /// Used to receive any new pending transactions that have been added to the pool after this
    /// iterator was created.
    ///
    /// These new pending transactions are inserted into this iterator's pool before yielding the
    /// next value. `None` means the iterator does not pick up new transactions.
    pub(crate) new_transaction_receiver: Option<Receiver<PendingTransaction<T>>>,
    /// The priority value of the most recently yielded transaction.
    ///
    /// This is required if new pending transactions are fed in while it yields new values, and is
    /// only recorded while `new_transaction_receiver` is set.
    pub(crate) last_priority: Option<Priority<T::PriorityValue>>,
    /// Flag to control whether to skip blob transactions (EIP4844).
    pub(crate) skip_blobs: bool,
}
109
110impl<T: TransactionOrdering> BestTransactions<T> {
111    /// Mark the transaction and its descendants as invalid.
112    pub(crate) fn mark_invalid(
113        &mut self,
114        tx: &Arc<ValidPoolTransaction<T::Transaction>>,
115        _kind: InvalidPoolTransactionError,
116    ) {
117        self.invalid.insert(tx.sender_id());
118    }
119
120    /// Returns the ancestor the given transaction, the transaction with `nonce - 1`.
121    ///
122    /// Note: for a transaction with nonce higher than the current on chain nonce this will always
123    /// return an ancestor since all transactions in this pool are gapless.
124    pub(crate) fn ancestor(&self, id: &TransactionId) -> Option<&PendingTransaction<T>> {
125        self.all.get(&id.unchecked_ancestor()?)
126    }
127
128    /// Non-blocking read on the new pending transactions subscription channel
129    fn try_recv(&mut self) -> Option<IncomingTransaction<T>> {
130        loop {
131            match self.new_transaction_receiver.as_mut()?.try_recv() {
132                Ok(tx) => {
133                    if let Some(last_priority) = &self.last_priority &&
134                        &tx.priority > last_priority
135                    {
136                        // we skip transactions if we already yielded a transaction with lower
137                        // priority
138                        return Some(IncomingTransaction::Stash(tx))
139                    }
140                    return Some(IncomingTransaction::Process(tx))
141                }
142                // note TryRecvError::Lagged can be returned here, which is an error that attempts
143                // to correct itself on consecutive try_recv() attempts
144
145                // the cost of ignoring this error is allowing old transactions to get
146                // overwritten after the chan buffer size is met
147                Err(TryRecvError::Lagged(_)) => {
148                    // Handle the case where the receiver lagged too far behind.
149                    // `num_skipped` indicates the number of messages that were skipped.
150                }
151
152                // this case is still better than the existing iterator behavior where no new
153                // pending txs are surfaced to consumers
154                Err(_) => return None,
155            }
156        }
157    }
158
159    /// Removes the currently best independent transaction from the independent set and the total
160    /// set.
161    fn pop_best(&mut self) -> Option<PendingTransaction<T>> {
162        self.independent.pop_last().inspect(|best| {
163            self.all.remove(best.transaction.id());
164        })
165    }
166
167    /// Checks for new transactions that have come into the `PendingPool` after this iterator was
168    /// created and inserts them
169    fn add_new_transactions(&mut self) {
170        for _ in 0..MAX_NEW_TRANSACTIONS_PER_BATCH {
171            if let Some(pending_tx) = self.try_recv() {
172                //  same logic as PendingPool::add_transaction/PendingPool::best_with_unlocked
173
174                match pending_tx {
175                    IncomingTransaction::Process(tx) => {
176                        let tx_id = *tx.transaction.id();
177                        if self.ancestor(&tx_id).is_none() {
178                            self.independent.insert(tx.clone());
179                        }
180                        self.all.insert(tx_id, tx);
181                    }
182                    IncomingTransaction::Stash(tx) => {
183                        let tx_id = *tx.transaction.id();
184                        self.all.insert(tx_id, tx);
185                    }
186                }
187            } else {
188                break;
189            }
190        }
191    }
192
193    /// Returns the next best transaction and its priority value.
194    #[expect(clippy::type_complexity)]
195    pub fn next_tx_and_priority(
196        &mut self,
197    ) -> Option<(Arc<ValidPoolTransaction<T::Transaction>>, Priority<T::PriorityValue>)> {
198        loop {
199            self.add_new_transactions();
200            // Remove the next independent tx with the highest priority
201            let best = self.pop_best()?;
202            let sender_id = best.transaction.sender_id();
203
204            // skip transactions for which sender was marked as invalid
205            if self.invalid.contains(&sender_id) {
206                debug!(
207                    target: "txpool",
208                    "[{:?}] skipping invalid transaction",
209                    best.transaction.hash()
210                );
211                continue
212            }
213
214            // Insert transactions that just got unlocked.
215            if let Some(unlocked) = self.all.get(&best.unlocks()) {
216                self.independent.insert(unlocked.clone());
217            }
218
219            if self.skip_blobs && best.transaction.is_eip4844() {
220                // blobs should be skipped, marking them as invalid will ensure that no dependent
221                // transactions are returned
222                self.mark_invalid(
223                    &best.transaction,
224                    InvalidPoolTransactionError::Eip4844(
225                        Eip4844PoolTransactionError::NoEip4844Blobs,
226                    ),
227                )
228            } else {
229                if self.new_transaction_receiver.is_some() {
230                    self.last_priority = Some(best.priority.clone())
231                }
232                return Some((best.transaction, best.priority))
233            }
234        }
235    }
236}
237
/// Result of attempting to receive a new transaction from the channel during iteration.
///
/// This enum determines how a newly received transaction should be handled based on its priority
/// relative to transactions already yielded by the iterator. It is produced by
/// `BestTransactions::try_recv` and consumed by `BestTransactions::add_new_transactions`.
enum IncomingTransaction<T: TransactionOrdering> {
    /// Process the transaction normally: add to both `all` map and potentially to `independent`
    /// set (if it has no ancestor).
    ///
    /// This variant is used when the transaction's priority is lower than or equal to the last
    /// yielded transaction, meaning it can be safely processed without breaking the descending
    /// priority order. It is also used when no last yielded priority has been recorded.
    Process(PendingTransaction<T>),

    /// Stash the transaction: add only to the `all` map, but NOT to the `independent` set.
    ///
    /// This variant is used when the transaction has a higher priority than the last yielded
    /// transaction. We cannot yield it immediately (to maintain strict priority ordering), but we
    /// must still track it so that:
    /// - Its descendants can find it via `ancestor()` lookups
    /// - We prevent those descendants from being incorrectly promoted to `independent`
    ///
    /// Without stashing, if a child of this transaction arrives later, it would fail to find its
    /// parent in `all`, be marked as `independent`, and be yielded out of order (before its
    /// parent), causing nonce gaps.
    Stash(PendingTransaction<T>),
}
264
265impl<T: TransactionOrdering> crate::traits::BestTransactions for BestTransactions<T> {
266    fn mark_invalid(&mut self, tx: &Self::Item, kind: InvalidPoolTransactionError) {
267        Self::mark_invalid(self, tx, kind)
268    }
269
270    fn no_updates(&mut self) {
271        self.new_transaction_receiver.take();
272        self.last_priority.take();
273    }
274
275    fn skip_blobs(&mut self) {
276        self.set_skip_blobs(true);
277    }
278
279    fn set_skip_blobs(&mut self, skip_blobs: bool) {
280        self.skip_blobs = skip_blobs;
281    }
282}
283
284impl<T: TransactionOrdering> Iterator for BestTransactions<T> {
285    type Item = Arc<ValidPoolTransaction<T::Transaction>>;
286
287    fn next(&mut self) -> Option<Self::Item> {
288        self.next_tx_and_priority().map(|(tx, _)| tx)
289    }
290}
291
/// A [`BestTransactions`](crate::traits::BestTransactions) implementation that filters the
/// transactions of the wrapped iterator with a predicate.
///
/// Transactions that are filtered out are marked as invalid via
/// [`BestTransactions::mark_invalid`](crate::traits::BestTransactions::mark_invalid).
pub struct BestTransactionFilter<I, P> {
    /// The wrapped iterator that supplies the candidate transactions.
    pub(crate) best: I,
    /// Decides whether a candidate is yielded (`true`) or filtered out (`false`).
    pub(crate) predicate: P,
}
301
302impl<I, P> BestTransactionFilter<I, P> {
303    /// Create a new [`BestTransactionFilter`] with the given predicate.
304    pub const fn new(best: I, predicate: P) -> Self {
305        Self { best, predicate }
306    }
307}
308
309impl<I, P> Iterator for BestTransactionFilter<I, P>
310where
311    I: crate::traits::BestTransactions,
312    P: FnMut(&<I as Iterator>::Item) -> bool,
313{
314    type Item = <I as Iterator>::Item;
315
316    fn next(&mut self) -> Option<Self::Item> {
317        loop {
318            let best = self.best.next()?;
319            if (self.predicate)(&best) {
320                return Some(best)
321            }
322            self.best.mark_invalid(
323                &best,
324                InvalidPoolTransactionError::Consensus(InvalidTransactionError::TxTypeNotSupported),
325            );
326        }
327    }
328}
329
330impl<I, P> crate::traits::BestTransactions for BestTransactionFilter<I, P>
331where
332    I: crate::traits::BestTransactions,
333    P: FnMut(&<I as Iterator>::Item) -> bool + Send,
334{
335    fn mark_invalid(&mut self, tx: &Self::Item, kind: InvalidPoolTransactionError) {
336        crate::traits::BestTransactions::mark_invalid(&mut self.best, tx, kind)
337    }
338
339    fn no_updates(&mut self) {
340        self.best.no_updates()
341    }
342
343    fn skip_blobs(&mut self) {
344        self.set_skip_blobs(true)
345    }
346
347    fn set_skip_blobs(&mut self, skip_blobs: bool) {
348        self.best.set_skip_blobs(skip_blobs)
349    }
350}
351
352impl<I: fmt::Debug, P> fmt::Debug for BestTransactionFilter<I, P> {
353    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
354        f.debug_struct("BestTransactionFilter").field("best", &self.best).finish()
355    }
356}
357
/// Wrapper over [`crate::traits::BestTransactions`] that prioritizes transactions of certain
/// senders, capping the total gas limit of the transactions prioritized this way.
#[derive(Debug)]
pub struct BestTransactionsWithPrioritizedSenders<I: Iterator> {
    /// Inner iterator
    inner: I,
    /// The set of senders whose transactions should be prioritized
    prioritized_senders: AddressSet,
    /// Maximum total gas limit of prioritized transactions
    max_prioritized_gas: u64,
    /// Buffer with transactions that were pulled from the inner iterator but are not being
    /// prioritized. Those will be the first to be included after the prioritized transactions
    buffer: VecDeque<I::Item>,
    /// Tracker of total gas limit of prioritized transactions. Once it reaches
    /// `max_prioritized_gas` no more transactions will be prioritized
    prioritized_gas: u64,
}
375
376impl<I: Iterator> BestTransactionsWithPrioritizedSenders<I> {
377    /// Constructs a new [`BestTransactionsWithPrioritizedSenders`].
378    pub fn new(prioritized_senders: AddressSet, max_prioritized_gas: u64, inner: I) -> Self {
379        Self {
380            inner,
381            prioritized_senders,
382            max_prioritized_gas,
383            buffer: Default::default(),
384            prioritized_gas: Default::default(),
385        }
386    }
387}
388
389impl<I, T> Iterator for BestTransactionsWithPrioritizedSenders<I>
390where
391    I: crate::traits::BestTransactions<Item = Arc<ValidPoolTransaction<T>>>,
392    T: PoolTransaction,
393{
394    type Item = <I as Iterator>::Item;
395
396    fn next(&mut self) -> Option<Self::Item> {
397        // If we have space, try prioritizing transactions
398        if self.prioritized_gas < self.max_prioritized_gas {
399            for item in &mut self.inner {
400                if self.prioritized_senders.contains(&item.transaction.sender()) &&
401                    self.prioritized_gas + item.transaction.gas_limit() <=
402                        self.max_prioritized_gas
403                {
404                    self.prioritized_gas += item.transaction.gas_limit();
405                    return Some(item)
406                }
407                self.buffer.push_back(item);
408            }
409        }
410
411        if let Some(item) = self.buffer.pop_front() {
412            Some(item)
413        } else {
414            self.inner.next()
415        }
416    }
417}
418
419impl<I, T> crate::traits::BestTransactions for BestTransactionsWithPrioritizedSenders<I>
420where
421    I: crate::traits::BestTransactions<Item = Arc<ValidPoolTransaction<T>>>,
422    T: PoolTransaction,
423{
424    fn mark_invalid(&mut self, tx: &Self::Item, kind: InvalidPoolTransactionError) {
425        self.inner.mark_invalid(tx, kind)
426    }
427
428    fn no_updates(&mut self) {
429        self.inner.no_updates()
430    }
431
432    fn set_skip_blobs(&mut self, skip_blobs: bool) {
433        if skip_blobs {
434            self.buffer.retain(|tx| !tx.transaction.is_eip4844())
435        }
436        self.inner.set_skip_blobs(skip_blobs)
437    }
438}
439
440#[cfg(test)]
441mod tests {
442    use super::*;
443    use crate::{
444        pool::pending::PendingPool,
445        test_utils::{MockOrdering, MockTransaction, MockTransactionFactory},
446        BestTransactions, Priority,
447    };
448
449    #[test]
450    fn test_best_iter() {
451        let mut pool = PendingPool::new(MockOrdering::default());
452        let mut f = MockTransactionFactory::default();
453
454        let num_tx = 10;
455        // insert 10 gapless tx
456        let tx = MockTransaction::eip1559();
457        for nonce in 0..num_tx {
458            let tx = tx.clone().rng_hash().with_nonce(nonce);
459            let valid_tx = f.validated(tx);
460            pool.add_transaction(Arc::new(valid_tx), 0);
461        }
462
463        let mut best = pool.best();
464        assert_eq!(best.all.len(), num_tx as usize);
465        assert_eq!(best.independent.len(), 1);
466
467        // check tx are returned in order
468        for nonce in 0..num_tx {
469            assert_eq!(best.independent.len(), 1);
470            let tx = best.next().unwrap();
471            assert_eq!(tx.nonce(), nonce);
472        }
473    }
474
475    #[test]
476    fn test_best_iter_invalid() {
477        let mut pool = PendingPool::new(MockOrdering::default());
478        let mut f = MockTransactionFactory::default();
479
480        let num_tx = 10;
481        // insert 10 gapless tx
482        let tx = MockTransaction::eip1559();
483        for nonce in 0..num_tx {
484            let tx = tx.clone().rng_hash().with_nonce(nonce);
485            let valid_tx = f.validated(tx);
486            pool.add_transaction(Arc::new(valid_tx), 0);
487        }
488
489        let mut best = pool.best();
490
491        // mark the first tx as invalid
492        let invalid = best.independent.iter().next().unwrap();
493        best.mark_invalid(
494            &invalid.transaction.clone(),
495            InvalidPoolTransactionError::Consensus(InvalidTransactionError::TxTypeNotSupported),
496        );
497
498        // iterator is empty
499        assert!(best.next().is_none());
500    }
501
502    #[test]
503    fn test_best_transactions_iter_invalid() {
504        let mut pool = PendingPool::new(MockOrdering::default());
505        let mut f = MockTransactionFactory::default();
506
507        let num_tx = 10;
508        // insert 10 gapless tx
509        let tx = MockTransaction::eip1559();
510        for nonce in 0..num_tx {
511            let tx = tx.clone().rng_hash().with_nonce(nonce);
512            let valid_tx = f.validated(tx);
513            pool.add_transaction(Arc::new(valid_tx), 0);
514        }
515
516        let mut best: Box<
517            dyn crate::traits::BestTransactions<Item = Arc<ValidPoolTransaction<MockTransaction>>>,
518        > = Box::new(pool.best());
519
520        let tx = Iterator::next(&mut best).unwrap();
521        crate::traits::BestTransactions::mark_invalid(
522            &mut *best,
523            &tx,
524            InvalidPoolTransactionError::Consensus(InvalidTransactionError::TxTypeNotSupported),
525        );
526        assert!(Iterator::next(&mut best).is_none());
527    }
528
529    #[test]
530    fn test_best_with_fees_iter_base_fee_satisfied() {
531        let mut pool = PendingPool::new(MockOrdering::default());
532        let mut f = MockTransactionFactory::default();
533
534        let num_tx = 5;
535        let base_fee: u64 = 10;
536        let base_fee_per_blob_gas: u64 = 15;
537
538        // Insert transactions with a max_fee_per_gas greater than or equal to the base fee
539        // Without blob fee
540        for nonce in 0..num_tx {
541            let tx = MockTransaction::eip1559()
542                .rng_hash()
543                .with_nonce(nonce)
544                .with_max_fee(base_fee as u128 + 5);
545            let valid_tx = f.validated(tx);
546            pool.add_transaction(Arc::new(valid_tx), 0);
547        }
548
549        let mut best = pool.best_with_basefee_and_blobfee(base_fee, base_fee_per_blob_gas);
550
551        for nonce in 0..num_tx {
552            let tx = best.next().expect("Transaction should be returned");
553            assert_eq!(tx.nonce(), nonce);
554            assert!(tx.transaction.max_fee_per_gas() >= base_fee as u128);
555        }
556    }
557
558    #[test]
559    fn test_best_with_fees_iter_base_fee_violated() {
560        let mut pool = PendingPool::new(MockOrdering::default());
561        let mut f = MockTransactionFactory::default();
562
563        let num_tx = 5;
564        let base_fee: u64 = 20;
565        let base_fee_per_blob_gas: u64 = 15;
566
567        // Insert transactions with a max_fee_per_gas less than the base fee
568        for nonce in 0..num_tx {
569            let tx = MockTransaction::eip1559()
570                .rng_hash()
571                .with_nonce(nonce)
572                .with_max_fee(base_fee as u128 - 5);
573            let valid_tx = f.validated(tx);
574            pool.add_transaction(Arc::new(valid_tx), 0);
575        }
576
577        let mut best = pool.best_with_basefee_and_blobfee(base_fee, base_fee_per_blob_gas);
578
579        // No transaction should be returned since all violate the base fee
580        assert!(best.next().is_none());
581    }
582
583    #[test]
584    fn test_best_with_fees_iter_blob_fee_satisfied() {
585        let mut pool = PendingPool::new(MockOrdering::default());
586        let mut f = MockTransactionFactory::default();
587
588        let num_tx = 5;
589        let base_fee: u64 = 10;
590        let base_fee_per_blob_gas: u64 = 20;
591
592        // Insert transactions with a max_fee_per_blob_gas greater than or equal to the base fee per
593        // blob gas
594        for nonce in 0..num_tx {
595            let tx = MockTransaction::eip4844()
596                .rng_hash()
597                .with_nonce(nonce)
598                .with_max_fee(base_fee as u128 + 5)
599                .with_blob_fee(base_fee_per_blob_gas as u128 + 5);
600            let valid_tx = f.validated(tx);
601            pool.add_transaction(Arc::new(valid_tx), 0);
602        }
603
604        let mut best = pool.best_with_basefee_and_blobfee(base_fee, base_fee_per_blob_gas);
605
606        // All transactions should be returned in order since they satisfy both base fee and blob
607        // fee
608        for nonce in 0..num_tx {
609            let tx = best.next().expect("Transaction should be returned");
610            assert_eq!(tx.nonce(), nonce);
611            assert!(tx.transaction.max_fee_per_gas() >= base_fee as u128);
612            assert!(
613                tx.transaction.max_fee_per_blob_gas().unwrap() >= base_fee_per_blob_gas as u128
614            );
615        }
616
617        // No more transactions should be returned
618        assert!(best.next().is_none());
619    }
620
621    #[test]
622    fn test_best_with_fees_iter_blob_fee_violated() {
623        let mut pool = PendingPool::new(MockOrdering::default());
624        let mut f = MockTransactionFactory::default();
625
626        let num_tx = 5;
627        let base_fee: u64 = 10;
628        let base_fee_per_blob_gas: u64 = 20;
629
630        // Insert transactions with a max_fee_per_blob_gas less than the base fee per blob gas
631        for nonce in 0..num_tx {
632            let tx = MockTransaction::eip4844()
633                .rng_hash()
634                .with_nonce(nonce)
635                .with_max_fee(base_fee as u128 + 5)
636                .with_blob_fee(base_fee_per_blob_gas as u128 - 5);
637            let valid_tx = f.validated(tx);
638            pool.add_transaction(Arc::new(valid_tx), 0);
639        }
640
641        let mut best = pool.best_with_basefee_and_blobfee(base_fee, base_fee_per_blob_gas);
642
643        // No transaction should be returned since all violate the blob fee
644        assert!(best.next().is_none());
645    }
646
647    #[test]
648    fn test_best_with_fees_iter_mixed_fees() {
649        let mut pool = PendingPool::new(MockOrdering::default());
650        let mut f = MockTransactionFactory::default();
651
652        let base_fee: u64 = 10;
653        let base_fee_per_blob_gas: u64 = 20;
654
655        // Insert transactions with varying max_fee_per_gas and max_fee_per_blob_gas
656        let tx1 =
657            MockTransaction::eip1559().rng_hash().with_nonce(0).with_max_fee(base_fee as u128 + 5);
658        let tx2 = MockTransaction::eip4844()
659            .rng_hash()
660            .with_nonce(1)
661            .with_max_fee(base_fee as u128 + 5)
662            .with_blob_fee(base_fee_per_blob_gas as u128 + 5);
663        let tx3 = MockTransaction::eip4844()
664            .rng_hash()
665            .with_nonce(2)
666            .with_max_fee(base_fee as u128 + 5)
667            .with_blob_fee(base_fee_per_blob_gas as u128 - 5);
668        let tx4 =
669            MockTransaction::eip1559().rng_hash().with_nonce(3).with_max_fee(base_fee as u128 - 5);
670
671        pool.add_transaction(Arc::new(f.validated(tx1.clone())), 0);
672        pool.add_transaction(Arc::new(f.validated(tx2.clone())), 0);
673        pool.add_transaction(Arc::new(f.validated(tx3)), 0);
674        pool.add_transaction(Arc::new(f.validated(tx4)), 0);
675
676        let mut best = pool.best_with_basefee_and_blobfee(base_fee, base_fee_per_blob_gas);
677
678        let expected_order = vec![tx1, tx2];
679        for expected_tx in expected_order {
680            let tx = best.next().expect("Transaction should be returned");
681            assert_eq!(tx.transaction, expected_tx);
682        }
683
684        // No more transactions should be returned
685        assert!(best.next().is_none());
686    }
687
688    #[test]
689    fn test_best_add_transaction_with_next_nonce() {
690        let mut pool = PendingPool::new(MockOrdering::default());
691        let mut f = MockTransactionFactory::default();
692
693        // Add 5 transactions with increasing nonces to the pool
694        let num_tx = 5;
695        let tx = MockTransaction::eip1559();
696        for nonce in 0..num_tx {
697            let tx = tx.clone().rng_hash().with_nonce(nonce);
698            let valid_tx = f.validated(tx);
699            pool.add_transaction(Arc::new(valid_tx), 0);
700        }
701
702        // Create a BestTransactions iterator from the pool
703        let mut best = pool.best();
704
705        // Use a broadcast channel for transaction updates
706        let (tx_sender, tx_receiver) =
707            tokio::sync::broadcast::channel::<PendingTransaction<MockOrdering>>(1000);
708        best.new_transaction_receiver = Some(tx_receiver);
709
710        // Create a new transaction with nonce 5 and validate it
711        let new_tx = MockTransaction::eip1559().rng_hash().with_nonce(5);
712        let valid_new_tx = f.validated(new_tx);
713
714        // Send the new transaction through the broadcast channel
715        let pending_tx = PendingTransaction {
716            submission_id: 10,
717            transaction: Arc::new(valid_new_tx.clone()),
718            priority: Priority::Value(1000),
719        };
720        tx_sender.send(pending_tx.clone()).unwrap();
721
722        // Add new transactions to the iterator
723        best.add_new_transactions();
724
725        // Verify that the new transaction has been added to the 'all' map
726        assert_eq!(best.all.len(), 6);
727        assert!(best.all.contains_key(valid_new_tx.id()));
728
729        // Verify that the new transaction has been added to the 'independent' set
730        assert_eq!(best.independent.len(), 2);
731        assert!(best.independent.contains(&pending_tx));
732    }
733
734    #[test]
735    fn test_best_add_transaction_with_ancestor() {
736        // Initialize a new PendingPool with default MockOrdering and MockTransactionFactory
737        let mut pool = PendingPool::new(MockOrdering::default());
738        let mut f = MockTransactionFactory::default();
739
740        // Add 5 transactions with increasing nonces to the pool
741        let num_tx = 5;
742        let tx = MockTransaction::eip1559();
743        for nonce in 0..num_tx {
744            let tx = tx.clone().rng_hash().with_nonce(nonce);
745            let valid_tx = f.validated(tx);
746            pool.add_transaction(Arc::new(valid_tx), 0);
747        }
748
749        // Create a BestTransactions iterator from the pool
750        let mut best = pool.best();
751
752        // Use a broadcast channel for transaction updates
753        let (tx_sender, tx_receiver) =
754            tokio::sync::broadcast::channel::<PendingTransaction<MockOrdering>>(1000);
755        best.new_transaction_receiver = Some(tx_receiver);
756
757        // Create a new transaction with nonce 5 and validate it
758        let base_tx1 = MockTransaction::eip1559().rng_hash().with_nonce(5);
759        let valid_new_tx1 = f.validated(base_tx1.clone());
760
761        // Send the new transaction through the broadcast channel
762        let pending_tx1 = PendingTransaction {
763            submission_id: 10,
764            transaction: Arc::new(valid_new_tx1.clone()),
765            priority: Priority::Value(1000),
766        };
767        tx_sender.send(pending_tx1.clone()).unwrap();
768
769        // Add new transactions to the iterator
770        best.add_new_transactions();
771
772        // Verify that the new transaction has been added to the 'all' map
773        assert_eq!(best.all.len(), 6);
774        assert!(best.all.contains_key(valid_new_tx1.id()));
775
776        // Verify that the new transaction has been added to the 'independent' set
777        assert_eq!(best.independent.len(), 2);
778        assert!(best.independent.contains(&pending_tx1));
779
780        // Attempt to add a new transaction with a different nonce (not a duplicate)
781        let base_tx2 = base_tx1.with_nonce(6);
782        let valid_new_tx2 = f.validated(base_tx2);
783
784        // Send the new transaction through the broadcast channel
785        let pending_tx2 = PendingTransaction {
786            submission_id: 11, // Different submission ID
787            transaction: Arc::new(valid_new_tx2.clone()),
788            priority: Priority::Value(1000),
789        };
790        tx_sender.send(pending_tx2.clone()).unwrap();
791
792        // Add new transactions to the iterator
793        best.add_new_transactions();
794
795        // Verify that the new transaction has been added to 'all'
796        assert_eq!(best.all.len(), 7);
797        assert!(best.all.contains_key(valid_new_tx2.id()));
798
799        // Verify that the new transaction has not been added to the 'independent' set
800        assert_eq!(best.independent.len(), 2);
801        assert!(!best.independent.contains(&pending_tx2));
802    }
803
804    #[test]
805    fn test_best_transactions_filter_trait_object() {
806        // Initialize a new PendingPool with default MockOrdering and MockTransactionFactory
807        let mut pool = PendingPool::new(MockOrdering::default());
808        let mut f = MockTransactionFactory::default();
809
810        // Add 5 transactions with increasing nonces to the pool
811        let num_tx = 5;
812        let tx = MockTransaction::eip1559();
813        for nonce in 0..num_tx {
814            let tx = tx.clone().rng_hash().with_nonce(nonce);
815            let valid_tx = f.validated(tx);
816            pool.add_transaction(Arc::new(valid_tx), 0);
817        }
818
819        // Create a trait object of BestTransactions iterator from the pool
820        let best: Box<dyn crate::traits::BestTransactions<Item = _>> = Box::new(pool.best());
821
822        // Create a filter that only returns transactions with even nonces
823        let filter =
824            BestTransactionFilter::new(best, |tx: &Arc<ValidPoolTransaction<MockTransaction>>| {
825                tx.nonce().is_multiple_of(2)
826            });
827
828        // Verify that the filter only returns transactions with even nonces
829        for tx in filter {
830            assert_eq!(tx.nonce() % 2, 0);
831        }
832    }
833
834    #[test]
835    fn test_best_transactions_prioritized_senders() {
836        let mut pool = PendingPool::new(MockOrdering::default());
837        let mut f = MockTransactionFactory::default();
838
839        // Add 5 plain transactions from different senders with increasing gas price
840        for gas_price in 0..5 {
841            let tx = MockTransaction::eip1559().with_gas_price((gas_price + 1) * 10);
842            let valid_tx = f.validated(tx);
843            pool.add_transaction(Arc::new(valid_tx), 0);
844        }
845
846        // Add another transaction with 5 gas price that's going to be prioritized by sender
847        let prioritized_tx = MockTransaction::eip1559().with_gas_price(5).with_gas_limit(200);
848        let valid_prioritized_tx = f.validated(prioritized_tx.clone());
849        pool.add_transaction(Arc::new(valid_prioritized_tx), 0);
850
851        // Add another transaction with 3 gas price that should not be prioritized by sender because
852        // of gas limit.
853        let prioritized_tx2 = MockTransaction::eip1559().with_gas_price(3);
854        let valid_prioritized_tx2 = f.validated(prioritized_tx2.clone());
855        pool.add_transaction(Arc::new(valid_prioritized_tx2), 0);
856
857        let prioritized_senders =
858            AddressSet::from_iter([prioritized_tx.sender(), prioritized_tx2.sender()]);
859        let best =
860            BestTransactionsWithPrioritizedSenders::new(prioritized_senders, 200, pool.best());
861
862        // Verify that the prioritized transaction is returned first
863        // and the rest are returned in the reverse order of gas price
864        let mut iter = best.into_iter();
865        let top_of_block_tx = iter.next().unwrap();
866        assert_eq!(top_of_block_tx.max_fee_per_gas(), 5);
867        assert_eq!(top_of_block_tx.sender(), prioritized_tx.sender());
868        for gas_price in (0..5).rev() {
869            assert_eq!(iter.next().unwrap().max_fee_per_gas(), (gas_price + 1) * 10);
870        }
871
872        // Due to the gas limit, the transaction from second-prioritized sender was not
873        // prioritized.
874        let top_of_block_tx2 = iter.next().unwrap();
875        assert_eq!(top_of_block_tx2.max_fee_per_gas(), 3);
876        assert_eq!(top_of_block_tx2.sender(), prioritized_tx2.sender());
877    }
878
879    #[test]
880    fn test_best_with_fees_iter_no_blob_fee_required() {
881        // Tests transactions without blob fees where base fees are checked.
882        let mut pool = PendingPool::new(MockOrdering::default());
883        let mut f = MockTransactionFactory::default();
884
885        let base_fee: u64 = 10;
886        let base_fee_per_blob_gas: u64 = 0; // No blob fee requirement
887
888        // Insert transactions with max_fee_per_gas above the base fee
889        for nonce in 0..5 {
890            let tx = MockTransaction::eip1559()
891                .rng_hash()
892                .with_nonce(nonce)
893                .with_max_fee(base_fee as u128 + 5);
894            let valid_tx = f.validated(tx);
895            pool.add_transaction(Arc::new(valid_tx), 0);
896        }
897
898        let mut best = pool.best_with_basefee_and_blobfee(base_fee, base_fee_per_blob_gas);
899
900        // All transactions should be returned as no blob fee requirement is imposed
901        for nonce in 0..5 {
902            let tx = best.next().expect("Transaction should be returned");
903            assert_eq!(tx.nonce(), nonce);
904        }
905
906        // Ensure no more transactions are left
907        assert!(best.next().is_none());
908    }
909
910    #[test]
911    fn test_best_with_fees_iter_mix_of_blob_and_non_blob_transactions() {
912        // Tests mixed scenarios with both blob and non-blob transactions.
913        let mut pool = PendingPool::new(MockOrdering::default());
914        let mut f = MockTransactionFactory::default();
915
916        let base_fee: u64 = 10;
917        let base_fee_per_blob_gas: u64 = 15;
918
919        // Add a non-blob transaction that satisfies the base fee
920        let tx_non_blob =
921            MockTransaction::eip1559().rng_hash().with_nonce(0).with_max_fee(base_fee as u128 + 5);
922        pool.add_transaction(Arc::new(f.validated(tx_non_blob.clone())), 0);
923
924        // Add a blob transaction that satisfies both base fee and blob fee
925        let tx_blob = MockTransaction::eip4844()
926            .rng_hash()
927            .with_nonce(1)
928            .with_max_fee(base_fee as u128 + 5)
929            .with_blob_fee(base_fee_per_blob_gas as u128 + 5);
930        pool.add_transaction(Arc::new(f.validated(tx_blob.clone())), 0);
931
932        let mut best = pool.best_with_basefee_and_blobfee(base_fee, base_fee_per_blob_gas);
933
934        // Verify both transactions are returned
935        let tx = best.next().expect("Transaction should be returned");
936        assert_eq!(tx.transaction, tx_non_blob);
937
938        let tx = best.next().expect("Transaction should be returned");
939        assert_eq!(tx.transaction, tx_blob);
940
941        // Ensure no more transactions are left
942        assert!(best.next().is_none());
943    }
944
945    #[test]
946    fn test_best_transactions_with_skipping_blobs() {
947        // Tests the skip_blobs functionality to ensure blob transactions are skipped.
948        let mut pool = PendingPool::new(MockOrdering::default());
949        let mut f = MockTransactionFactory::default();
950
951        // Add a blob transaction
952        let tx_blob = MockTransaction::eip4844().rng_hash().with_nonce(0).with_blob_fee(100);
953        let valid_blob_tx = f.validated(tx_blob);
954        pool.add_transaction(Arc::new(valid_blob_tx), 0);
955
956        // Add a non-blob transaction
957        let tx_non_blob = MockTransaction::eip1559().rng_hash().with_nonce(1).with_max_fee(200);
958        let valid_non_blob_tx = f.validated(tx_non_blob.clone());
959        pool.add_transaction(Arc::new(valid_non_blob_tx), 0);
960
961        let mut best = pool.best();
962        best.skip_blobs();
963
964        // Only the non-blob transaction should be returned
965        let tx = best.next().expect("Transaction should be returned");
966        assert_eq!(tx.transaction, tx_non_blob);
967
968        // Ensure no more transactions are left
969        assert!(best.next().is_none());
970    }
971
972    #[test]
973    fn test_best_transactions_no_updates() {
974        // Tests the no_updates functionality to ensure it properly clears the
975        // new_transaction_receiver.
976        let mut pool = PendingPool::new(MockOrdering::default());
977        let mut f = MockTransactionFactory::default();
978
979        // Add a transaction
980        let tx = MockTransaction::eip1559().rng_hash().with_nonce(0).with_max_fee(100);
981        let valid_tx = f.validated(tx);
982        pool.add_transaction(Arc::new(valid_tx), 0);
983
984        let mut best = pool.best();
985
986        // Use a broadcast channel for transaction updates
987        let (_tx_sender, tx_receiver) =
988            tokio::sync::broadcast::channel::<PendingTransaction<MockOrdering>>(1000);
989        best.new_transaction_receiver = Some(tx_receiver);
990
991        // Ensure receiver is set
992        assert!(best.new_transaction_receiver.is_some());
993
994        // Call no_updates to clear the receiver
995        best.no_updates();
996
997        // Ensure receiver is cleared
998        assert!(best.new_transaction_receiver.is_none());
999    }
1000
1001    #[test]
1002    fn test_best_update_transaction_priority() {
1003        let mut pool = PendingPool::new(MockOrdering::default());
1004        let mut f = MockTransactionFactory::default();
1005
1006        // Add 5 transactions with increasing nonces to the pool
1007        let num_tx = 5;
1008        let tx = MockTransaction::eip1559();
1009        for nonce in 0..num_tx {
1010            let tx = tx.clone().rng_hash().with_nonce(nonce);
1011            let valid_tx = f.validated(tx);
1012            pool.add_transaction(Arc::new(valid_tx), 0);
1013        }
1014
1015        // Create a BestTransactions iterator from the pool
1016        let mut best = pool.best();
1017
1018        // Use a broadcast channel for transaction updates
1019        let (tx_sender, tx_receiver) =
1020            tokio::sync::broadcast::channel::<PendingTransaction<MockOrdering>>(1000);
1021        best.new_transaction_receiver = Some(tx_receiver);
1022
1023        // yield one tx, effectively locking in the highest prio
1024        let first = best.next().unwrap();
1025
1026        // Create a new transaction with nonce 5 and validate it
1027        let new_higher_fee_tx = MockTransaction::eip1559().with_nonce(0);
1028        let valid_new_higher_fee_tx = f.validated(new_higher_fee_tx);
1029
1030        // Send the new transaction through the broadcast channel
1031        let pending_tx = PendingTransaction {
1032            submission_id: 10,
1033            transaction: Arc::new(valid_new_higher_fee_tx.clone()),
1034            priority: Priority::Value(u128::MAX),
1035        };
1036        tx_sender.send(pending_tx).unwrap();
1037
1038        // ensure that the higher prio tx is skipped since we yielded a lower one
1039        for tx in best {
1040            assert_eq!(tx.sender_id(), first.sender_id());
1041            assert_ne!(tx.sender_id(), valid_new_higher_fee_tx.sender_id());
1042        }
1043    }
1044
1045    /// Reproduces the "Blob Transaction Ordering, Multiple Clients" Hive scenario.
1046    ///
1047    /// Sender A contributes 5-blob transactions while sender B contributes 1-blob transactions.
1048    /// A single payload build should be able to fill the block with 6 blobs total (5+1).
1049    #[test]
1050    fn test_blob_transaction_ordering_multiple_clients_shape() {
1051        let mut pool = PendingPool::new(MockOrdering::default());
1052        let mut f = MockTransactionFactory::default();
1053
1054        let base_fee: u64 = 10;
1055        let base_fee_per_blob_gas: u64 = 1;
1056        let max_blob_count: u64 = 6;
1057
1058        let sender_a = MockTransaction::eip4844()
1059            .with_blob_hashes(5)
1060            .with_max_fee(base_fee as u128 + 20)
1061            .with_priority_fee(base_fee as u128 + 20)
1062            .with_blob_fee(120);
1063        for nonce in 0..5u64 {
1064            let tx = sender_a.clone().rng_hash().with_nonce(nonce);
1065            pool.add_transaction(Arc::new(f.validated(tx)), 0);
1066        }
1067
1068        let sender_b = MockTransaction::eip4844()
1069            .with_blob_hashes(1)
1070            .with_max_fee(base_fee as u128 + 20)
1071            .with_priority_fee(base_fee as u128 + 20)
1072            .with_blob_fee(100);
1073        for nonce in 0..5u64 {
1074            let tx = sender_b.clone().rng_hash().with_nonce(nonce);
1075            pool.add_transaction(Arc::new(f.validated(tx)), 0);
1076        }
1077
1078        let mut best = pool.best_with_basefee_and_blobfee(base_fee, base_fee_per_blob_gas);
1079        let mut block_blob_count = 0u64;
1080        let mut included_txs = 0u64;
1081
1082        while let Some(tx) = best.next() {
1083            if let Some(blob_hashes) = tx.transaction.blob_versioned_hashes() {
1084                let tx_blob_count = blob_hashes.len() as u64;
1085
1086                if block_blob_count + tx_blob_count > max_blob_count {
1087                    crate::traits::BestTransactions::mark_invalid(
1088                        &mut best,
1089                        &tx,
1090                        InvalidPoolTransactionError::Eip4844(
1091                            Eip4844PoolTransactionError::TooManyEip4844Blobs {
1092                                have: block_blob_count + tx_blob_count,
1093                                permitted: max_blob_count,
1094                            },
1095                        ),
1096                    );
1097                    continue;
1098                }
1099
1100                block_blob_count += tx_blob_count;
1101                included_txs += 1;
1102
1103                if block_blob_count == max_blob_count {
1104                    best.skip_blobs();
1105                    break;
1106                }
1107            }
1108        }
1109
1110        assert_eq!(
1111            block_blob_count, max_blob_count,
1112            "expected a full blob block (5+1 blobs across senders)"
1113        );
1114        assert_eq!(included_txs, 2, "expected one 5-blob tx and one 1-blob tx in the block");
1115    }
1116}