use crate::{
identifier::TransactionId, pool::pending::PendingTransaction, PoolTransaction,
TransactionOrdering, ValidPoolTransaction,
};
use core::fmt;
use reth_primitives::B256 as TxHash;
use std::{
collections::{BTreeMap, BTreeSet, HashSet},
sync::Arc,
};
use tokio::sync::broadcast::{error::TryRecvError, Receiver};
use tracing::debug;
/// Iterator adapter around [`BestTransactions`] that additionally enforces fee limits.
///
/// Its `Iterator` implementation only yields a transaction when its `max_fee_per_gas` is at
/// least `base_fee` and, if the transaction reports a `max_fee_per_blob_gas`, that value is at
/// least `base_fee_per_blob_gas`. Transactions that fail either check are marked invalid on the
/// wrapped iterator.
pub(crate) struct BestTransactionsWithFees<T: TransactionOrdering> {
/// The wrapped iterator that produces the candidate transactions.
pub(crate) best: BestTransactions<T>,
/// Lower bound that a candidate's `max_fee_per_gas` must reach to be yielded.
pub(crate) base_fee: u64,
/// Lower bound that a candidate's `max_fee_per_blob_gas` must reach to be yielded; only
/// compared for transactions that report a blob fee.
pub(crate) base_fee_per_blob_gas: u64,
}
/// Forwards every control method of the pool-level `BestTransactions` trait to the wrapped
/// iterator. None of these methods look at the fee limits; those are only applied while
/// iterating.
impl<T: TransactionOrdering> crate::traits::BestTransactions for BestTransactionsWithFees<T> {
    /// Records `tx` as invalid on the wrapped iterator.
    fn mark_invalid(&mut self, tx: &Self::Item) {
        // Resolves to the inherent `BestTransactions::mark_invalid` of the wrapped struct.
        self.best.mark_invalid(tx);
    }

    /// Tells the wrapped iterator to stop listening for newly added transactions.
    fn no_updates(&mut self) {
        self.best.no_updates();
    }

    /// Enables blob skipping on the wrapped iterator.
    fn skip_blobs(&mut self) {
        self.best.set_skip_blobs(true);
    }

    /// Turns blob skipping on the wrapped iterator on or off.
    fn set_skip_blobs(&mut self, skip_blobs: bool) {
        self.best.set_skip_blobs(skip_blobs);
    }
}
/// Iterates over the wrapped [`BestTransactions`], filtering out transactions that cannot pay
/// the configured fees.
impl<T: TransactionOrdering> Iterator for BestTransactionsWithFees<T> {
    type Item = Arc<ValidPoolTransaction<T::Transaction>>;

    /// Returns the next transaction from the wrapped iterator that satisfies both fee limits.
    ///
    /// A candidate passes when its `max_fee_per_gas` is at least `base_fee` and, if it reports
    /// a `max_fee_per_blob_gas`, that fee is at least `base_fee_per_blob_gas`. Candidates that
    /// fail are marked invalid and the search continues. Returns `None` once the wrapped
    /// iterator is exhausted.
    fn next(&mut self) -> Option<Self::Item> {
        // The limits never change while iterating, so widen them once up front.
        let min_gas_fee = u128::from(self.base_fee);
        let min_blob_fee = u128::from(self.base_fee_per_blob_gas);

        loop {
            let candidate = self.best.next()?;

            let tx = &candidate.transaction;
            // The blob fee is only inspected when the regular fee check already passed.
            let pays_enough = tx.max_fee_per_gas() >= min_gas_fee &&
                match tx.max_fee_per_blob_gas() {
                    Some(blob_fee) => blob_fee >= min_blob_fee,
                    // No blob fee reported: nothing further to check.
                    None => true,
                };

            if !pays_enough {
                crate::traits::BestTransactions::mark_invalid(self, &candidate);
                continue
            }

            return Some(candidate)
        }
    }
}
/// Iterator state for walking pending transactions in dependency order.
///
/// The `Iterator` implementation repeatedly pops the last element of `independent`, skips it
/// if its hash is in `invalid`, and moves the transaction it unlocks from `all` into
/// `independent`.
pub(crate) struct BestTransactions<T: TransactionOrdering> {
/// Every pending transaction known to this iterator, keyed by its id. Used to look up the
/// ancestor of a transaction and the transaction that gets unlocked by another one.
pub(crate) all: BTreeMap<TransactionId, PendingTransaction<T>>,
/// Transactions that are candidates to be yielded next. The iterator pops the last (greatest)
/// element of this set — presumably the one with the highest priority; confirm against
/// `PendingTransaction`'s `Ord` implementation.
pub(crate) independent: BTreeSet<PendingTransaction<T>>,
/// Hashes of transactions that were marked invalid and must not be yielded.
pub(crate) invalid: HashSet<TxHash>,
/// Broadcast receiver that is drained for newly added pending transactions before each
/// iteration step. `None` means no updates are picked up.
pub(crate) new_transaction_receiver: Option<Receiver<PendingTransaction<T>>>,
/// When `true`, EIP-4844 transactions are marked invalid instead of being yielded.
pub(crate) skip_blobs: bool,
}
impl<T: TransactionOrdering> BestTransactions<T> {
    /// Remembers the hash of `tx` so that the iterator skips it from now on.
    pub(crate) fn mark_invalid(&mut self, tx: &Arc<ValidPoolTransaction<T::Transaction>>) {
        let hash = *tx.hash();
        self.invalid.insert(hash);
    }

    /// Looks up the direct ancestor of the transaction with the given `id` in `all`.
    ///
    /// Returns `None` when the id has no ancestor or the ancestor is not tracked.
    pub(crate) fn ancestor(&self, id: &TransactionId) -> Option<&PendingTransaction<T>> {
        id.unchecked_ancestor().and_then(|ancestor_id| self.all.get(&ancestor_id))
    }

    /// Non-blocking receive of the next newly added pending transaction.
    ///
    /// Returns `None` when there is no receiver or when the receiver reports any error other
    /// than `Lagged`. A `Lagged` error is ignored and the receive is retried.
    fn try_recv(&mut self) -> Option<PendingTransaction<T>> {
        let receiver = self.new_transaction_receiver.as_mut()?;
        loop {
            match receiver.try_recv() {
                Ok(pending) => return Some(pending),
                // Some messages were missed; retry to fetch what is still available.
                Err(TryRecvError::Lagged(_)) => {}
                Err(_) => return None,
            }
        }
    }

    /// Drains the receiver and registers every received transaction.
    ///
    /// Each transaction is stored in `all`; it is also added to `independent` when its
    /// ancestor is not tracked in `all`.
    fn add_new_transactions(&mut self) {
        while let Some(incoming) = self.try_recv() {
            let id = *incoming.transaction.id();

            // The ancestor lookup must happen before the transaction itself is stored.
            let has_tracked_ancestor = self.ancestor(&id).is_some();
            if !has_tracked_ancestor {
                self.independent.insert(incoming.clone());
            }

            self.all.insert(id, incoming);
        }
    }
}
impl<T: TransactionOrdering> crate::traits::BestTransactions for BestTransactions<T> {
fn mark_invalid(&mut self, tx: &Self::Item) {
Self::mark_invalid(self, tx)
}
fn no_updates(&mut self) {
self.new_transaction_receiver.take();
}
fn skip_blobs(&mut self) {
self.set_skip_blobs(true);
}
fn set_skip_blobs(&mut self, skip_blobs: bool) {
self.skip_blobs = skip_blobs;
}
}
/// Yields tracked pending transactions one at a time, never returning a transaction that is
/// invalid or that depends on an invalid transaction.
impl<T: TransactionOrdering> Iterator for BestTransactions<T> {
    type Item = Arc<ValidPoolTransaction<T::Transaction>>;

    /// Returns the next transaction to consider, or `None` when `independent` is empty.
    ///
    /// Each step first drains newly added transactions, then pops the last element of
    /// `independent`. The popped transaction is skipped when
    ///
    /// - its hash was marked invalid,
    /// - its direct ancestor was marked invalid (the transaction is then marked invalid as
    ///   well, so the invalidation propagates down the whole chain), or
    /// - blob skipping is enabled and it is an EIP-4844 transaction.
    ///
    /// Only a transaction that is actually yielded unlocks its descendant.
    fn next(&mut self) -> Option<Self::Item> {
        loop {
            self.add_new_transactions();

            let best = self.independent.pop_last()?;
            let hash = *best.transaction.hash();

            // A descendant may already sit in `independent` when its ancestor gets invalidated
            // after having been yielded. Checking only the transaction's own hash would let
            // that descendant through, so the ancestor's state is consulted as well.
            let ancestor_invalid = self
                .ancestor(best.transaction.id())
                .map_or(false, |ancestor| self.invalid.contains(ancestor.transaction.hash()));
            if ancestor_invalid {
                // Remember it so that this transaction's own descendant is rejected too.
                self.invalid.insert(hash);
            }

            if self.invalid.contains(&hash) {
                debug!(
                    target: "txpool",
                    "[{:?}] skipping invalid transaction",
                    hash
                );
                continue
            }

            if self.skip_blobs && best.transaction.transaction.is_eip4844() {
                // Skipped blob transactions must not unlock their descendant: it depends on a
                // transaction that will not be yielded.
                self.mark_invalid(&best.transaction);
                continue
            }

            // The transaction is going to be yielded, so its descendant becomes a candidate.
            if let Some(unlocked) = self.all.get(&best.unlocks()) {
                self.independent.insert(unlocked.clone());
            }

            return Some(best.transaction)
        }
    }
}
/// Iterator adapter that pairs an inner iterator with a predicate.
///
/// The `Iterator` implementation yields only the items of `best` for which `predicate`
/// returns `true`; rejected items are marked invalid on `best`.
pub struct BestTransactionFilter<I, P> {
    /// The inner iterator that produces the candidates.
    pub(crate) best: I,
    /// Decides for each candidate whether it is yielded.
    pub(crate) predicate: P,
}

impl<I, P> BestTransactionFilter<I, P> {
    /// Creates a filter from the inner iterator `best` and the `predicate` to apply to it.
    pub(crate) const fn new(best: I, predicate: P) -> Self {
        BestTransactionFilter { best, predicate }
    }
}
/// Iterates over the inner iterator, yielding only items accepted by the predicate.
impl<I, P> Iterator for BestTransactionFilter<I, P>
where
    I: crate::traits::BestTransactions,
    P: FnMut(&<I as Iterator>::Item) -> bool,
{
    type Item = <I as Iterator>::Item;

    /// Returns the next item of the inner iterator for which the predicate returns `true`.
    ///
    /// Every rejected item is marked invalid on the inner iterator before the search
    /// continues. Returns `None` once the inner iterator is exhausted.
    fn next(&mut self) -> Option<Self::Item> {
        while let Some(candidate) = self.best.next() {
            if (self.predicate)(&candidate) {
                return Some(candidate)
            }
            self.best.mark_invalid(&candidate);
        }
        None
    }
}
/// Forwards every control method of the pool-level `BestTransactions` trait to the inner
/// iterator; the predicate is not involved in any of them.
impl<I, P> crate::traits::BestTransactions for BestTransactionFilter<I, P>
where
    I: crate::traits::BestTransactions,
    P: FnMut(&<I as Iterator>::Item) -> bool + Send,
{
    /// Marks `tx` as invalid on the inner iterator.
    fn mark_invalid(&mut self, tx: &Self::Item) {
        self.best.mark_invalid(tx);
    }

    /// Tells the inner iterator to stop picking up updates.
    fn no_updates(&mut self) {
        self.best.no_updates();
    }

    /// Enables blob skipping on the inner iterator.
    fn skip_blobs(&mut self) {
        self.best.set_skip_blobs(true);
    }

    /// Turns blob skipping on the inner iterator on or off.
    fn set_skip_blobs(&mut self, skip_blobs: bool) {
        self.best.set_skip_blobs(skip_blobs);
    }
}
/// Debug output that shows only the inner iterator; the predicate is left out because `P` is
/// not required to implement `Debug`.
impl<I: fmt::Debug, P> fmt::Debug for BestTransactionFilter<I, P> {
    /// Writes `BestTransactionFilter { best: .. }` to the formatter.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = f.debug_struct("BestTransactionFilter");
        out.field("best", &self.best);
        out.finish()
    }
}
// Unit tests for the iterators in this module. They build a `PendingPool` from mock
// transactions and inspect the iterator returned by the pool.
#[cfg(test)]
mod tests {
use super::*;
use crate::{
pool::pending::PendingPool,
test_utils::{MockOrdering, MockTransaction, MockTransactionFactory},
Priority,
};
use reth_primitives::U256;
// Ten transactions with nonces 0..10, all derived from one template, must be yielded in nonce
// order while exactly one of them is independent at any time.
#[test]
fn test_best_iter() {
let mut pool = PendingPool::new(MockOrdering::default());
let mut f = MockTransactionFactory::default();
let num_tx = 10;
let tx = MockTransaction::eip1559();
for nonce in 0..num_tx {
let tx = tx.clone().rng_hash().with_nonce(nonce);
let valid_tx = f.validated(tx);
pool.add_transaction(Arc::new(valid_tx), 0);
}
let mut best = pool.best();
assert_eq!(best.all.len(), num_tx as usize);
assert_eq!(best.independent.len(), 1);
for nonce in 0..num_tx {
// Each yielded transaction unlocks its successor, so the set size stays at one.
assert_eq!(best.independent.len(), 1);
let tx = best.next().unwrap();
assert_eq!(tx.nonce(), nonce);
}
}
// Marking the only independent transaction as invalid must end the iteration, because none of
// the remaining transactions gets unlocked.
#[test]
fn test_best_iter_invalid() {
let mut pool = PendingPool::new(MockOrdering::default());
let mut f = MockTransactionFactory::default();
let num_tx = 10;
let tx = MockTransaction::eip1559();
for nonce in 0..num_tx {
let tx = tx.clone().rng_hash().with_nonce(nonce);
let valid_tx = f.validated(tx);
pool.add_transaction(Arc::new(valid_tx), 0);
}
let mut best = pool.best();
let invalid = best.independent.iter().next().unwrap();
best.mark_invalid(&invalid.transaction.clone());
assert!(best.next().is_none());
}
// All transactions carry a max fee above the base fee, so every one of them is yielded.
#[test]
fn test_best_with_fees_iter_base_fee_satisfied() {
let mut pool = PendingPool::new(MockOrdering::default());
let mut f = MockTransactionFactory::default();
let num_tx = 5;
let base_fee: u64 = 10;
let base_fee_per_blob_gas: u64 = 15;
for nonce in 0..num_tx {
let tx = MockTransaction::eip1559()
.rng_hash()
.with_nonce(nonce)
.with_max_fee(base_fee as u128 + 5);
let valid_tx = f.validated(tx);
pool.add_transaction(Arc::new(valid_tx), 0);
}
let mut best = pool.best_with_basefee_and_blobfee(base_fee, base_fee_per_blob_gas);
for nonce in 0..num_tx {
let tx = best.next().expect("Transaction should be returned");
assert_eq!(tx.nonce(), nonce);
assert!(tx.transaction.max_fee_per_gas() >= base_fee as u128);
}
}
// All transactions carry a max fee below the base fee, so nothing is yielded.
#[test]
fn test_best_with_fees_iter_base_fee_violated() {
let mut pool = PendingPool::new(MockOrdering::default());
let mut f = MockTransactionFactory::default();
let num_tx = 5;
let base_fee: u64 = 20;
let base_fee_per_blob_gas: u64 = 15;
for nonce in 0..num_tx {
let tx = MockTransaction::eip1559()
.rng_hash()
.with_nonce(nonce)
.with_max_fee(base_fee as u128 - 5);
let valid_tx = f.validated(tx);
pool.add_transaction(Arc::new(valid_tx), 0);
}
let mut best = pool.best_with_basefee_and_blobfee(base_fee, base_fee_per_blob_gas);
assert!(best.next().is_none());
}
// EIP-4844 transactions whose max fee and blob fee both exceed the limits are all yielded.
#[test]
fn test_best_with_fees_iter_blob_fee_satisfied() {
let mut pool = PendingPool::new(MockOrdering::default());
let mut f = MockTransactionFactory::default();
let num_tx = 5;
let base_fee: u64 = 10;
let base_fee_per_blob_gas: u64 = 20;
for nonce in 0..num_tx {
let tx = MockTransaction::eip4844()
.rng_hash()
.with_nonce(nonce)
.with_max_fee(base_fee as u128 + 5)
.with_blob_fee(base_fee_per_blob_gas as u128 + 5);
let valid_tx = f.validated(tx);
pool.add_transaction(Arc::new(valid_tx), 0);
}
let mut best = pool.best_with_basefee_and_blobfee(base_fee, base_fee_per_blob_gas);
for nonce in 0..num_tx {
let tx = best.next().expect("Transaction should be returned");
assert_eq!(tx.nonce(), nonce);
assert!(tx.transaction.max_fee_per_gas() >= base_fee as u128);
assert!(
tx.transaction.max_fee_per_blob_gas().unwrap() >= base_fee_per_blob_gas as u128
);
}
assert!(best.next().is_none());
}
// EIP-4844 transactions with a sufficient max fee but a blob fee below the limit are all
// rejected.
#[test]
fn test_best_with_fees_iter_blob_fee_violated() {
let mut pool = PendingPool::new(MockOrdering::default());
let mut f = MockTransactionFactory::default();
let num_tx = 5;
let base_fee: u64 = 10;
let base_fee_per_blob_gas: u64 = 20;
for nonce in 0..num_tx {
let tx = MockTransaction::eip4844()
.rng_hash()
.with_nonce(nonce)
.with_max_fee(base_fee as u128 + 5)
.with_blob_fee(base_fee_per_blob_gas as u128 - 5);
let valid_tx = f.validated(tx);
pool.add_transaction(Arc::new(valid_tx), 0);
}
let mut best = pool.best_with_basefee_and_blobfee(base_fee, base_fee_per_blob_gas);
assert!(best.next().is_none());
}
// Mix of passing and failing transactions: only `tx1` and `tx2` satisfy the limits and must
// be yielded, in that order.
#[test]
fn test_best_with_fees_iter_mixed_fees() {
let mut pool = PendingPool::new(MockOrdering::default());
let mut f = MockTransactionFactory::default();
let base_fee: u64 = 10;
let base_fee_per_blob_gas: u64 = 20;
// tx1: max fee above the base fee.
let tx1 =
MockTransaction::eip1559().rng_hash().with_nonce(0).with_max_fee(base_fee as u128 + 5);
// tx2: max fee and blob fee both above their limits.
let tx2 = MockTransaction::eip4844()
.rng_hash()
.with_nonce(1)
.with_max_fee(base_fee as u128 + 5)
.with_blob_fee(base_fee_per_blob_gas as u128 + 5);
// tx3: blob fee below the blob fee limit.
let tx3 = MockTransaction::eip4844()
.rng_hash()
.with_nonce(2)
.with_max_fee(base_fee as u128 + 5)
.with_blob_fee(base_fee_per_blob_gas as u128 - 5);
// tx4: max fee below the base fee.
let tx4 =
MockTransaction::eip1559().rng_hash().with_nonce(3).with_max_fee(base_fee as u128 - 5);
pool.add_transaction(Arc::new(f.validated(tx1.clone())), 0);
pool.add_transaction(Arc::new(f.validated(tx2.clone())), 0);
pool.add_transaction(Arc::new(f.validated(tx3)), 0);
pool.add_transaction(Arc::new(f.validated(tx4)), 0);
let mut best = pool.best_with_basefee_and_blobfee(base_fee, base_fee_per_blob_gas);
let expected_order = vec![tx1, tx2];
for expected_tx in expected_order {
let tx = best.next().expect("Transaction should be returned");
assert_eq!(tx.transaction, expected_tx);
}
assert!(best.next().is_none());
}
// A transaction received over the broadcast channel whose ancestor is not tracked must be
// stored in `all` and become independent.
#[test]
fn test_best_add_transaction_with_next_nonce() {
let mut pool = PendingPool::new(MockOrdering::default());
let mut f = MockTransactionFactory::default();
let num_tx = 5;
let tx = MockTransaction::eip1559();
for nonce in 0..num_tx {
let tx = tx.clone().rng_hash().with_nonce(nonce);
let valid_tx = f.validated(tx);
pool.add_transaction(Arc::new(valid_tx), 0);
}
let mut best = pool.best();
let (tx_sender, tx_receiver) =
tokio::sync::broadcast::channel::<PendingTransaction<MockOrdering>>(1000);
best.new_transaction_receiver = Some(tx_receiver);
// NOTE(review): built from a fresh mock rather than the template above, so it presumably
// has a different sender than the pooled transactions — confirm against `MockTransaction`.
let new_tx = MockTransaction::eip1559().rng_hash().with_nonce(5);
let valid_new_tx = f.validated(new_tx);
let pending_tx = PendingTransaction {
submission_id: 10,
transaction: Arc::new(valid_new_tx.clone()),
priority: Priority::Value(U256::from(1000)),
};
tx_sender.send(pending_tx.clone()).unwrap();
best.add_new_transactions();
assert_eq!(best.all.len(), 6);
assert!(best.all.contains_key(valid_new_tx.id()));
assert_eq!(best.independent.len(), 2);
assert!(best.independent.contains(&pending_tx));
}
// A received transaction whose ancestor is already tracked must be stored in `all` but must
// not become independent.
#[test]
fn test_best_add_transaction_with_ancestor() {
let mut pool = PendingPool::new(MockOrdering::default());
let mut f = MockTransactionFactory::default();
let num_tx = 5;
let tx = MockTransaction::eip1559();
for nonce in 0..num_tx {
let tx = tx.clone().rng_hash().with_nonce(nonce);
let valid_tx = f.validated(tx);
pool.add_transaction(Arc::new(valid_tx), 0);
}
let mut best = pool.best();
let (tx_sender, tx_receiver) =
tokio::sync::broadcast::channel::<PendingTransaction<MockOrdering>>(1000);
best.new_transaction_receiver = Some(tx_receiver);
// First new transaction: expected to become independent.
let base_tx1 = MockTransaction::eip1559().rng_hash().with_nonce(5);
let valid_new_tx1 = f.validated(base_tx1.clone());
let pending_tx1 = PendingTransaction {
submission_id: 10,
transaction: Arc::new(valid_new_tx1.clone()),
priority: Priority::Value(U256::from(1000)),
};
tx_sender.send(pending_tx1.clone()).unwrap();
best.add_new_transactions();
assert_eq!(best.all.len(), 6);
assert!(best.all.contains_key(valid_new_tx1.id()));
assert_eq!(best.independent.len(), 2);
assert!(best.independent.contains(&pending_tx1));
// Second new transaction: derived from the first one with the following nonce, so its
// ancestor is already tracked and it must not become independent.
let base_tx2 = base_tx1.with_nonce(6);
let valid_new_tx2 = f.validated(base_tx2);
let pending_tx2 = PendingTransaction {
submission_id: 11, transaction: Arc::new(valid_new_tx2.clone()),
priority: Priority::Value(U256::from(1000)),
};
tx_sender.send(pending_tx2.clone()).unwrap();
best.add_new_transactions();
assert_eq!(best.all.len(), 7);
assert!(best.all.contains_key(valid_new_tx2.id()));
assert_eq!(best.independent.len(), 2);
assert!(!best.independent.contains(&pending_tx2));
}
}