use crate::{
config::{LocalTransactionConfig, TXPOOL_MAX_ACCOUNT_SLOTS_PER_SENDER},
error::{Eip4844PoolTransactionError, InvalidPoolTransactionError, PoolError, PoolErrorKind},
identifier::{SenderId, TransactionId},
metrics::{AllTransactionsMetrics, TxPoolMetrics},
pool::{
best::BestTransactions,
blob::BlobTransactions,
parked::{BasefeeOrd, ParkedPool, QueuedOrd},
pending::PendingPool,
state::{SubPool, TxState},
update::{Destination, PoolUpdate},
AddedPendingTransaction, AddedTransaction, OnNewCanonicalStateOutcome,
},
traits::{BestTransactionsAttributes, BlockInfo, PoolSize},
PoolConfig, PoolResult, PoolTransaction, PoolUpdateKind, PriceBumpConfig, TransactionOrdering,
ValidPoolTransaction, U256,
};
use alloy_consensus::constants::{
EIP1559_TX_TYPE_ID, EIP2930_TX_TYPE_ID, EIP4844_TX_TYPE_ID, EIP7702_TX_TYPE_ID,
LEGACY_TX_TYPE_ID,
};
use alloy_eips::{
eip1559::{ETHEREUM_BLOCK_GAS_LIMIT, MIN_PROTOCOL_BASE_FEE},
eip4844::BLOB_TX_MIN_BLOB_GASPRICE,
};
use alloy_primitives::{Address, TxHash, B256};
use rustc_hash::FxHashMap;
use smallvec::SmallVec;
use std::{
cmp::Ordering,
collections::{btree_map::Entry, hash_map, BTreeMap, HashMap, HashSet},
fmt,
ops::Bound::{Excluded, Unbounded},
sync::Arc,
};
use tracing::trace;
/// A transaction pool that keeps every tracked transaction in exactly one of four sub-pools
/// (pending, queued, base-fee, blob) and moves transactions between them as fees and account
/// state change.
#[cfg_attr(doc, aquamarine::aquamarine)]
pub struct TxPool<T: TransactionOrdering> {
/// Last known on-chain info per sender; refreshed when transactions are added and when accounts
/// change.
sender_info: FxHashMap<SenderId, SenderInfo>,
/// Sub-pool holding transactions whose state maps to `SubPool::Pending`.
pending_pool: PendingPool<T>,
/// Pool configuration; supplies the per-sub-pool limits used when discarding transactions.
config: PoolConfig,
/// Sub-pool holding transactions whose state maps to `SubPool::Queued`.
queued_pool: ParkedPool<QueuedOrd<T::Transaction>>,
/// Sub-pool holding transactions whose state maps to `SubPool::BaseFee`.
basefee_pool: ParkedPool<BasefeeOrd<T::Transaction>>,
/// Sub-pool holding transactions whose state maps to `SubPool::Blob`.
blob_pool: BlobTransactions<T::Transaction>,
/// Registry of every transaction in the pool, together with its state and current sub-pool.
all_transactions: AllTransactions<T::Transaction>,
/// Metrics reported by this pool.
metrics: TxPoolMetrics,
/// Kind of the most recent canonical state update, if one has been processed.
latest_update_kind: Option<PoolUpdateKind>,
}
impl<T: TransactionOrdering> TxPool<T> {
/// Creates an empty pool that orders pending transactions with `ordering` and is configured by
/// `config`.
pub fn new(ordering: T, config: PoolConfig) -> Self {
    // The registry only borrows the config, so build it before the config moves into the pool.
    let all_transactions = AllTransactions::new(&config);
    let pending_pool = PendingPool::new(ordering);
    Self {
        sender_info: Default::default(),
        pending_pool,
        config,
        queued_pool: Default::default(),
        basefee_pool: Default::default(),
        blob_pool: Default::default(),
        all_transactions,
        metrics: Default::default(),
        latest_update_kind: None,
    }
}
/// Returns the highest nonce tracked in the pool for `sender`, or `None` if the sender has no
/// transactions.
pub fn get_highest_nonce_by_sender(&self, sender: SenderId) -> Option<u64> {
    // Transactions are keyed by id in an ordered map, so the last entry is the highest nonce.
    let (_, highest) = self.all_transactions.txs_iter(sender).last()?;
    Some(highest.transaction.nonce())
}
/// Returns the transaction with the highest nonce tracked for `sender`, or `None` if the sender
/// has no transactions.
pub fn get_highest_transaction_by_sender(
    &self,
    sender: SenderId,
) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
    let (_, highest) = self.all_transactions.txs_iter(sender).last()?;
    Some(Arc::clone(&highest.transaction))
}
/// Returns the last transaction of the gap-free nonce run that starts at `on_chain`.
///
/// If the pool tracks a higher state nonce for the sender, the run starts at that nonce
/// instead. Returns `None` when no transaction exists at the starting nonce.
pub(crate) fn get_highest_consecutive_transaction_by_sender(
    &self,
    mut on_chain: TransactionId,
) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
    // Operate on the most recent nonce known for this sender.
    if let Some(tracked) = self.sender_info.get(&on_chain.sender) {
        on_chain.nonce = on_chain.nonce.max(tracked.state_nonce);
    }

    let mut expected_nonce = on_chain.nonce;
    let last_consecutive = self
        .all()
        .descendant_txs_inclusive(&on_chain)
        // Stop at the first transaction that does not continue the run.
        .take_while(|(id, _)| {
            let consecutive = id.nonce == expected_nonce;
            if consecutive {
                expected_nonce = id.next_nonce();
            }
            consecutive
        })
        .last()
        .map(|(_, tx)| Arc::clone(&tx.transaction));
    last_consecutive
}
pub(crate) const fn all(&self) -> &AllTransactions<T::Transaction> {
&self.all_transactions
}
/// Returns the set of distinct sender addresses that have at least one transaction in the pool.
pub(crate) fn unique_senders(&self) -> HashSet<Address> {
    let mut senders = HashSet::new();
    for internal in self.all_transactions.txs.values() {
        senders.insert(internal.transaction.sender());
    }
    senders
}
/// Returns the transaction count and reported size of every sub-pool, plus the total number of
/// tracked transactions.
pub fn size(&self) -> PoolSize {
    let (pending, pending_size) = (self.pending_pool.len(), self.pending_pool.size());
    let (basefee, basefee_size) = (self.basefee_pool.len(), self.basefee_pool.size());
    let (queued, queued_size) = (self.queued_pool.len(), self.queued_pool.size());
    let (blob, blob_size) = (self.blob_pool.len(), self.blob_pool.size());
    let total = self.all_transactions.len();
    PoolSize {
        pending,
        pending_size,
        basefee,
        basefee_size,
        queued,
        queued_size,
        blob,
        blob_size,
        total,
    }
}
/// Returns the block info the pool currently tracks: gas limit, last seen block, and pending
/// fees. The blob fee is always reported as `Some`.
pub const fn block_info(&self) -> BlockInfo {
    let all = &self.all_transactions;
    BlockInfo {
        block_gas_limit: all.block_gas_limit,
        last_seen_block_hash: all.last_seen_block_hash,
        last_seen_block_number: all.last_seen_block_number,
        pending_basefee: all.pending_fees.base_fee,
        pending_blob_fee: Some(all.pending_fees.blob_fee),
    }
}
/// Stores `pending_blob_fee` as the tracked blob fee and re-sorts the affected transactions.
///
/// `base_fee_update` says how the base fee changed in the same update. A blob fee increase
/// (with no base fee decrease) demotes pending transactions; a decrease of either fee promotes
/// transactions from the blob pool.
fn update_blob_fee(&mut self, pending_blob_fee: u128, base_fee_update: Ordering) {
    let previous_blob_fee =
        std::mem::replace(&mut self.all_transactions.pending_fees.blob_fee, pending_blob_fee);
    let blob_fee_update = self.all_transactions.pending_fees.blob_fee.cmp(&previous_blob_fee);

    match (blob_fee_update, base_fee_update) {
        (Ordering::Equal, Ordering::Equal | Ordering::Greater) => {
            // Blob fee unchanged and base fee did not drop: nothing to move.
        }
        (Ordering::Greater, Ordering::Equal | Ordering::Greater) => {
            // Blob fee went up: evict pending transactions the pending pool hands back.
            let current_blob_fee = self.all_transactions.pending_fees.blob_fee;
            for tx in self.pending_pool.update_blob_fee(current_blob_fee) {
                let entry = self.all_transactions.txs.get_mut(tx.id()).expect("tx exists in set");
                entry.state.remove(TxState::ENOUGH_BLOB_FEE_CAP_BLOCK);
                entry.subpool = entry.state.into();
                let destination = entry.subpool;
                self.add_transaction_to_subpool(destination, tx);
            }
        }
        (Ordering::Less, _) | (_, Ordering::Less) => {
            // Either fee went down: promote blob transactions that now satisfy both fees.
            let unlocked = self.blob_pool.enforce_pending_fees(&self.all_transactions.pending_fees);
            for tx in unlocked {
                let entry = self.all_transactions.txs.get_mut(tx.id()).expect("tx exists in set");
                entry.state.insert(TxState::ENOUGH_BLOB_FEE_CAP_BLOCK);
                entry.state.insert(TxState::ENOUGH_FEE_CAP_BLOCK);
                entry.subpool = entry.state.into();
                let destination = entry.subpool;
                self.add_transaction_to_subpool(destination, tx);
            }
        }
    }
}
/// Stores `pending_basefee` as the tracked base fee and re-sorts the affected transactions.
///
/// Returns how the new base fee compares to the previous one: `Greater` if it increased,
/// `Less` if it decreased, `Equal` if it is unchanged.
fn update_basefee(&mut self, pending_basefee: u64) -> Ordering {
    let previous_base_fee =
        std::mem::replace(&mut self.all_transactions.pending_fees.base_fee, pending_basefee);
    let change = self.all_transactions.pending_fees.base_fee.cmp(&previous_base_fee);

    match change {
        Ordering::Equal => {
            // Fee unchanged: nothing to move.
        }
        Ordering::Greater => {
            // Base fee went up: evict pending transactions the pending pool hands back.
            let current_base_fee = self.all_transactions.pending_fees.base_fee;
            for tx in self.pending_pool.update_base_fee(current_base_fee) {
                let entry = self.all_transactions.txs.get_mut(tx.id()).expect("tx exists in set");
                entry.state.remove(TxState::ENOUGH_FEE_CAP_BLOCK);
                entry.subpool = entry.state.into();
                let destination = entry.subpool;
                self.add_transaction_to_subpool(destination, tx);
            }
        }
        Ordering::Less => {
            // Base fee went down: promote base-fee-parked transactions that now qualify.
            let current_base_fee = self.all_transactions.pending_fees.base_fee;
            for tx in self.basefee_pool.enforce_basefee(current_base_fee) {
                let entry = self.all_transactions.txs.get_mut(tx.id()).expect("tx exists in set");
                entry.state.insert(TxState::ENOUGH_FEE_CAP_BLOCK);
                entry.subpool = entry.state.into();
                let destination = entry.subpool;
                self.add_transaction_to_subpool(destination, tx);
            }
        }
    }
    change
}
pub fn set_block_info(&mut self, info: BlockInfo) {
let BlockInfo {
block_gas_limit,
last_seen_block_hash,
last_seen_block_number,
pending_basefee,
pending_blob_fee,
} = info;
self.all_transactions.last_seen_block_hash = last_seen_block_hash;
self.all_transactions.last_seen_block_number = last_seen_block_number;
let basefee_ordering = self.update_basefee(pending_basefee);
self.all_transactions.block_gas_limit = block_gas_limit;
if let Some(blob_fee) = pending_blob_fee {
self.update_blob_fee(blob_fee, basefee_ordering)
}
}
pub(crate) fn best_transactions(&self) -> BestTransactions<T> {
self.pending_pool.best()
}
/// Returns a best-transactions iterator for fees that may differ from the ones the pool tracks.
///
/// The requested base fee is compared with the tracked one:
/// - equal: the pending pool is used as is, plus blob-pool transactions if the requested blob
///   fee is lower than the tracked blob fee;
/// - higher: only the pending pool is consulted, filtered by the requested fees;
/// - lower: transactions from the base-fee and blob pools that satisfy the requested attributes
///   are offered alongside the pending pool.
pub(crate) fn best_transactions_with_attributes(
    &self,
    best_transactions_attributes: BestTransactionsAttributes,
) -> Box<dyn crate::traits::BestTransactions<Item = Arc<ValidPoolTransaction<T::Transaction>>>>
{
    let tracked_base_fee = self.all_transactions.pending_fees.base_fee;
    match best_transactions_attributes.basefee.cmp(&tracked_base_fee) {
        Ordering::Equal => {
            // Widen the requested fee instead of narrowing the tracked `u128` fee: an `as u64`
            // cast would silently truncate a tracked fee above `u64::MAX`.
            let tracked_blob_fee = self.all_transactions.pending_fees.blob_fee;
            if best_transactions_attributes
                .blob_fee
                .is_some_and(|fee| u128::from(fee) < tracked_blob_fee)
            {
                let unlocked_by_blob_fee =
                    self.blob_pool.satisfy_attributes(best_transactions_attributes);
                Box::new(
                    self.pending_pool.best_with_unlocked(unlocked_by_blob_fee, tracked_base_fee),
                )
            } else {
                Box::new(self.pending_pool.best())
            }
        }
        Ordering::Greater => {
            // Requested base fee is higher: only the pending pool needs to be filtered.
            Box::new(self.pending_pool.best_with_basefee_and_blobfee(
                best_transactions_attributes.basefee,
                best_transactions_attributes.blob_fee.unwrap_or_default(),
            ))
        }
        Ordering::Less => {
            // Requested base fee is lower: parked transactions may become eligible.
            let mut unlocked = self
                .basefee_pool
                .satisfy_base_fee_transactions(best_transactions_attributes.basefee);
            unlocked.extend(self.blob_pool.satisfy_attributes(best_transactions_attributes));
            Box::new(self.pending_pool.best_with_unlocked(unlocked, tracked_base_fee))
        }
    }
}
/// Returns all transactions currently in the pending sub-pool.
pub(crate) fn pending_transactions(&self) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
    self.pending_transactions_iter().collect()
}
pub(crate) fn pending_transactions_iter(
&self,
) -> impl Iterator<Item = Arc<ValidPoolTransaction<T::Transaction>>> + '_ {
self.pending_pool.all()
}
/// Returns the pending transactions for which `predicate` returns `true`.
pub(crate) fn pending_transactions_with_predicate(
    &self,
    mut predicate: impl FnMut(&ValidPoolTransaction<T::Transaction>) -> bool,
) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
    let mut matching = Vec::new();
    for tx in self.pending_transactions_iter() {
        if predicate(&tx) {
            matching.push(tx);
        }
    }
    matching
}
/// Returns the pending transactions that were sent by `sender`.
pub(crate) fn pending_txs_by_sender(
    &self,
    sender: SenderId,
) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
    self.pending_transactions_with_predicate(|tx| tx.sender_id() == sender)
}
/// Returns all parked transactions: those in the base-fee sub-pool followed by those in the
/// queued sub-pool.
pub(crate) fn queued_transactions(&self) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
    self.queued_transactions_iter().collect()
}
/// Returns an iterator over all parked transactions: base-fee sub-pool first, then the queued
/// sub-pool.
pub(crate) fn queued_transactions_iter(
    &self,
) -> impl Iterator<Item = Arc<ValidPoolTransaction<T::Transaction>>> + '_ {
    let parked_by_basefee = self.basefee_pool.all();
    let parked_by_state = self.queued_pool.all();
    parked_by_basefee.chain(parked_by_state)
}
/// Returns the ids of `sender`'s transactions as a `(queued, pending)` pair, taken from the
/// queued and pending sub-pools respectively.
pub fn queued_and_pending_txs_by_sender(
    &self,
    sender: SenderId,
) -> (SmallVec<[TransactionId; TXPOOL_MAX_ACCOUNT_SLOTS_PER_SENDER]>, Vec<TransactionId>) {
    let queued = self.queued_pool.get_txs_by_sender(sender);
    let pending = self.pending_pool.get_txs_by_sender(sender);
    (queued, pending)
}
/// Returns the parked (base-fee and queued) transactions that were sent by `sender`.
pub(crate) fn queued_txs_by_sender(
    &self,
    sender: SenderId,
) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
    let mut by_sender = Vec::new();
    for tx in self.queued_transactions_iter() {
        if tx.sender_id() == sender {
            by_sender.push(tx);
        }
    }
    by_sender
}
/// Returns `true` if a transaction with the given hash is tracked by the pool.
pub(crate) fn contains(&self, tx_hash: &TxHash) -> bool {
    self.all().contains(tx_hash)
}
/// Returns `true` if the given sub-pool contains the transaction with the given id.
#[cfg(test)]
pub(crate) fn subpool_contains(&self, subpool: SubPool, id: &TransactionId) -> bool {
    let found = match subpool {
        SubPool::Pending => self.pending_pool.contains(id),
        SubPool::BaseFee => self.basefee_pool.contains(id),
        SubPool::Blob => self.blob_pool.contains(id),
        SubPool::Queued => self.queued_pool.contains(id),
    };
    found
}
/// Returns `true` if the pool's current size exceeds the configured limits.
#[inline]
pub(crate) fn is_exceeded(&self) -> bool {
    let current = self.size();
    self.config.is_exceeded(current)
}
/// Returns the transaction with the given hash, if the pool tracks it.
pub(crate) fn get(
    &self,
    tx_hash: &TxHash,
) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
    let found = self.all_transactions.by_hash.get(tx_hash)?;
    Some(Arc::clone(found))
}
/// Returns an iterator over the tracked transactions matching the given hashes; hashes the pool
/// does not know are skipped.
pub(crate) fn get_all(
    &self,
    txs: Vec<TxHash>,
) -> impl Iterator<Item = Arc<ValidPoolTransaction<T::Transaction>>> + '_ {
    let by_hash = &self.all_transactions.by_hash;
    txs.into_iter().filter_map(move |hash| by_hash.get(&hash).cloned())
}
/// Returns every transaction tracked for `sender`, in the order the registry iterates them.
pub(crate) fn get_transactions_by_sender(
    &self,
    sender: SenderId,
) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
    let mut by_sender = Vec::new();
    for (_, internal) in self.all_transactions.txs_iter(sender) {
        by_sender.push(Arc::clone(&internal.transaction));
    }
    by_sender
}
/// Applies changed account info to the pool and returns the transactions that were promoted to
/// pending or discarded as a result.
pub(crate) fn update_accounts(
    &mut self,
    changed_senders: FxHashMap<SenderId, SenderInfo>,
) -> UpdateOutcome<T::Transaction> {
    // Recompute transaction state first; this yields the required sub-pool moves.
    let pending_moves = self.all_transactions.update(&changed_senders);

    // Remember the latest info for the changed senders.
    self.sender_info.extend(changed_senders);

    let outcome = self.process_updates(pending_moves);
    self.update_size_metrics();
    outcome
}
/// Updates the pool after a canonical state change.
///
/// Stores the new block info in the registry, prunes the mined transactions, applies the
/// changed sender info, refreshes metrics, and records `update_kind`. Returns the mined hashes
/// together with the promoted and discarded transactions.
pub(crate) fn on_canonical_state_change(
    &mut self,
    block_info: BlockInfo,
    mined_transactions: Vec<TxHash>,
    changed_senders: FxHashMap<SenderId, SenderInfo>,
    update_kind: PoolUpdateKind,
) -> OnNewCanonicalStateOutcome<T::Transaction> {
    let block_hash = block_info.last_seen_block_hash;
    self.all_transactions.set_block_info(block_info);

    // Only count mined transactions the pool was actually tracking.
    let mut pruned = 0;
    for mined in mined_transactions.iter() {
        let was_tracked = self.prune_transaction_by_hash(mined).is_some();
        if was_tracked {
            pruned += 1;
        }
    }
    self.metrics.removed_transactions.increment(pruned);

    let outcome = self.update_accounts(changed_senders);
    self.update_transaction_type_metrics();
    self.metrics.performed_state_updates.increment(1);
    self.latest_update_kind = Some(update_kind);

    OnNewCanonicalStateOutcome {
        block_hash,
        mined: mined_transactions,
        promoted: outcome.promoted,
        discarded: outcome.discarded,
    }
}
pub(crate) fn update_size_metrics(&self) {
let stats = self.size();
self.metrics.pending_pool_transactions.set(stats.pending as f64);
self.metrics.pending_pool_size_bytes.set(stats.pending_size as f64);
self.metrics.basefee_pool_transactions.set(stats.basefee as f64);
self.metrics.basefee_pool_size_bytes.set(stats.basefee_size as f64);
self.metrics.queued_pool_transactions.set(stats.queued as f64);
self.metrics.queued_pool_size_bytes.set(stats.queued_size as f64);
self.metrics.blob_pool_transactions.set(stats.blob as f64);
self.metrics.blob_pool_size_bytes.set(stats.blob_size as f64);
self.metrics.total_transactions.set(stats.total as f64);
}
/// Counts the tracked transactions per transaction type and publishes the counts to the pool
/// metrics. Transactions of any other type are not counted.
pub(crate) fn update_transaction_type_metrics(&self) {
    let (mut legacy, mut eip2930, mut eip1559, mut eip4844, mut eip7702) = (0, 0, 0, 0, 0);
    for tx in self.all_transactions.transactions_iter() {
        let counter = match tx.transaction.tx_type() {
            LEGACY_TX_TYPE_ID => &mut legacy,
            EIP2930_TX_TYPE_ID => &mut eip2930,
            EIP1559_TX_TYPE_ID => &mut eip1559,
            EIP4844_TX_TYPE_ID => &mut eip4844,
            EIP7702_TX_TYPE_ID => &mut eip7702,
            _ => continue,
        };
        *counter += 1;
    }
    let metrics = &self.metrics;
    metrics.total_legacy_transactions.set(legacy as f64);
    metrics.total_eip2930_transactions.set(eip2930 as f64);
    metrics.total_eip1559_transactions.set(eip1559 as f64);
    metrics.total_eip4844_transactions.set(eip4844 as f64);
    metrics.total_eip7702_transactions.set(eip7702 as f64);
}
pub(crate) fn add_transaction(
&mut self,
tx: ValidPoolTransaction<T::Transaction>,
on_chain_balance: U256,
on_chain_nonce: u64,
) -> PoolResult<AddedTransaction<T::Transaction>> {
if self.contains(tx.hash()) {
return Err(PoolError::new(*tx.hash(), PoolErrorKind::AlreadyImported))
}
self.sender_info
.entry(tx.sender_id())
.or_default()
.update(on_chain_nonce, on_chain_balance);
match self.all_transactions.insert_tx(tx, on_chain_balance, on_chain_nonce) {
Ok(InsertOk { transaction, move_to, replaced_tx, updates, .. }) => {
self.add_new_transaction(transaction.clone(), replaced_tx.clone(), move_to);
self.metrics.inserted_transactions.increment(1);
let UpdateOutcome { promoted, discarded } = self.process_updates(updates);
let replaced = replaced_tx.map(|(tx, _)| tx);
let res = if move_to.is_pending() {
AddedTransaction::Pending(AddedPendingTransaction {
transaction,
promoted,
discarded,
replaced,
})
} else {
AddedTransaction::Parked { transaction, subpool: move_to, replaced }
};
self.update_size_metrics();
Ok(res)
}
Err(err) => {
self.metrics.invalid_transactions.increment(1);
match err {
InsertErr::Underpriced { existing: _, transaction } => Err(PoolError::new(
*transaction.hash(),
PoolErrorKind::ReplacementUnderpriced,
)),
InsertErr::FeeCapBelowMinimumProtocolFeeCap { transaction, fee_cap } => {
Err(PoolError::new(
*transaction.hash(),
PoolErrorKind::FeeCapBelowMinimumProtocolFeeCap(fee_cap),
))
}
InsertErr::ExceededSenderTransactionsCapacity { transaction } => {
Err(PoolError::new(
*transaction.hash(),
PoolErrorKind::SpammerExceededCapacity(transaction.sender()),
))
}
InsertErr::TxGasLimitMoreThanAvailableBlockGas {
transaction,
block_gas_limit,
tx_gas_limit,
} => Err(PoolError::new(
*transaction.hash(),
PoolErrorKind::InvalidTransaction(
InvalidPoolTransactionError::ExceedsGasLimit(
tx_gas_limit,
block_gas_limit,
),
),
)),
InsertErr::BlobTxHasNonceGap { transaction } => Err(PoolError::new(
*transaction.hash(),
PoolErrorKind::InvalidTransaction(
Eip4844PoolTransactionError::Eip4844NonceGap.into(),
),
)),
InsertErr::Overdraft { transaction } => Err(PoolError::new(
*transaction.hash(),
PoolErrorKind::InvalidTransaction(InvalidPoolTransactionError::Overdraft {
cost: *transaction.cost(),
balance: on_chain_balance,
}),
)),
InsertErr::TxTypeConflict { transaction } => Err(PoolError::new(
*transaction.hash(),
PoolErrorKind::ExistingConflictingTransactionType(
transaction.sender(),
transaction.tx_type(),
),
)),
}
}
}
}
/// Applies the given sub-pool updates and reports what happened.
///
/// A `Destination::Discard` update prunes the transaction from the pool. A
/// `Destination::Pool` update moves it from its current sub-pool to the target one.
/// Transactions that were moved into the pending sub-pool are returned as `promoted`, pruned
/// ones as `discarded`.
fn process_updates(&mut self, updates: Vec<PoolUpdate>) -> UpdateOutcome<T::Transaction> {
    let mut outcome = UpdateOutcome::default();
    for PoolUpdate { id, hash, current, destination } in updates {
        match destination {
            Destination::Discard => {
                // Remove the transaction from the registry and from its sub-pool.
                if let Some(tx) = self.prune_transaction_by_hash(&hash) {
                    outcome.discarded.push(tx);
                }
                self.metrics.removed_transactions.increment(1);
            }
            Destination::Pool(move_to) => {
                // An update must never target the sub-pool the transaction is already in.
                debug_assert_ne!(&move_to, &current, "destination must be different");
                let moved = self.move_transaction(current, move_to, &id);
                if matches!(move_to, SubPool::Pending) {
                    if let Some(tx) = moved {
                        trace!(target: "txpool", hash=%tx.transaction.hash(), "Promoted transaction to pending");
                        outcome.promoted.push(tx);
                    }
                }
            }
        }
    }
    outcome
}
/// Moves the transaction with the given id from sub-pool `from` to sub-pool `to`.
///
/// Returns the moved transaction, or `None` if it was not found in `from` (in which case
/// nothing is added to `to`).
fn move_transaction(
    &mut self,
    from: SubPool,
    to: SubPool,
    id: &TransactionId,
) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
    let Some(moved) = self.remove_from_subpool(from, id) else { return None };
    self.add_transaction_to_subpool(to, Arc::clone(&moved));
    Some(moved)
}
/// Removes the transactions with the given hashes from the pool and returns those that were
/// actually removed. Size metrics are refreshed afterwards.
pub(crate) fn remove_transactions(
    &mut self,
    hashes: Vec<TxHash>,
) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
    let mut removed = Vec::new();
    for hash in hashes {
        if let Some(tx) = self.remove_transaction_by_hash(&hash) {
            removed.push(tx);
        }
    }
    self.update_size_metrics();
    removed
}
/// Removes the transactions with the given hashes together with all of their descendants (the
/// same sender's transactions with higher nonces) and returns everything that was removed.
pub(crate) fn remove_transactions_and_descendants(
    &mut self,
    hashes: Vec<TxHash>,
) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
    let mut removed = Vec::new();
    for hash in hashes {
        let Some(tx) = self.remove_transaction_by_hash(&hash) else { continue };
        // Copy the id so the transaction itself can be moved into the result first.
        let id = *tx.id();
        removed.push(tx);
        self.remove_descendants(&id, &mut removed);
    }
    self.update_size_metrics();
    removed
}
/// Removes every transaction of the given sender from the pool and returns the removed
/// transactions.
pub(crate) fn remove_transactions_by_sender(
    &mut self,
    sender_id: SenderId,
) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
    // Snapshot the sender's transactions first; removal mutates the registry.
    let removed: Vec<_> = self
        .get_transactions_by_sender(sender_id)
        .into_iter()
        .filter_map(|tx| self.remove_transaction(tx.id()))
        .collect();
    self.update_size_metrics();
    removed
}
/// Removes the transaction with the given id from the registry and from its sub-pool.
///
/// Returns `None` if the registry does not know the id or the sub-pool did not hold it.
fn remove_transaction(
    &mut self,
    id: &TransactionId,
) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
    let Some((tx, pool)) = self.all_transactions.remove_transaction(id) else { return None };
    self.remove_from_subpool(pool, tx.id())
}
/// Removes the transaction with the given hash from the registry and from its sub-pool.
///
/// Returns `None` if the registry does not know the hash or the sub-pool did not hold it.
fn remove_transaction_by_hash(
    &mut self,
    tx_hash: &B256,
) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
    let Some((tx, pool)) = self.all_transactions.remove_transaction_by_hash(tx_hash) else {
        return None
    };
    self.remove_from_subpool(pool, tx.id())
}
/// Removes the transaction with the given hash from the registry and prunes it from its
/// sub-pool.
///
/// Returns `None` if the registry does not know the hash or the sub-pool did not hold it.
fn prune_transaction_by_hash(
    &mut self,
    tx_hash: &B256,
) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
    let Some((tx, pool)) = self.all_transactions.remove_transaction_by_hash(tx_hash) else {
        return None
    };
    self.prune_from_subpool(pool, tx.id())
}
/// Removes the transaction with the given id from the given sub-pool and traces the removal.
///
/// Returns the removed transaction, or `None` if the sub-pool did not hold it.
fn remove_from_subpool(
    &mut self,
    pool: SubPool,
    tx: &TransactionId,
) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
    let removed = match pool {
        SubPool::Pending => self.pending_pool.remove_transaction(tx),
        SubPool::BaseFee => self.basefee_pool.remove_transaction(tx),
        SubPool::Blob => self.blob_pool.remove_transaction(tx),
        SubPool::Queued => self.queued_pool.remove_transaction(tx),
    };
    if let Some(tx) = removed.as_ref() {
        trace!(target: "txpool", hash=%tx.transaction.hash(), ?pool, "Removed transaction from a subpool");
    }
    removed
}
/// Prunes the transaction with the given id from the given sub-pool and traces the pruning.
///
/// Returns the pruned transaction, or `None` if the sub-pool did not hold it.
fn prune_from_subpool(
    &mut self,
    pool: SubPool,
    tx: &TransactionId,
) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
    let pruned = match pool {
        SubPool::Queued => self.queued_pool.remove_transaction(tx),
        SubPool::BaseFee => self.basefee_pool.remove_transaction(tx),
        SubPool::Blob => self.blob_pool.remove_transaction(tx),
        SubPool::Pending => self.pending_pool.remove_transaction(tx),
    };
    if let Some(tx) = pruned.as_ref() {
        trace!(target: "txpool", hash=%tx.transaction.hash(), ?pool, "Pruned transaction from a subpool");
    }
    pruned
}
/// Removes every descendant of `tx` (the same sender's transactions with higher ids) from the
/// pool, appending each removed transaction to `removed`.
fn remove_descendants(
    &mut self,
    tx: &TransactionId,
    removed: &mut Vec<Arc<ValidPoolTransaction<T::Transaction>>>,
) {
    let mut cursor = *tx;
    loop {
        // Look up the next descendant in its own statement so the registry borrow ends before
        // the removal below.
        let next = self
            .all_transactions
            .descendant_txs_exclusive(&cursor)
            .next()
            .map(|(next_id, _)| *next_id);
        let Some(next) = next else { return };

        if let Some(dropped) = self.remove_transaction(&next) {
            removed.push(dropped);
        }
        cursor = next;
    }
}
/// Inserts the transaction into the given sub-pool. The pending sub-pool is additionally given
/// the currently tracked base fee.
fn add_transaction_to_subpool(
    &mut self,
    pool: SubPool,
    tx: Arc<ValidPoolTransaction<T::Transaction>>,
) {
    trace!(target: "txpool", hash=%tx.transaction.hash(), ?pool, "Adding transaction to a subpool");
    match pool {
        SubPool::Pending => {
            let base_fee = self.all_transactions.pending_fees.base_fee;
            self.pending_pool.add_transaction(tx, base_fee);
        }
        SubPool::Queued => self.queued_pool.add_transaction(tx),
        SubPool::BaseFee => self.basefee_pool.add_transaction(tx),
        SubPool::Blob => self.blob_pool.add_transaction(tx),
    }
}
/// Inserts a newly added transaction into sub-pool `pool`, first removing the transaction it
/// replaced (if any) from that transaction's sub-pool.
fn add_new_transaction(
    &mut self,
    transaction: Arc<ValidPoolTransaction<T::Transaction>>,
    replaced: Option<(Arc<ValidPoolTransaction<T::Transaction>>, SubPool)>,
    pool: SubPool,
) {
    if let Some((old_tx, old_pool)) = replaced {
        // The replaced transaction is dropped; its return value is not needed.
        self.remove_from_subpool(old_pool, old_tx.id());
    }
    self.add_transaction_to_subpool(pool, transaction);
}
/// Truncates every sub-pool that exceeds its configured limit and returns all transactions
/// removed in the process.
///
/// Sub-pools are processed in the order pending, base-fee, blob, queued. Each transaction a
/// sub-pool evicts is also removed from the registry, and its descendants are removed from the
/// pool as well.
pub(crate) fn discard_worst(&mut self) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
let mut removed = Vec::new();
// Expands to one truncation loop per `limit => pool` pair.
macro_rules! discard_worst {
($this:ident, $removed:ident, [$($limit:ident => $pool:ident),* $(,)*]) => {
$ (
// Keep truncating until the sub-pool is back within its limit.
while $this.$pool.exceeds(&$this.config.$limit)
{
trace!(
target: "txpool",
"discarding transactions from {}, limit: {:?}, curr size: {}, curr len: {}",
stringify!($pool),
$this.config.$limit,
$this.$pool.size(),
$this.$pool.len(),
);
let removed_from_subpool = $this.$pool.truncate_pool($this.config.$limit.clone());
trace!(
target: "txpool",
"removed {} transactions from {}, limit: {:?}, curr size: {}, curr len: {}",
removed_from_subpool.len(),
stringify!($pool),
$this.config.$limit,
$this.$pool.size(),
$this.$pool.len()
);
// The sub-pool has already dropped these; now drop them from the registry too.
for tx in removed_from_subpool {
$this.all_transactions.remove_transaction(tx.id());
// Copy the id before the transaction is moved into the result.
let id = *tx.id();
removed.push(tx);
// Descendants of an evicted transaction are removed from the pool as well.
$this.remove_descendants(&id, &mut $removed);
}
}
)*
};
}
discard_worst!(
self, removed, [
pending_limit => pending_pool,
basefee_limit => basefee_pool,
blob_limit => blob_pool,
queued_limit => queued_pool,
]
);
removed
}
/// Returns the total number of transactions tracked by the pool.
pub(crate) fn len(&self) -> usize {
    self.all().len()
}
/// Returns `true` if the pool tracks no transactions.
pub(crate) fn is_empty(&self) -> bool {
    self.all().is_empty()
}
/// Asserts that the total transaction count equals the sum of the sub-pool counts, then checks
/// the invariants of the registry and of every sub-pool.
///
/// # Panics
///
/// Panics if the counts disagree or any delegated invariant check panics.
#[cfg(any(test, feature = "test-utils"))]
pub fn assert_invariants(&self) {
    let PoolSize { basefee, pending, queued, blob, total, .. } = self.size();
    let actual = basefee + pending + queued + blob;
    assert_eq!(total, actual, "total size must be equal to the sum of all sub-pools, basefee:{}, pending:{}, queued:{}, blob:{}", basefee, pending, queued, blob);
    self.all_transactions.assert_invariants();
    self.pending_pool.assert_invariants();
    self.basefee_pool.assert_invariants();
    self.queued_pool.assert_invariants();
    self.blob_pool.assert_invariants();
}
}
#[cfg(any(test, feature = "test-utils"))]
impl TxPool<crate::test_utils::MockOrdering> {
    /// Creates a pool with the default mock ordering and the default pool configuration.
    pub fn mock() -> Self {
        let ordering = crate::test_utils::MockOrdering::default();
        let config = PoolConfig::default();
        Self::new(ordering, config)
    }
}
/// In test builds, every pool checks its invariants when it is dropped.
#[cfg(test)]
impl<T: TransactionOrdering> Drop for TxPool<T> {
    fn drop(&mut self) {
        Self::assert_invariants(self);
    }
}
/// Read-only access to the sub-pools for tests.
#[cfg(any(test, feature = "test-utils"))]
#[allow(dead_code)]
impl<T: TransactionOrdering> TxPool<T> {
    /// Returns the pending sub-pool.
    pub(crate) const fn pending(&self) -> &PendingPool<T> {
        let Self { pending_pool, .. } = self;
        pending_pool
    }

    /// Returns the base-fee sub-pool.
    pub(crate) const fn base_fee(&self) -> &ParkedPool<BasefeeOrd<T::Transaction>> {
        let Self { basefee_pool, .. } = self;
        basefee_pool
    }

    /// Returns the queued sub-pool.
    pub(crate) const fn queued(&self) -> &ParkedPool<QueuedOrd<T::Transaction>> {
        let Self { queued_pool, .. } = self;
        queued_pool
    }
}
impl<T: TransactionOrdering> fmt::Debug for TxPool<T> {
    /// Formats the pool showing only its configuration; all other fields are elided.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut builder = f.debug_struct("TxPool");
        builder.field("config", &self.config);
        builder.finish_non_exhaustive()
    }
}
/// Registry of every transaction in the pool, indexed both by hash and by transaction id,
/// together with the block and fee context needed to classify them.
pub(crate) struct AllTransactions<T: PoolTransaction> {
/// Minimal protocol base fee, taken from the pool configuration.
minimal_protocol_basefee: u64,
/// Current block gas limit; transactions with a higher gas limit are rejected on validation.
block_gas_limit: u64,
/// Maximum number of transactions a non-local sender may have tracked at once.
max_account_slots: usize,
/// All transactions, keyed by hash.
by_hash: HashMap<TxHash, Arc<ValidPoolTransaction<T>>>,
/// All transactions with their pool-internal state, ordered by transaction id.
txs: BTreeMap<TransactionId, PoolInternalTransaction<T>>,
/// Number of tracked transactions per sender.
tx_counter: FxHashMap<SenderId, usize>,
/// Number of the last block the registry was updated for.
last_seen_block_number: u64,
/// Hash of the last block the registry was updated for.
last_seen_block_hash: B256,
/// Base fee and blob fee currently used to classify transactions.
pending_fees: PendingFees,
/// Price bump settings from the pool configuration.
/// NOTE(review): presumably consulted when a transaction replaces another; usage is not
/// visible in this part of the file.
price_bumps: PriceBumpConfig,
/// Decides which transactions count as local; local transactions skip the per-sender cap.
local_transactions_config: LocalTransactionConfig,
/// Metrics reported by the registry.
metrics: AllTransactionsMetrics,
}
impl<T: PoolTransaction> AllTransactions<T> {
fn new(config: &PoolConfig) -> Self {
Self {
max_account_slots: config.max_account_slots,
price_bumps: config.price_bumps,
local_transactions_config: config.local_transactions_config.clone(),
minimal_protocol_basefee: config.minimal_protocol_basefee,
block_gas_limit: config.gas_limit,
..Default::default()
}
}
/// Returns an iterator over the hashes of all tracked transactions.
#[allow(dead_code)]
pub(crate) fn hashes_iter(&self) -> impl Iterator<Item = TxHash> + '_ {
    self.by_hash.iter().map(|(hash, _)| *hash)
}
/// Returns an iterator over all tracked transactions, in the hash map's iteration order.
pub(crate) fn transactions_iter(
    &self,
) -> impl Iterator<Item = &Arc<ValidPoolTransaction<T>>> + '_ {
    self.by_hash.iter().map(|(_, tx)| tx)
}
/// Returns `true` if a transaction with the given hash is tracked.
pub(crate) fn contains(&self, tx_hash: &TxHash) -> bool {
    self.by_hash.get(tx_hash).is_some()
}
pub(crate) fn get(&self, id: &TransactionId) -> Option<&PoolInternalTransaction<T>> {
self.txs.get(id)
}
/// Increments the transaction counter for `sender` and the all-senders metric.
pub(crate) fn tx_inc(&mut self, sender: SenderId) {
    *self.tx_counter.entry(sender).or_insert(0) += 1;
    self.metrics.all_transactions_by_all_senders.increment(1.0);
}
/// Decrements the transaction counter for `sender` and the all-senders metric.
///
/// The sender's entry is removed once its last transaction is gone. Does nothing if the sender
/// has no counter.
pub(crate) fn tx_decr(&mut self, sender: SenderId) {
    let hash_map::Entry::Occupied(mut slot) = self.tx_counter.entry(sender) else { return };
    if *slot.get() == 1 {
        // Last transaction of this sender: drop the entry instead of storing zero.
        slot.remove();
    } else {
        *slot.get_mut() -= 1;
    }
    self.metrics.all_transactions_by_all_senders.decrement(1.0);
}
fn set_block_info(&mut self, block_info: BlockInfo) {
let BlockInfo {
block_gas_limit,
last_seen_block_hash,
last_seen_block_number,
pending_basefee,
pending_blob_fee,
} = block_info;
self.last_seen_block_number = last_seen_block_number;
self.last_seen_block_hash = last_seen_block_hash;
self.pending_fees.base_fee = pending_basefee;
self.metrics.base_fee.set(pending_basefee as f64);
self.block_gas_limit = block_gas_limit;
if let Some(pending_blob_fee) = pending_blob_fee {
self.pending_fees.blob_fee = pending_blob_fee;
self.metrics.blob_base_fee.set(pending_blob_fee as f64);
}
}
/// Publishes the sizes of the by-hash and by-id indexes as metrics.
pub(crate) fn update_size_metrics(&self) {
    let by_hash_len = self.by_hash.len();
    let by_id_len = self.txs.len();
    self.metrics.all_transactions_by_hash.set(by_hash_len as f64);
    self.metrics.all_transactions_by_id.set(by_id_len as f64);
}
/// Re-evaluates the state of all tracked transactions and returns the resulting sub-pool
/// updates.
///
/// `changed_accounts` holds the new nonce and balance of the senders that changed. For those
/// senders, transactions below the new state nonce are scheduled for discard and balances are
/// rechecked. For every sender, the gap-free run of transactions is walked in nonce order,
/// refreshing the nonce-gap, parked-ancestor, cumulative-cost and fee-cap state of each one.
/// The walk for a sender stops at the first nonce gap.
pub(crate) fn update(
&mut self,
changed_accounts: &FxHashMap<SenderId, SenderInfo>,
) -> Vec<PoolUpdate> {
// Start with room for a handful of updates.
let mut updates = Vec::with_capacity(64);
let mut iter = self.txs.iter_mut().peekable();
// Each iteration of the outer loop handles the first not-yet-visited transaction of a sender;
// the inner loop below then walks that sender's consecutive transactions.
'transactions: while let Some((id, tx)) = iter.next() {
// Skips all remaining transactions of the current sender.
macro_rules! next_sender {
($iter:ident) => {
'this: while let Some((peek, _)) = iter.peek() {
if peek.sender != id.sender {
break 'this
}
iter.next();
}
};
}
// Set to the sender's new balance if the sender is among the changed accounts.
let mut changed_balance = None;
if let Some(info) = changed_accounts.get(&id.sender) {
// Transactions below the new state nonce are scheduled for discard.
if id.nonce < info.state_nonce {
updates.push(PoolUpdate {
id: *tx.transaction.id(),
hash: *tx.transaction.hash(),
current: tx.subpool,
destination: Destination::Discard,
});
continue 'transactions
}
let ancestor = TransactionId::ancestor(id.nonce, info.state_nonce, id.sender);
// No ancestor: this transaction is the next one in line for the sender.
if ancestor.is_none() {
tx.state.insert(TxState::NO_NONCE_GAPS);
tx.state.insert(TxState::NO_PARKED_ANCESTORS);
tx.cumulative_cost = U256::ZERO;
if tx.transaction.cost() > &info.balance {
tx.state.remove(TxState::ENOUGH_BALANCE);
} else {
tx.state.insert(TxState::ENOUGH_BALANCE);
}
}
changed_balance = Some(&info.balance);
}
// With a nonce gap nothing further can change for this sender yet.
if tx.state.has_nonce_gap() {
next_sender!(iter);
continue 'transactions
}
// This is the first transaction visited for the sender, so no ancestor can be parked.
tx.state.insert(TxState::NO_PARKED_ANCESTORS);
Self::update_tx_base_fee(self.pending_fees.base_fee, tx);
Self::record_subpool_update(&mut updates, tx);
// A transaction that is not pending parks every transaction after it.
let mut has_parked_ancestor = !tx.state.is_pending();
let mut cumulative_cost = tx.next_cumulative_cost();
// The nonce the following transaction must have to continue the run.
let mut next_nonce_in_line = tx.transaction.nonce().saturating_add(1);
// Walk the sender's following transactions while they are consecutive.
while let Some((peek, ref mut tx)) = iter.peek_mut() {
if peek.sender != id.sender {
// Reached the next sender; it is handled by the outer loop.
continue 'transactions
}
if tx.transaction.nonce() == next_nonce_in_line {
tx.state.insert(TxState::NO_NONCE_GAPS);
} else {
// Still gapped: skip the rest of this sender.
next_sender!(iter);
continue 'transactions
}
next_nonce_in_line = next_nonce_in_line.saturating_add(1);
// Carry the running cost forward through the run.
tx.cumulative_cost = cumulative_cost;
cumulative_cost = tx.next_cumulative_cost();
// Balance is only rechecked for senders whose account changed.
if let Some(changed_balance) = changed_balance {
if &cumulative_cost > changed_balance {
tx.state.remove(TxState::ENOUGH_BALANCE);
} else {
tx.state.insert(TxState::ENOUGH_BALANCE);
}
}
if has_parked_ancestor {
tx.state.remove(TxState::NO_PARKED_ANCESTORS);
} else {
tx.state.insert(TxState::NO_PARKED_ANCESTORS);
}
has_parked_ancestor = !tx.state.is_pending();
Self::update_tx_base_fee(self.pending_fees.base_fee, tx);
Self::record_subpool_update(&mut updates, tx);
// Consume the transaction that was only peeked at so far.
iter.next();
}
}
updates
}
/// Recomputes the transaction's sub-pool from its state and, if the sub-pool changed, appends
/// a matching move to `updates`.
fn record_subpool_update(updates: &mut Vec<PoolUpdate>, tx: &mut PoolInternalTransaction<T>) {
    let previous_pool = std::mem::replace(&mut tx.subpool, tx.state.into());
    if previous_pool == tx.subpool {
        return
    }
    updates.push(PoolUpdate {
        id: *tx.transaction.id(),
        hash: *tx.transaction.hash(),
        current: previous_pool,
        destination: tx.subpool.into(),
    });
}
/// Sets or clears `TxState::ENOUGH_FEE_CAP_BLOCK` depending on whether the transaction's max
/// fee per gas is at least the pending block base fee.
fn update_tx_base_fee(pending_block_base_fee: u64, tx: &mut PoolInternalTransaction<T>) {
    let covers_base_fee =
        tx.transaction.max_fee_per_gas() >= u128::from(pending_block_base_fee);
    if covers_base_fee {
        tx.state.insert(TxState::ENOUGH_FEE_CAP_BLOCK);
    } else {
        tx.state.remove(TxState::ENOUGH_FEE_CAP_BLOCK);
    }
}
/// Returns an iterator over all transactions of `sender`, in ascending id order.
pub(crate) fn txs_iter(
    &self,
    sender: SenderId,
) -> impl Iterator<Item = (&TransactionId, &PoolInternalTransaction<T>)> + '_ {
    let from_sender_start = (sender.start_bound(), Unbounded);
    // The range is open-ended, so stop as soon as another sender's entries begin.
    self.txs.range(from_sender_start).take_while(move |(id, _)| id.sender == sender)
}
/// Returns a mutable iterator over all transactions of `sender`, in ascending id order.
#[cfg(test)]
#[allow(dead_code)]
pub(crate) fn txs_iter_mut(
    &mut self,
    sender: SenderId,
) -> impl Iterator<Item = (&TransactionId, &mut PoolInternalTransaction<T>)> + '_ {
    let from_sender_start = (sender.start_bound(), Unbounded);
    // The range is open-ended, so stop as soon as another sender's entries begin.
    self.txs.range_mut(from_sender_start).take_while(move |(id, _)| id.sender == sender)
}
/// Returns an iterator over the same sender's transactions that come strictly after `id`, in
/// ascending id order.
pub(crate) fn descendant_txs_exclusive<'a, 'b: 'a>(
    &'a self,
    id: &'b TransactionId,
) -> impl Iterator<Item = (&'a TransactionId, &'a PoolInternalTransaction<T>)> + 'a {
    let sender = id.sender;
    let after_id = (Excluded(id), Unbounded);
    self.txs.range(after_id).take_while(move |(other, _)| other.sender == sender)
}
/// Returns an iterator over the same sender's transactions starting at `id` (included, if
/// tracked), in ascending id order.
pub(crate) fn descendant_txs_inclusive<'a, 'b: 'a>(
    &'a self,
    id: &'b TransactionId,
) -> impl Iterator<Item = (&'a TransactionId, &'a PoolInternalTransaction<T>)> + 'a {
    let sender = id.sender;
    self.txs.range(id..).take_while(move |(other, _)| other.sender == sender)
}
/// Returns a mutable iterator over the same sender's transactions starting at `id` (included,
/// if tracked), in ascending id order.
pub(crate) fn descendant_txs_mut<'a, 'b: 'a>(
    &'a mut self,
    id: &'b TransactionId,
) -> impl Iterator<Item = (&'a TransactionId, &'a mut PoolInternalTransaction<T>)> + 'a {
    let sender = id.sender;
    self.txs.range_mut(id..).take_while(move |(other, _)| other.sender == sender)
}
/// Removes the transaction with the given hash from both indexes.
///
/// Returns the transaction and the sub-pool it was assigned to, or `None` if either index does
/// not contain it. On success the sender counter and size metrics are updated.
pub(crate) fn remove_transaction_by_hash(
    &mut self,
    tx_hash: &B256,
) -> Option<(Arc<ValidPoolTransaction<T>>, SubPool)> {
    let removed = self.by_hash.remove(tx_hash)?;
    let subpool = self.txs.remove(&removed.transaction_id)?.subpool;
    self.tx_decr(removed.sender_id());
    self.update_size_metrics();
    Some((removed, subpool))
}
/// Removes the transaction with the given id from both indexes.
///
/// Returns the transaction and the sub-pool it was assigned to. Returns `None` if the id is
/// not tracked; the sender counter and size metrics are only touched when the id was tracked.
pub(crate) fn remove_transaction(
    &mut self,
    id: &TransactionId,
) -> Option<(Arc<ValidPoolTransaction<T>>, SubPool)> {
    let entry = self.txs.remove(id)?;
    self.tx_decr(entry.transaction.sender_id());

    let subpool = entry.subpool;
    let removed = match self.by_hash.remove(entry.transaction.hash()) {
        Some(tx) => Some((tx, subpool)),
        None => None,
    };
    self.update_size_metrics();
    removed
}
/// Returns `true` if the sender's first tracked transaction has a type that conflicts with the
/// type of `tx`. Returns `false` if the sender has no tracked transactions.
#[inline]
fn contains_conflicting_transaction(&self, tx: &ValidPoolTransaction<T>) -> bool {
    match self.txs_iter(tx.transaction_id.sender).next() {
        Some((_, existing)) => tx.tx_type_conflicts_with(&existing.transaction),
        None => false,
    }
}
/// Checks that the transaction may be inserted and hands it back on success.
///
/// # Errors
///
/// - `ExceededSenderTransactionsCapacity` if a non-local sender already occupies all of its
///   slots and the transaction's nonce is above the on-chain nonce;
/// - `TxGasLimitMoreThanAvailableBlockGas` if the gas limit exceeds the block gas limit;
/// - `TxTypeConflict` if the sender already has a transaction of a conflicting type.
fn ensure_valid(
    &self,
    transaction: ValidPoolTransaction<T>,
    on_chain_nonce: u64,
) -> Result<ValidPoolTransaction<T>, InsertErr<T>> {
    let is_local =
        self.local_transactions_config.is_local(transaction.origin, transaction.sender_ref());
    if !is_local {
        let slots_in_use =
            self.tx_counter.get(&transaction.sender_id()).map_or(0, |count| *count);
        let at_capacity = slots_in_use >= self.max_account_slots;
        // A transaction at exactly the on-chain nonce is always let through.
        if at_capacity && transaction.nonce() > on_chain_nonce {
            return Err(InsertErr::ExceededSenderTransactionsCapacity {
                transaction: Arc::new(transaction),
            })
        }
    }

    let tx_gas_limit = transaction.gas_limit();
    if tx_gas_limit > self.block_gas_limit {
        return Err(InsertErr::TxGasLimitMoreThanAvailableBlockGas {
            block_gas_limit: self.block_gas_limit,
            tx_gas_limit,
            transaction: Arc::new(transaction),
        })
    }

    if self.contains_conflicting_transaction(&transaction) {
        return Err(InsertErr::TxTypeConflict { transaction: Arc::new(transaction) })
    }

    Ok(transaction)
}
/// Runs the additional checks that `insert_tx` applies to blob (EIP-4844) transactions.
///
/// `ancestor` is the id of the transaction that must precede `new_blob_tx`, or `None` if there
/// is none.
///
/// # Errors
///
/// - [`InsertErr::BlobTxHasNonceGap`] if the ancestor is not tracked or itself has a nonce gap;
///   the `blob_transactions_nonce_gaps` metric is incremented in both cases.
/// - [`InsertErr::Overdraft`] if the cost accumulated up to and including the new transaction
///   exceeds `on_chain_balance`, or if the new transaction replaces an existing one and a
///   following blob transaction of the sender would then exceed the balance.
fn ensure_valid_blob_transaction(
    &self,
    new_blob_tx: ValidPoolTransaction<T>,
    on_chain_balance: U256,
    ancestor: Option<TransactionId>,
) -> Result<ValidPoolTransaction<T>, InsertErr<T>> {
    let Some(ancestor_id) = ancestor else {
        // Without an ancestor only the transaction's own cost has to be covered.
        if new_blob_tx.cost() > &on_chain_balance {
            return Err(InsertErr::Overdraft { transaction: Arc::new(new_blob_tx) })
        }
        return Ok(new_blob_tx)
    };

    // A missing ancestor and an ancestor with a nonce gap are reported the same way.
    let ancestor_tx = match self.txs.get(&ancestor_id) {
        Some(found) if !found.state.has_nonce_gap() => found,
        _ => {
            self.metrics.blob_transactions_nonce_gaps.increment(1);
            return Err(InsertErr::BlobTxHasNonceGap { transaction: Arc::new(new_blob_tx) })
        }
    };

    let mut running_cost = ancestor_tx.next_cumulative_cost() + new_blob_tx.cost();
    if running_cost > on_chain_balance {
        return Err(InsertErr::Overdraft { transaction: Arc::new(new_blob_tx) })
    }

    // If an entry with the same id already exists this is a replacement; make sure it does not
    // push any following blob transaction over the balance.
    let new_id = new_blob_tx.transaction_id;
    let mut descendants = self.descendant_txs_inclusive(&new_id).peekable();
    let is_replacement = descendants.peek().is_some_and(|(first_id, _)| **first_id == new_id);
    if is_replacement {
        // Skip the entry being replaced, its cost is superseded by the new transaction.
        for (_, descendant) in descendants.skip(1) {
            running_cost += descendant.transaction.cost();
            if descendant.transaction.is_eip4844() && running_cost > on_chain_balance {
                return Err(InsertErr::Overdraft { transaction: Arc::new(new_blob_tx) })
            }
        }
    }

    Ok(new_blob_tx)
}
/// Inserts a validated transaction and recomputes the state of the sender's gapless sequence.
///
/// The new transaction's [`TxState`] is assembled from the checks below, the transaction is
/// stored (or replaces an existing entry with the same id), and then every transaction of the
/// sender starting at `on_chain_nonce` is revisited to refresh nonce-gap, balance and
/// parked-ancestor flags. Transactions other than the inserted one whose subpool changed are
/// reported in `InsertOk::updates`.
///
/// # Panics
///
/// Panics if `on_chain_nonce` is greater than the transaction's nonce.
///
/// # Errors
///
/// Returns any error produced by `ensure_valid` or `ensure_valid_blob_transaction`,
/// [`InsertErr::FeeCapBelowMinimumProtocolFeeCap`] if the fee cap is below
/// `minimal_protocol_basefee`, and [`InsertErr::Underpriced`] if an existing entry with the same
/// id reports the new transaction as underpriced.
pub(crate) fn insert_tx(
&mut self,
transaction: ValidPoolTransaction<T>,
on_chain_balance: U256,
on_chain_nonce: u64,
) -> InsertResult<T> {
assert!(on_chain_nonce <= transaction.nonce(), "Invalid transaction");
let mut transaction = self.ensure_valid(transaction, on_chain_nonce)?;
let inserted_tx_id = *transaction.id();
let mut state = TxState::default();
let mut cumulative_cost = U256::ZERO;
let mut updates = Vec::new();
// `ensure_valid` already rejected transactions above the block gas limit.
state.insert(TxState::NOT_TOO_MUCH_GAS);
// Id of the transaction that has to precede this one; checked for `None` further down.
let ancestor = TransactionId::ancestor(
transaction.transaction.nonce(),
on_chain_nonce,
inserted_tx_id.sender,
);
// Blob transactions must pass additional checks and are subject to the pending blob fee.
if transaction.is_eip4844() {
state.insert(TxState::BLOB_TRANSACTION);
transaction =
self.ensure_valid_blob_transaction(transaction, on_chain_balance, ancestor)?;
let blob_fee_cap = transaction.transaction.max_fee_per_blob_gas().unwrap_or_default();
if blob_fee_cap >= self.pending_fees.blob_fee {
state.insert(TxState::ENOUGH_BLOB_FEE_CAP_BLOCK);
}
} else {
// Non-blob transactions always get the blob fee flag.
state.insert(TxState::ENOUGH_BLOB_FEE_CAP_BLOCK);
}
let transaction = Arc::new(transaction);
// Without an ancestor there can be neither a nonce gap nor a parked ancestor.
if ancestor.is_none() {
state.insert(TxState::NO_NONCE_GAPS);
state.insert(TxState::NO_PARKED_ANCESTORS);
}
// Fee cap checks: a hard floor first, then the currently pending base fee.
let fee_cap = transaction.max_fee_per_gas();
if fee_cap < self.minimal_protocol_basefee as u128 {
return Err(InsertErr::FeeCapBelowMinimumProtocolFeeCap { transaction, fee_cap })
}
if fee_cap >= self.pending_fees.base_fee as u128 {
state.insert(TxState::ENOUGH_FEE_CAP_BLOCK);
}
// Set only when an existing entry with the same id is replaced.
let mut replaced_tx = None;
let pool_tx = PoolInternalTransaction {
transaction: Arc::clone(&transaction),
subpool: state.into(),
state,
cumulative_cost,
};
match self.txs.entry(*transaction.id()) {
Entry::Vacant(entry) => {
// New id: register the transaction in both maps.
self.by_hash.insert(*pool_tx.transaction.hash(), pool_tx.transaction.clone());
entry.insert(pool_tx);
}
Entry::Occupied(mut entry) => {
// Same id already present: this is a replacement candidate.
let existing_transaction = entry.get().transaction.as_ref();
let maybe_replacement = transaction.as_ref();
if existing_transaction.is_underpriced(maybe_replacement, &self.price_bumps) {
return Err(InsertErr::Underpriced {
transaction: pool_tx.transaction,
existing: *entry.get().transaction.hash(),
})
}
let new_hash = *pool_tx.transaction.hash();
let new_transaction = pool_tx.transaction.clone();
let replaced = entry.insert(pool_tx);
// Keep the hash index in sync: drop the replaced hash, add the new one.
self.by_hash.remove(replaced.transaction.hash());
self.by_hash.insert(new_hash, new_transaction);
replaced_tx = Some((replaced.transaction, replaced.subpool));
}
}
// Id the sender's next transaction would have according to the on-chain nonce.
let on_chain_id = TransactionId::new(transaction.sender_id(), on_chain_nonce);
{
// Nonce expected next while the sequence stays gapless.
let mut next_nonce = on_chain_id.nonce;
// Whether the previously visited transaction was not pending.
let mut has_parked_ancestor = false;
// Walk the sender's transactions starting at the on-chain nonce.
for (id, tx) in self.descendant_txs_mut(&on_chain_id) {
let current_pool = tx.subpool;
// Stop at the first nonce gap; later transactions are left untouched.
if next_nonce != id.nonce {
break
}
tx.state.insert(TxState::NO_NONCE_GAPS);
// Cost of all preceding transactions, then advance by this transaction's cost.
tx.cumulative_cost = cumulative_cost;
cumulative_cost = tx.next_cumulative_cost();
if cumulative_cost > on_chain_balance {
tx.state.remove(TxState::ENOUGH_BALANCE);
} else {
tx.state.insert(TxState::ENOUGH_BALANCE);
}
if has_parked_ancestor {
tx.state.remove(TxState::NO_PARKED_ANCESTORS);
} else {
tx.state.insert(TxState::NO_PARKED_ANCESTORS);
}
has_parked_ancestor = !tx.state.is_pending();
// Re-derive the subpool from the refreshed state.
tx.subpool = tx.state.into();
if inserted_tx_id.eq(id) {
// The inserted transaction: remember its final state for the result.
state = tx.state;
} else {
// An already tracked transaction: report it only if its subpool changed.
if current_pool != tx.subpool {
updates.push(PoolUpdate {
id: *id,
hash: *tx.transaction.hash(),
current: current_pool,
destination: tx.subpool.into(),
})
}
}
next_nonce = id.next_nonce();
}
}
// A replacement does not change the number of entries of the sender.
if replaced_tx.is_none() {
self.tx_inc(inserted_tx_id.sender);
}
self.update_size_metrics();
Ok(InsertOk { transaction, move_to: state.into(), state, replaced_tx, updates })
}
/// Returns the number of entries in the id-keyed `txs` map.
pub(crate) fn len(&self) -> usize {
self.txs.len()
}
/// Returns `true` if the id-keyed `txs` map holds no entries.
pub(crate) fn is_empty(&self) -> bool {
self.txs.is_empty()
}
/// Asserts that the hash index and the id-keyed map hold the same number of entries.
///
/// Only compiled for tests or with the `test-utils` feature.
///
/// # Panics
///
/// Panics if the two lengths differ.
#[cfg(any(test, feature = "test-utils"))]
pub(crate) fn assert_invariants(&self) {
assert_eq!(self.by_hash.len(), self.txs.len(), "by_hash.len() != txs.len()");
}
}
#[cfg(test)]
impl<T: PoolTransaction> AllTransactions<T> {
    /// Returns the value recorded in `tx_counter` for `sender`, or `0` if there is none.
    pub(crate) fn tx_count(&self, sender: SenderId) -> usize {
        match self.tx_counter.get(&sender) {
            Some(count) => *count,
            None => 0,
        }
    }
}
impl<T: PoolTransaction> Default for AllTransactions<T> {
/// Creates an empty container.
///
/// The three limits are taken from imported constants; every other field uses the `Default`
/// of its own type.
fn default() -> Self {
Self {
// Limits sourced from constants.
max_account_slots: TXPOOL_MAX_ACCOUNT_SLOTS_PER_SENDER,
minimal_protocol_basefee: MIN_PROTOCOL_BASE_FEE,
block_gas_limit: ETHEREUM_BLOCK_GAS_LIMIT,
// Everything else starts out with its type's default value.
by_hash: Default::default(),
txs: Default::default(),
tx_counter: Default::default(),
last_seen_block_number: Default::default(),
last_seen_block_hash: Default::default(),
pending_fees: Default::default(),
price_bumps: Default::default(),
local_transactions_config: Default::default(),
metrics: Default::default(),
}
}
}
/// The fees that transaction fee caps are compared against on insertion.
#[derive(Debug, Clone)]
pub(crate) struct PendingFees {
/// Base fee; a transaction whose max fee per gas is at least this value gets
/// `TxState::ENOUGH_FEE_CAP_BLOCK` on insertion.
pub(crate) base_fee: u64,
/// Blob fee; a blob transaction whose max fee per blob gas is at least this value gets
/// `TxState::ENOUGH_BLOB_FEE_CAP_BLOCK` on insertion.
pub(crate) blob_fee: u128,
}
impl Default for PendingFees {
    /// Starts with a base fee of zero and the minimum blob gas price as blob fee.
    fn default() -> Self {
        let base_fee = 0;
        let blob_fee = BLOB_TX_MIN_BLOB_GASPRICE;
        Self { base_fee, blob_fee }
    }
}
/// Result type returned by `AllTransactions::insert_tx`.
pub(crate) type InsertResult<T> = Result<InsertOk<T>, InsertErr<T>>;
/// Reasons why inserting a transaction was rejected.
///
/// Every variant carries the rejected transaction.
#[derive(Debug)]
pub(crate) enum InsertErr<T: PoolTransaction> {
/// An entry with the same id exists and reports the new transaction as underpriced.
Underpriced {
transaction: Arc<ValidPoolTransaction<T>>,
// Hash of the transaction that stays in place.
// NOTE(review): the `allow` suggests the field is never read directly — confirm.
#[allow(dead_code)]
existing: TxHash,
},
/// The blob transaction's ancestor is not tracked or has a nonce gap.
BlobTxHasNonceGap { transaction: Arc<ValidPoolTransaction<T>> },
/// The accumulated cost checked for a blob transaction exceeds the on-chain balance.
Overdraft { transaction: Arc<ValidPoolTransaction<T>> },
/// The transaction's max fee per gas (`fee_cap`) is below the minimal protocol base fee.
FeeCapBelowMinimumProtocolFeeCap { transaction: Arc<ValidPoolTransaction<T>>, fee_cap: u128 },
/// A non-local sender already occupies the maximum number of account slots.
ExceededSenderTransactionsCapacity { transaction: Arc<ValidPoolTransaction<T>> },
/// The transaction's gas limit (`tx_gas_limit`) exceeds `block_gas_limit`.
TxGasLimitMoreThanAvailableBlockGas {
transaction: Arc<ValidPoolTransaction<T>>,
block_gas_limit: u64,
tx_gas_limit: u64,
},
/// The sender already has a transaction of a conflicting type.
TxTypeConflict { transaction: Arc<ValidPoolTransaction<T>> },
}
/// Outcome of a successful `AllTransactions::insert_tx` call.
#[derive(Debug)]
pub(crate) struct InsertOk<T: PoolTransaction> {
/// The inserted transaction.
transaction: Arc<ValidPoolTransaction<T>>,
/// Subpool derived from the transaction's final state.
move_to: SubPool,
/// Final state of the inserted transaction.
// NOTE(review): the `allow` suggests this is only read in tests — confirm.
#[allow(dead_code)]
state: TxState,
/// The transaction that was replaced and the subpool it was in, if any.
replaced_tx: Option<(Arc<ValidPoolTransaction<T>>, SubPool)>,
/// Other transactions of the sender whose subpool changed because of the insert.
updates: Vec<PoolUpdate>,
}
/// A transaction as tracked by `AllTransactions`, together with its bookkeeping data.
#[derive(Debug)]
pub(crate) struct PoolInternalTransaction<T: PoolTransaction> {
/// The validated transaction.
pub(crate) transaction: Arc<ValidPoolTransaction<T>>,
/// The subpool the transaction is assigned to.
pub(crate) subpool: SubPool,
/// State flags of the transaction; `insert_tx` derives `subpool` from them.
pub(crate) state: TxState,
/// Cost of the preceding transactions of the sender, as set by `insert_tx`; excludes this
/// transaction's own cost.
pub(crate) cumulative_cost: U256,
}
impl<T: PoolTransaction> PoolInternalTransaction<T> {
    /// Returns the cumulative cost including this transaction's own cost, i.e. the value the
    /// following transaction starts from.
    fn next_cumulative_cost(&self) -> U256 {
        let own_cost = self.transaction.cost();
        self.cumulative_cost + own_cost
    }
}
/// Transactions collected while applying updates, split into promoted and discarded ones.
#[derive(Debug)]
pub(crate) struct UpdateOutcome<T: PoolTransaction> {
/// Transactions that were promoted.
pub(crate) promoted: Vec<Arc<ValidPoolTransaction<T>>>,
/// Transactions that were discarded.
pub(crate) discarded: Vec<Arc<ValidPoolTransaction<T>>>,
}
impl<T: PoolTransaction> Default for UpdateOutcome<T> {
    /// Creates an outcome with no promoted and no discarded transactions.
    fn default() -> Self {
        Self { promoted: Vec::new(), discarded: Vec::new() }
    }
}
/// Nonce and balance recorded for a sender.
#[derive(Debug, Clone, Default)]
pub(crate) struct SenderInfo {
/// The sender's nonce as last recorded via `update`.
pub(crate) state_nonce: u64,
/// The sender's balance as last recorded via `update`.
pub(crate) balance: U256,
}
impl SenderInfo {
    /// Overwrites the recorded nonce and balance with the given values.
    fn update(&mut self, state_nonce: u64, balance: U256) {
        self.state_nonce = state_nonce;
        self.balance = balance;
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{
test_utils::{MockOrdering, MockTransaction, MockTransactionFactory, MockTransactionSet},
traits::TransactionOrigin,
SubPoolLimit,
};
use alloy_primitives::address;
use reth_primitives::TxType;
#[test]
fn test_insert_blob() {
    let balance = U256::MAX;
    let nonce = 0;
    let mut factory = MockTransactionFactory::default();
    let mut all_txs = AllTransactions::default();

    let blob_tx = factory.validated(MockTransaction::eip4844().inc_price().inc_limit());
    let outcome = all_txs.insert_tx(blob_tx.clone(), balance, nonce).unwrap();

    // A first insert neither moves other transactions nor replaces anything.
    assert!(outcome.updates.is_empty());
    assert!(outcome.replaced_tx.is_none());
    assert!(outcome.state.contains(TxState::NO_NONCE_GAPS));
    assert!(outcome.state.contains(TxState::ENOUGH_BALANCE));
    assert!(outcome.state.contains(TxState::ENOUGH_BLOB_FEE_CAP_BLOCK));
    assert_eq!(outcome.move_to, SubPool::Pending);

    // The stored entry agrees with the reported subpool.
    let stored = all_txs.txs.get(&blob_tx.transaction_id).unwrap();
    assert_eq!(stored.subpool, SubPool::Pending);
}
#[test]
fn test_insert_blob_not_enough_blob_fee() {
    let balance = U256::MAX;
    let nonce = 0;
    let mut factory = MockTransactionFactory::default();
    let mut all_txs = AllTransactions {
        pending_fees: PendingFees { blob_fee: 10_000_000, ..Default::default() },
        ..Default::default()
    };

    let mock = MockTransaction::eip4844().inc_price().inc_limit();
    // Put the pending blob fee one above the transaction's blob fee cap.
    let blob_fee_cap = mock.max_fee_per_blob_gas().unwrap();
    all_txs.pending_fees.blob_fee = blob_fee_cap + 1;

    let blob_tx = factory.validated(mock);
    let outcome = all_txs.insert_tx(blob_tx.clone(), balance, nonce).unwrap();
    assert!(outcome.state.contains(TxState::NO_NONCE_GAPS));
    assert!(!outcome.state.contains(TxState::ENOUGH_BLOB_FEE_CAP_BLOCK));

    // The transaction is tracked regardless of the missing blob fee flag.
    let _ = all_txs.txs.get(&blob_tx.transaction_id).unwrap();
}
#[test]
fn test_valid_tx_with_decreasing_blob_fee() {
    let balance = U256::MAX;
    let nonce = 0;
    let mut factory = MockTransactionFactory::default();
    let mut all_txs = AllTransactions {
        pending_fees: PendingFees { blob_fee: 10_000_000, ..Default::default() },
        ..Default::default()
    };

    let mock = MockTransaction::eip4844().inc_price().inc_limit();
    let blob_fee_cap = mock.max_fee_per_blob_gas().unwrap();

    // First round: the pending blob fee is one above the transaction's cap.
    all_txs.pending_fees.blob_fee = blob_fee_cap + 1;
    let blob_tx = factory.validated(mock.clone());
    let first = all_txs.insert_tx(blob_tx.clone(), balance, nonce).unwrap();
    assert!(first.state.contains(TxState::NO_NONCE_GAPS));
    assert!(!first.state.contains(TxState::ENOUGH_BLOB_FEE_CAP_BLOCK));
    let _ = all_txs.txs.get(&blob_tx.transaction_id).unwrap();

    // Second round: remove it, lower the pending blob fee to the cap and insert again.
    all_txs.remove_transaction(&blob_tx.transaction_id);
    all_txs.pending_fees.blob_fee = blob_fee_cap;
    let second = all_txs.insert_tx(blob_tx.clone(), balance, nonce).unwrap();
    assert!(second.state.contains(TxState::NO_NONCE_GAPS));
    assert!(second.state.contains(TxState::ENOUGH_BLOB_FEE_CAP_BLOCK));
}
#[test]
fn test_demote_valid_tx_with_increasing_blob_fee() {
    let balance = U256::MAX;
    let nonce = 0;
    let mut factory = MockTransactionFactory::default();
    let mut pool = TxPool::new(MockOrdering::default(), Default::default());
    let mock = MockTransaction::eip4844().inc_price().inc_limit();
    let blob_fee_cap = mock.max_fee_per_blob_gas().unwrap();

    // Start with a pending blob fee the transaction can exactly afford.
    let mut block_info = pool.block_info();
    block_info.pending_blob_fee = Some(blob_fee_cap);
    pool.set_block_info(block_info);

    let validated = factory.validated(mock.clone());
    let tx_id = *validated.id();
    pool.add_transaction(validated, balance, nonce).unwrap();

    assert!(pool.blob_pool.is_empty());
    assert_eq!(pool.pending_pool.len(), 1);
    let tracked = pool.all_transactions.txs.get(&tx_id).unwrap();
    assert!(tracked.state.contains(TxState::ENOUGH_BLOB_FEE_CAP_BLOCK));
    assert_eq!(tracked.subpool, SubPool::Pending);

    // Raise the pending blob fee above the cap: the transaction moves to the blob pool.
    block_info.pending_blob_fee = Some(blob_fee_cap + 1);
    pool.set_block_info(block_info);

    let tracked = pool.all_transactions.txs.get(&tx_id).unwrap();
    assert!(!tracked.state.contains(TxState::ENOUGH_BLOB_FEE_CAP_BLOCK));
    assert_eq!(tracked.subpool, SubPool::Blob);
    assert_eq!(pool.blob_pool.len(), 1);
    assert!(pool.pending_pool.is_empty());
}
#[test]
fn test_promote_valid_tx_with_decreasing_blob_fee() {
    let balance = U256::MAX;
    let nonce = 0;
    let mut factory = MockTransactionFactory::default();
    let mut pool = TxPool::new(MockOrdering::default(), Default::default());
    let mock = MockTransaction::eip4844().inc_price().inc_limit();
    let blob_fee_cap = mock.max_fee_per_blob_gas().unwrap();

    // Start with a pending blob fee one above the transaction's cap.
    let mut block_info = pool.block_info();
    block_info.pending_blob_fee = Some(blob_fee_cap + 1);
    pool.set_block_info(block_info);

    let validated = factory.validated(mock.clone());
    let tx_id = *validated.id();
    pool.add_transaction(validated, balance, nonce).unwrap();

    assert!(pool.pending_pool.is_empty());
    assert_eq!(pool.blob_pool.len(), 1);
    let tracked = pool.all_transactions.txs.get(&tx_id).unwrap();
    assert!(!tracked.state.contains(TxState::ENOUGH_BLOB_FEE_CAP_BLOCK));
    assert_eq!(tracked.subpool, SubPool::Blob);

    // Lower the pending blob fee to the cap: the transaction becomes pending.
    block_info.pending_blob_fee = Some(blob_fee_cap);
    pool.set_block_info(block_info);

    let tracked = pool.all_transactions.txs.get(&tx_id).unwrap();
    assert!(tracked.state.contains(TxState::ENOUGH_BLOB_FEE_CAP_BLOCK));
    assert_eq!(tracked.subpool, SubPool::Pending);
    assert_eq!(pool.pending_pool.len(), 1);
    assert!(pool.blob_pool.is_empty());
}
/// One scenario for a single transaction: the fees and subpool it starts with, and the fees
/// and subpool expected after a block info update.
#[derive(Debug, PartialEq, Eq, Clone, Hash)]
struct PromotionTest {
// Pending base fee in effect when the transaction is added.
basefee: u64,
// Pending blob fee in effect when the transaction is added.
blobfee: u128,
// Subpool expected right after the transaction is added.
subpool: SubPool,
// Pending base fee applied by the update.
basefee_update: u64,
// Pending blob fee applied by the update.
blobfee_update: u128,
// Subpool expected after the update.
new_subpool: SubPool,
}
impl PromotionTest {
/// Returns the reversed scenario: the "update" values become the starting values and vice
/// versa.
const fn opposite(&self) -> Self {
Self {
basefee: self.basefee_update,
blobfee: self.blobfee_update,
subpool: self.new_subpool,
blobfee_update: self.blobfee,
basefee_update: self.basefee,
new_subpool: self.subpool,
}
}
/// Asserts that `check_subpool` holds exactly one transaction and the other three subpools
/// are empty, using `failure_message` for every assertion.
fn assert_subpool_lengths<T: TransactionOrdering>(
&self,
pool: &TxPool<T>,
failure_message: String,
check_subpool: SubPool,
) {
match check_subpool {
SubPool::Blob => {
assert_eq!(pool.blob_pool.len(), 1, "{failure_message}");
assert!(pool.pending_pool.is_empty(), "{failure_message}");
assert!(pool.basefee_pool.is_empty(), "{failure_message}");
assert!(pool.queued_pool.is_empty(), "{failure_message}");
}
SubPool::Pending => {
assert!(pool.blob_pool.is_empty(), "{failure_message}");
assert_eq!(pool.pending_pool.len(), 1, "{failure_message}");
assert!(pool.basefee_pool.is_empty(), "{failure_message}");
assert!(pool.queued_pool.is_empty(), "{failure_message}");
}
SubPool::BaseFee => {
assert!(pool.blob_pool.is_empty(), "{failure_message}");
assert!(pool.pending_pool.is_empty(), "{failure_message}");
assert_eq!(pool.basefee_pool.len(), 1, "{failure_message}");
assert!(pool.queued_pool.is_empty(), "{failure_message}");
}
SubPool::Queued => {
assert!(pool.blob_pool.is_empty(), "{failure_message}");
assert!(pool.pending_pool.is_empty(), "{failure_message}");
assert!(pool.basefee_pool.is_empty(), "{failure_message}");
assert_eq!(pool.queued_pool.len(), 1, "{failure_message}");
}
}
}
/// Checks the subpool lengths against the scenario's starting subpool.
fn assert_single_tx_starting_subpool<T: TransactionOrdering>(&self, pool: &TxPool<T>) {
self.assert_subpool_lengths(
pool,
format!("pool length check failed at start of test: {self:?}"),
self.subpool,
);
}
/// Checks the subpool lengths against the scenario's subpool after the update.
fn assert_single_tx_ending_subpool<T: TransactionOrdering>(&self, pool: &TxPool<T>) {
self.assert_subpool_lengths(
pool,
format!("pool length check failed at end of test: {self:?}"),
self.new_subpool,
);
}
}
// Table-driven check of how one blob transaction moves between subpools when the pending base
// fee and pending blob fee are updated together. Each scenario is also run in reverse.
#[test]
fn test_promote_blob_tx_with_both_pending_fee_updates() {
let on_chain_balance = U256::MAX;
let on_chain_nonce = 0;
let mut f = MockTransactionFactory::default();
let tx = MockTransaction::eip4844().inc_price().inc_limit();
// The transaction's own caps; "+ 1" below means a fee the transaction cannot afford.
let max_fee_per_blob_gas = tx.max_fee_per_blob_gas().unwrap();
let max_fee_per_gas = tx.max_fee_per_gas() as u64;
let mut expected_promotions = vec![
// Both fees too high before and after: stays in the blob pool.
PromotionTest {
blobfee: max_fee_per_blob_gas + 1,
basefee: max_fee_per_gas + 1,
subpool: SubPool::Blob,
blobfee_update: max_fee_per_blob_gas + 1,
basefee_update: max_fee_per_gas + 1,
new_subpool: SubPool::Blob,
},
// Only the blob fee becomes affordable: stays in the blob pool.
PromotionTest {
blobfee: max_fee_per_blob_gas + 1,
basefee: max_fee_per_gas + 1,
subpool: SubPool::Blob,
blobfee_update: max_fee_per_blob_gas,
basefee_update: max_fee_per_gas + 1,
new_subpool: SubPool::Blob,
},
// Only the base fee becomes affordable: stays in the blob pool.
PromotionTest {
blobfee: max_fee_per_blob_gas + 1,
basefee: max_fee_per_gas + 1,
subpool: SubPool::Blob,
blobfee_update: max_fee_per_blob_gas + 1,
basefee_update: max_fee_per_gas,
new_subpool: SubPool::Blob,
},
// Both fees become affordable: promoted to pending.
PromotionTest {
blobfee: max_fee_per_blob_gas + 1,
basefee: max_fee_per_gas + 1,
subpool: SubPool::Blob,
blobfee_update: max_fee_per_blob_gas,
basefee_update: max_fee_per_gas,
new_subpool: SubPool::Pending,
},
// Blob fee already affordable, base fee becomes affordable: promoted to pending.
PromotionTest {
blobfee: max_fee_per_blob_gas,
basefee: max_fee_per_gas + 1,
subpool: SubPool::Blob,
blobfee_update: max_fee_per_blob_gas,
basefee_update: max_fee_per_gas,
new_subpool: SubPool::Pending,
},
// Base fee already affordable, blob fee becomes affordable: promoted to pending.
PromotionTest {
blobfee: max_fee_per_blob_gas + 1,
basefee: max_fee_per_gas,
subpool: SubPool::Blob,
blobfee_update: max_fee_per_blob_gas,
basefee_update: max_fee_per_gas,
new_subpool: SubPool::Pending,
},
// Both fees affordable before and after: stays pending.
PromotionTest {
blobfee: max_fee_per_blob_gas,
basefee: max_fee_per_gas,
subpool: SubPool::Pending,
blobfee_update: max_fee_per_blob_gas,
basefee_update: max_fee_per_gas,
new_subpool: SubPool::Pending,
},
];
// Add the reversed scenarios, then deduplicate via a set (symmetric cases reverse to themselves).
let reversed = expected_promotions.iter().map(|test| test.opposite()).collect::<Vec<_>>();
expected_promotions.extend(reversed);
let expected_promotions = expected_promotions.into_iter().collect::<HashSet<_>>();
for promotion_test in &expected_promotions {
// Fresh pool per scenario, configured with the starting fees.
let mut pool = TxPool::new(MockOrdering::default(), Default::default());
let mut block_info = pool.block_info();
block_info.pending_blob_fee = Some(promotion_test.blobfee);
block_info.pending_basefee = promotion_test.basefee;
pool.set_block_info(block_info);
let validated = f.validated(tx.clone());
let id = *validated.id();
pool.add_transaction(validated, on_chain_balance, on_chain_nonce).unwrap();
promotion_test.assert_single_tx_starting_subpool(&pool);
let internal_tx = pool.all_transactions.txs.get(&id).unwrap();
assert_eq!(
internal_tx.subpool, promotion_test.subpool,
"Subpools do not match at start of test: {promotion_test:?}"
);
// Apply the fee update and check where the transaction ended up.
block_info.pending_basefee = promotion_test.basefee_update;
block_info.pending_blob_fee = Some(promotion_test.blobfee_update);
pool.set_block_info(block_info);
let internal_tx = pool.all_transactions.txs.get(&id).unwrap();
assert_eq!(
internal_tx.subpool, promotion_test.new_subpool,
"Subpools do not match at end of test: {promotion_test:?}"
);
promotion_test.assert_single_tx_ending_subpool(&pool);
}
}
#[test]
fn test_insert_pending() {
    let balance = U256::MAX;
    let nonce = 0;
    let mut factory = MockTransactionFactory::default();
    let mut all_txs = AllTransactions::default();

    let pending_tx = factory.validated(MockTransaction::eip1559().inc_price().inc_limit());
    let outcome = all_txs.insert_tx(pending_tx.clone(), balance, nonce).unwrap();

    assert!(outcome.updates.is_empty());
    assert!(outcome.replaced_tx.is_none());
    assert!(outcome.state.contains(TxState::NO_NONCE_GAPS));
    assert!(outcome.state.contains(TxState::ENOUGH_BALANCE));
    assert_eq!(outcome.move_to, SubPool::Pending);

    // The stored entry agrees with the reported subpool.
    let stored = all_txs.txs.get(&pending_tx.transaction_id).unwrap();
    assert_eq!(stored.subpool, SubPool::Pending);
}
#[test]
fn test_simple_insert() {
    // A zero balance means the sender cannot pay for anything.
    let balance = U256::ZERO;
    let nonce = 0;
    let mut factory = MockTransactionFactory::default();
    let mut all_txs = AllTransactions::default();

    let mut mock = MockTransaction::eip1559().inc_price().inc_limit();
    mock.set_priority_fee(100);
    mock.set_max_fee(100);

    // First insert: accepted, but queued for lack of balance.
    let first = factory.validated(mock.clone());
    let outcome = all_txs.insert_tx(first.clone(), balance, nonce).unwrap();
    assert!(outcome.updates.is_empty());
    assert!(outcome.replaced_tx.is_none());
    assert!(outcome.state.contains(TxState::NO_NONCE_GAPS));
    assert!(!outcome.state.contains(TxState::ENOUGH_BALANCE));
    assert_eq!(outcome.move_to, SubPool::Queued);
    assert_eq!(all_txs.len(), 1);
    assert!(all_txs.contains(first.hash()));

    let expected_state = TxState::ENOUGH_FEE_CAP_BLOCK | TxState::NO_NONCE_GAPS;
    let stored = all_txs.get(first.id()).unwrap();
    assert!(stored.state.intersects(expected_state));

    // Inserting the very same transaction again is rejected and changes nothing.
    let duplicate = all_txs.insert_tx(first, balance, nonce);
    duplicate.unwrap_err();
    assert_eq!(all_txs.len(), 1);

    // The follow-up transaction of the same sender is accepted and queued as well.
    let second = factory.validated(mock.next());
    let outcome = all_txs.insert_tx(second.clone(), balance, nonce).unwrap();
    assert!(outcome.updates.is_empty());
    assert!(outcome.replaced_tx.is_none());
    assert!(outcome.state.contains(TxState::NO_NONCE_GAPS));
    assert!(!outcome.state.contains(TxState::ENOUGH_BALANCE));
    assert_eq!(outcome.move_to, SubPool::Queued);
    assert!(all_txs.contains(second.hash()));
    assert_eq!(all_txs.len(), 2);

    let stored = all_txs.get(second.id()).unwrap();
    assert!(stored.state.intersects(expected_state));
}
#[test]
fn insert_already_imported() {
    let balance = U256::ZERO;
    let nonce = 0;
    let mut factory = MockTransactionFactory::default();
    let mut pool = TxPool::new(MockOrdering::default(), Default::default());

    let validated = factory.validated(MockTransaction::eip1559().inc_price().inc_limit());
    pool.add_transaction(validated.clone(), balance, nonce).unwrap();

    // Adding the identical transaction a second time must fail with `AlreadyImported`.
    let err = pool.add_transaction(validated, balance, nonce).unwrap_err();
    match err.kind {
        PoolErrorKind::AlreadyImported => {}
        _ => unreachable!(),
    }
}
#[test]
fn insert_replace() {
    let balance = U256::ZERO;
    let nonce = 0;
    let mut factory = MockTransactionFactory::default();
    let mut all_txs = AllTransactions::default();

    let mock = MockTransaction::eip1559().inc_price().inc_limit();
    let original = factory.validated(mock.clone());
    let _ = all_txs.insert_tx(original.clone(), balance, nonce).unwrap();

    // Same transaction with a fresh hash and a higher price.
    let replacement = factory.validated(mock.rng_hash().inc_price());
    let outcome = all_txs.insert_tx(replacement.clone(), balance, nonce).unwrap();
    assert!(outcome.updates.is_empty());

    let (replaced, _) = outcome.replaced_tx.unwrap();
    assert_eq!(replaced.hash(), original.hash());

    // Only the replacement remains tracked.
    assert!(!all_txs.contains(original.hash()));
    assert!(all_txs.contains(replacement.hash()));
    assert_eq!(all_txs.len(), 1);
}
#[test]
fn insert_replace_txpool() {
    let balance = U256::ZERO;
    let nonce = 0;
    let mut factory = MockTransactionFactory::default();
    let mut pool = TxPool::mock();

    let mock = MockTransaction::eip1559().inc_price().inc_limit();
    let original = factory.validated(mock.clone());
    let original_added = pool.add_transaction(original, balance, nonce).unwrap();

    // Same transaction with a fresh hash and a higher price.
    let replacement = factory.validated(mock.rng_hash().inc_price());
    let replacement_added = pool.add_transaction(replacement.clone(), balance, nonce).unwrap();

    // The original is gone; the replacement sits in the subpool it was reported for.
    assert!(!pool.contains(original_added.hash()));
    assert!(pool.subpool_contains(replacement_added.subpool(), replacement_added.id()));
    assert!(pool.contains(replacement.hash()));

    let pool_size = pool.size();
    assert_eq!(pool_size.total, 1);
    pool_size.assert_invariants();
}
#[test]
fn insert_replace_underpriced() {
    let balance = U256::ZERO;
    let nonce = 0;
    let mut factory = MockTransactionFactory::default();
    let mut all_txs = AllTransactions::default();

    let mock = MockTransaction::eip1559().inc_price().inc_limit();
    let original = factory.validated(mock.clone());
    let _first_outcome = all_txs.insert_tx(original, balance, nonce);

    // Same transaction with a fresh hash but a lower price.
    let mut cheaper = factory.validated(mock.rng_hash());
    cheaper.transaction = cheaper.transaction.decr_price();

    let rejection = all_txs.insert_tx(cheaper, balance, nonce).unwrap_err();
    assert!(matches!(rejection, InsertErr::Underpriced { .. }));
}
#[test]
fn insert_replace_underpriced_not_enough_bump() {
    let balance = U256::ZERO;
    let nonce = 0;
    let mut factory = MockTransactionFactory::default();
    let mut all_txs = AllTransactions::default();

    let mut mock = MockTransaction::eip1559().inc_price().inc_limit();
    mock.set_priority_fee(100);
    mock.set_max_fee(100);
    let original = factory.validated(mock.clone());
    let _ = all_txs.insert_tx(original.clone(), balance, nonce).unwrap();

    let mut replacement = factory.validated(mock.rng_hash().inc_price());

    // Both fees bumped from 100 to 109: rejected.
    replacement.transaction.set_priority_fee(109);
    replacement.transaction.set_max_fee(109);
    let rejection = all_txs.insert_tx(replacement.clone(), balance, nonce).unwrap_err();
    assert!(matches!(rejection, InsertErr::Underpriced { .. }));
    assert!(all_txs.contains(original.hash()));
    assert_eq!(all_txs.len(), 1);

    // Priority fee at 110 but max fee still at 109: rejected.
    replacement.transaction.set_priority_fee(110);
    replacement.transaction.set_max_fee(109);
    let rejection = all_txs.insert_tx(replacement.clone(), balance, nonce).unwrap_err();
    assert!(matches!(rejection, InsertErr::Underpriced { .. }));
    assert!(all_txs.contains(original.hash()));
    assert_eq!(all_txs.len(), 1);

    // Max fee at 110 but priority fee still at 109: rejected.
    replacement.transaction.set_priority_fee(109);
    replacement.transaction.set_max_fee(110);
    let rejection = all_txs.insert_tx(replacement, balance, nonce).unwrap_err();
    assert!(matches!(rejection, InsertErr::Underpriced { .. }));
    assert!(all_txs.contains(original.hash()));
    assert_eq!(all_txs.len(), 1);
}
#[test]
fn insert_conflicting_type_normal_to_blob() {
    let balance = U256::from(10_000);
    let nonce = 0;
    let mut factory = MockTransactionFactory::default();
    let mut all_txs = AllTransactions::default();

    // The sender first gets an EIP-1559 transaction into the pool.
    let normal = MockTransaction::eip1559().inc_price().inc_limit();
    let normal_validated = factory.validated(normal.clone());
    all_txs.insert_tx(normal_validated, balance, nonce).unwrap();

    // A blob transaction from the same sender must then be rejected.
    let blob =
        MockTransaction::eip4844().set_sender(normal.sender()).inc_price_by(100).inc_limit();
    let blob_validated = factory.validated(blob);
    let err = all_txs.insert_tx(blob_validated, balance, nonce).unwrap_err();
    assert!(matches!(err, InsertErr::TxTypeConflict { .. }), "{err:?}");
}
#[test]
fn insert_conflicting_type_blob_to_normal() {
    let balance = U256::from(10_000);
    let nonce = 0;
    let mut factory = MockTransactionFactory::default();
    let mut all_txs = AllTransactions::default();

    // The sender first gets a blob transaction into the pool.
    let blob = MockTransaction::eip4844().inc_price().inc_limit();
    let blob_validated = factory.validated(blob.clone());
    all_txs.insert_tx(blob_validated, balance, nonce).unwrap();

    // An EIP-1559 transaction from the same sender must then be rejected.
    let normal =
        MockTransaction::eip1559().set_sender(blob.sender()).inc_price_by(100).inc_limit();
    let normal_validated = factory.validated(normal);
    let err = all_txs.insert_tx(normal_validated, balance, nonce).unwrap_err();
    assert!(matches!(err, InsertErr::TxTypeConflict { .. }), "{err:?}");
}
#[test]
fn insert_previous() {
    let balance = U256::ZERO;
    let nonce = 0;
    let mut factory = MockTransactionFactory::default();
    let mut all_txs = AllTransactions::default();

    // Insert a transaction one nonce ahead of the chain: it has a nonce gap.
    let mock = MockTransaction::eip1559().inc_nonce().inc_price().inc_limit();
    let gapped = factory.validated(mock.clone());
    let _gapped_outcome = all_txs.insert_tx(gapped.clone(), balance, nonce);
    let gapped_entry = all_txs.get(gapped.id()).unwrap();
    assert!(!gapped_entry.state.contains(TxState::NO_NONCE_GAPS));

    // Insert the missing predecessor.
    let predecessor = factory.validated(mock.prev());
    let outcome = all_txs.insert_tx(predecessor, balance, nonce).unwrap();
    assert!(outcome.updates.is_empty());
    assert!(outcome.replaced_tx.is_none());
    assert!(outcome.state.contains(TxState::NO_NONCE_GAPS));
    assert_eq!(outcome.move_to, SubPool::Queued);

    // The gap of the first transaction is closed now.
    let gapped_entry = all_txs.get(gapped.id()).unwrap();
    assert!(gapped_entry.state.contains(TxState::NO_NONCE_GAPS));
}
#[test]
fn insert_with_updates() {
    let balance = U256::from(10_000);
    let nonce = 0;
    let mut factory = MockTransactionFactory::default();
    let mut all_txs = AllTransactions::default();

    // Insert a transaction one nonce ahead of the chain: it is queued with a nonce gap.
    let mock = MockTransaction::eip1559().inc_nonce().set_gas_price(100).inc_limit();
    let gapped = factory.validated(mock.clone());
    let _gapped_outcome = all_txs.insert_tx(gapped.clone(), balance, nonce).unwrap();
    let gapped_entry = all_txs.get(gapped.id()).unwrap();
    assert!(!gapped_entry.state.contains(TxState::NO_NONCE_GAPS));
    assert_eq!(SubPool::Queued, gapped_entry.subpool);

    // Insert the missing predecessor: it is pending and moves the first transaction too.
    let predecessor = factory.validated(mock.prev());
    let outcome = all_txs.insert_tx(predecessor, balance, nonce).unwrap();
    assert_eq!(outcome.updates.len(), 1);
    assert!(outcome.replaced_tx.is_none());
    assert!(outcome.state.contains(TxState::NO_NONCE_GAPS));
    assert_eq!(outcome.move_to, SubPool::Pending);

    let gapped_entry = all_txs.get(gapped.id()).unwrap();
    assert!(gapped_entry.state.contains(TxState::NO_NONCE_GAPS));
    assert_eq!(SubPool::Pending, gapped_entry.subpool);
}
#[test]
fn insert_previous_blocking() {
    let balance = U256::from(1_000);
    let nonce = 0;
    let mut factory = MockTransactionFactory::default();
    let mut all_txs = AllTransactions::default();
    // Pending base fee one above the protocol minimum.
    all_txs.pending_fees.base_fee = all_txs.minimal_protocol_basefee.checked_add(1).unwrap();

    // Insert a transaction one nonce ahead of the chain.
    let mock = MockTransaction::eip1559().inc_nonce().inc_limit();
    let gapped = factory.validated(mock.clone());
    let _gapped_outcome = all_txs.insert_tx(gapped.clone(), balance, nonce);
    let gapped_entry = all_txs.get(gapped.id()).unwrap();
    assert!(mock.get_gas_price() < all_txs.pending_fees.base_fee as u128);
    assert!(!gapped_entry.state.contains(TxState::NO_NONCE_GAPS));

    // Insert the missing predecessor; its fee cap is below the pending base fee.
    let predecessor = factory.validated(mock.prev());
    let outcome = all_txs.insert_tx(predecessor, balance, nonce).unwrap();
    assert!(!outcome.state.contains(TxState::ENOUGH_FEE_CAP_BLOCK));
    assert!(outcome.updates.is_empty());
    assert!(outcome.replaced_tx.is_none());
    assert!(outcome.state.contains(TxState::NO_NONCE_GAPS));
    assert_eq!(outcome.move_to, SubPool::BaseFee);

    // The nonce gap of the first transaction is closed nevertheless.
    let gapped_entry = all_txs.get(gapped.id()).unwrap();
    assert!(gapped_entry.state.contains(TxState::NO_NONCE_GAPS));
}
#[test]
fn rejects_spammer() {
    let balance = U256::from(1_000);
    let nonce = 0;
    let mut factory = MockTransactionFactory::default();
    let mut all_txs = AllTransactions::default();

    let mut mock = MockTransaction::eip1559();
    // Keep the transaction with the starting nonce aside; it is inserted last.
    let on_chain_nonce_tx = mock.clone();

    // Fill every slot of the sender with follow-up transactions.
    for _ in 0..all_txs.max_account_slots {
        mock = mock.next();
        all_txs.insert_tx(factory.validated(mock.clone()), balance, nonce).unwrap();
    }
    assert_eq!(
        all_txs.max_account_slots,
        all_txs.tx_count(factory.ids.sender_id(mock.get_sender()).unwrap())
    );

    // One more follow-up transaction exceeds the sender's capacity.
    let rejection =
        all_txs.insert_tx(factory.validated(mock.next()), balance, nonce).unwrap_err();
    assert!(matches!(rejection, InsertErr::ExceededSenderTransactionsCapacity { .. }));

    // The transaction kept aside is still accepted.
    assert!(all_txs.insert_tx(factory.validated(on_chain_nonce_tx), balance, nonce).is_ok());
}
#[test]
fn allow_local_spamming() {
    let balance = U256::from(1_000);
    let nonce = 0;
    let mut factory = MockTransactionFactory::default();
    let mut all_txs = AllTransactions::default();

    // Fill every slot of the sender with transactions of local origin.
    let mut mock = MockTransaction::eip1559();
    for _ in 0..all_txs.max_account_slots {
        mock = mock.next();
        let local = factory.validated_with_origin(TransactionOrigin::Local, mock.clone());
        all_txs.insert_tx(local, balance, nonce).unwrap();
    }
    assert_eq!(
        all_txs.max_account_slots,
        all_txs.tx_count(factory.ids.sender_id(mock.get_sender()).unwrap())
    );

    // A further local transaction is accepted although the slots are used up.
    let one_more = factory.validated_with_origin(TransactionOrigin::Local, mock.next());
    all_txs.insert_tx(one_more, balance, nonce).unwrap();
}
#[test]
fn reject_tx_over_gas_limit() {
    let balance = U256::from(1_000);
    let nonce = 0;
    let mut factory = MockTransactionFactory::default();
    let mut all_txs = AllTransactions::default();

    // Gas limit of 30_000_001 is expected to exceed the default block gas limit.
    let oversized = MockTransaction::eip1559().with_gas_limit(30_000_001);
    let outcome = all_txs.insert_tx(factory.validated(oversized), balance, nonce);
    assert!(matches!(outcome, Err(InsertErr::TxGasLimitMoreThanAvailableBlockGas { .. })));
}
#[test]
fn test_tx_equal_gas_limit() {
    let balance = U256::from(1_000);
    let nonce = 0;
    let mut factory = MockTransactionFactory::default();
    let mut all_txs = AllTransactions::default();

    // A gas limit of 30_000_000 is expected to be accepted by the default pool.
    let at_limit = MockTransaction::eip1559().with_gas_limit(30_000_000);
    let outcome = all_txs.insert_tx(factory.validated(at_limit), balance, nonce).unwrap();
    assert!(outcome.state.contains(TxState::NOT_TOO_MUCH_GAS));
}
#[test]
fn update_basefee_subpools() {
    let mut factory = MockTransactionFactory::default();
    let mut pool = TxPool::new(MockOrdering::default(), Default::default());

    let mock = MockTransaction::eip1559().inc_price_by(10);
    let validated = factory.validated(mock.clone());
    let tx_id = *validated.id();
    pool.add_transaction(validated, U256::from(1_000), 0).unwrap();
    assert_eq!(pool.pending_pool.len(), 1);

    // Raise the base fee to one above the transaction's fee cap.
    let too_high_basefee = (mock.max_fee_per_gas() + 1) as u64;
    pool.update_basefee(too_high_basefee);

    assert!(pool.pending_pool.is_empty());
    assert_eq!(pool.basefee_pool.len(), 1);
    assert_eq!(pool.all_transactions.txs.get(&tx_id).unwrap().subpool, SubPool::BaseFee)
}
#[test]
fn update_basefee_subpools_setting_block_info() {
    let mut factory = MockTransactionFactory::default();
    let mut pool = TxPool::new(MockOrdering::default(), Default::default());

    let mock = MockTransaction::eip1559().inc_price_by(10);
    let validated = factory.validated(mock.clone());
    let tx_id = *validated.id();
    pool.add_transaction(validated, U256::from(1_000), 0).unwrap();
    assert_eq!(pool.pending_pool.len(), 1);

    // Raise the pending base fee to one above the fee cap, this time via the block info.
    let mut info = pool.block_info();
    info.pending_basefee = (mock.max_fee_per_gas() + 1) as u64;
    pool.set_block_info(info);

    assert!(pool.pending_pool.is_empty());
    assert_eq!(pool.basefee_pool.len(), 1);
    assert_eq!(pool.all_transactions.txs.get(&tx_id).unwrap().subpool, SubPool::BaseFee)
}
/// After adding a transaction and its price-bumped `next()` successor, the highest nonce
/// reported for the sender is 1 and the highest transaction is the successor.
#[test]
fn get_highest_transaction_by_sender_and_nonce() {
    let mut factory = MockTransactionFactory::default();
    let mut pool = TxPool::new(MockOrdering::default(), Default::default());

    let first = MockTransaction::eip1559();
    pool.add_transaction(factory.validated(first.clone()), U256::from(1_000), 0).unwrap();

    let second = first.inc_price().next();
    pool.add_transaction(factory.validated(second.clone()), U256::from(1_000), 0).unwrap();

    // Both lookups below are keyed by the same sender id.
    let sender_id = factory.ids.sender_id(&first.sender()).unwrap();
    assert_eq!(pool.get_highest_nonce_by_sender(sender_id), Some(1));

    let highest = pool
        .get_highest_transaction_by_sender(sender_id)
        .expect("Failed to retrieve highest transaction");
    assert_eq!(highest.as_ref().transaction, second);
}
/// Checks `get_highest_consecutive_transaction_by_sender` against a sender whose pooled nonces
/// contain gaps (3, 6 and 7 are missing).
#[test]
fn get_highest_consecutive_transaction_by_sender() {
    let mut pool = TxPool::new(MockOrdering::default(), PoolConfig::default());
    let mut factory = MockTransactionFactory::default();
    let sender = Address::random();

    for nonce in [0, 1, 2, 4, 5, 8, 9] {
        let mut mock_tx = MockTransaction::eip1559();
        mock_tx.set_sender(sender);
        mock_tx.set_nonce(nonce);
        pool.add_transaction(factory.validated(mock_tx), U256::from(1000), 0).unwrap();
    }

    let sender_id = factory.ids.sender_id(&sender).unwrap();

    // Starting at 0, the run 0, 1, 2 stops at the gap before 4.
    let highest = pool
        .get_highest_consecutive_transaction_by_sender(sender_id.into_transaction_id(0))
        .map(|tx| tx.nonce());
    assert_eq!(highest, Some(2), "Expected nonce 2 for on-chain nonce 0");

    // Starting at 4, the run 4, 5 stops at the gap before 8.
    let highest = pool
        .get_highest_consecutive_transaction_by_sender(sender_id.into_transaction_id(4))
        .map(|tx| tx.nonce());
    assert_eq!(highest, Some(5), "Expected nonce 5 for on-chain nonce 4");

    // Starting at 5, nothing consecutive follows, so 5 itself is returned.
    let highest = pool
        .get_highest_consecutive_transaction_by_sender(sender_id.into_transaction_id(5))
        .map(|tx| tx.nonce());
    assert_eq!(highest, Some(5), "Expected nonce 5 for on-chain nonce 5");

    // Once the tracked sender info records nonce 8, the same lookup from 5 yields 9.
    let mut info = SenderInfo::default();
    info.update(8, U256::ZERO);
    pool.sender_info.insert(sender_id, info);
    let highest = pool
        .get_highest_consecutive_transaction_by_sender(sender_id.into_transaction_id(5))
        .map(|tx| tx.nonce());
    assert_eq!(highest, Some(9), "Expected nonce 9 for on-chain nonce 8");
}
/// When an account update reports a state nonce equal to the second transaction's nonce, exactly
/// one transaction is discarded and one stays pending.
#[test]
fn discard_nonce_too_low() {
    let mut factory = MockTransactionFactory::default();
    let mut pool = TxPool::new(MockOrdering::default(), Default::default());

    let first = MockTransaction::eip1559().inc_price_by(10);
    let first_valid = factory.validated(first.clone());
    let sender = first_valid.id().sender;
    pool.add_transaction(first_valid, U256::from(1_000), 0).unwrap();

    let second = first.next();
    let new_state_nonce = second.nonce();
    pool.add_transaction(factory.validated(second), U256::from(1_000), 0).unwrap();
    assert_eq!(pool.pending_pool.len(), 2);

    // Report the sender's account as having advanced to the second transaction's nonce.
    let advanced = SenderInfo { state_nonce: new_state_nonce, balance: U256::from(1_000) };
    let outcome = pool.update_accounts(std::iter::once((sender, advanced)).collect());

    assert_eq!(outcome.discarded.len(), 1);
    assert_eq!(pool.pending_pool.len(), 1);
}
/// Two dependent EIP-4844 transactions, each sized at just over half of the blob sub-pool's
/// `max_size`, are added; `discard_worst` must then remove exactly one of them.
#[test]
fn discard_with_large_blob_txs() {
    reth_tracing::init_test_tracing();

    let mut factory = MockTransactionFactory::default();
    let mut pool = TxPool::new(MockOrdering::default(), Default::default());
    // A single transaction of this size takes more than half of the blob size budget.
    let oversized = pool.config.blob_limit.max_size / 2 + 1;
    let sender = address!("000000000000000000000000000000000000000a");

    let mut fees = pool.block_info();
    fees.pending_blob_fee = Some(100);
    fees.pending_basefee = 100;
    pool.set_block_info(fees);
    // Max fee one below the pending base fee configured above.
    let below_basefee = fees.pending_basefee - 1;

    for mut tx in MockTransactionSet::dependent(sender, 0, 2, TxType::Eip4844).into_iter() {
        tx.set_size(oversized);
        tx.set_max_fee(below_basefee.into());
        pool.add_transaction(factory.validated(tx), U256::from(1_000), 0).unwrap();
    }

    let removed = pool.discard_worst();
    assert_eq!(removed.len(), 1);
}
/// Three dependent EIP-1559 transactions, each sized at just over half of the queued sub-pool's
/// `max_size` and priced one below the base fee, are added; `discard_worst` must then remove
/// exactly one of them.
#[test]
fn discard_with_parked_large_txs() {
    reth_tracing::init_test_tracing();

    let mut factory = MockTransactionFactory::default();
    let mut pool = TxPool::new(MockOrdering::default(), Default::default());
    // A single transaction of this size takes more than half of the queued size budget.
    let oversized = pool.config.queued_limit.max_size / 2 + 1;
    let sender = address!("000000000000000000000000000000000000000a");

    let pool_base_fee = 100;
    pool.update_basefee(pool_base_fee);
    let below_basefee = pool_base_fee - 1;

    for mut tx in MockTransactionSet::dependent(sender, 0, 3, TxType::Eip1559).into_iter() {
        tx.set_size(oversized);
        tx.set_max_fee(below_basefee.into());
        pool.add_transaction(factory.validated(tx), U256::from(1_000), 0).unwrap();
    }

    let removed = pool.discard_worst();
    assert_eq!(removed.len(), 1);
}
/// Fills the queued sub-pool to its transaction-count limit, then keeps adding one transaction
/// at a time; after each `discard_worst` the pool invariants must hold and the queued count must
/// not exceed the limit.
#[test]
fn discard_at_capacity() {
    let mut factory = MockTransactionFactory::default();
    let queued_limit = SubPoolLimit::new(1000, usize::MAX);
    let mut pool =
        TxPool::new(MockOrdering::default(), PoolConfig { queued_limit, ..Default::default() });
    let capacity = queued_limit.max_txs;

    // Phase 1: reach the limit exactly.
    for _ in 0..capacity {
        let parked = factory.validated(MockTransaction::eip1559().inc_price_by(10).inc_nonce());
        pool.add_transaction(parked, U256::from(1_000), 0).unwrap();
    }
    assert_eq!(pool.size().queued, capacity);

    // Phase 2: every further insertion is followed by an eviction pass.
    for _ in 0..capacity {
        let parked = factory.validated(MockTransaction::eip1559().inc_price_by(10).inc_nonce());
        pool.add_transaction(parked, U256::from(1_000), 0).unwrap();

        pool.discard_worst();
        pool.assert_invariants();
        assert!(pool.size().queued <= capacity);
    }
}
/// Fills the blob sub-pool to its transaction-count limit (with the pending blob fee set to
/// 10000), then keeps adding one blob transaction at a time; after each `discard_worst` the pool
/// invariants must hold and the blob count must not exceed the limit.
#[test]
fn discard_blobs_at_capacity() {
    let mut factory = MockTransactionFactory::default();
    let blob_limit = SubPoolLimit::new(1000, usize::MAX);
    let mut pool =
        TxPool::new(MockOrdering::default(), PoolConfig { blob_limit, ..Default::default() });
    pool.all_transactions.pending_fees.blob_fee = 10000;
    let capacity = blob_limit.max_txs;

    // Phase 1: reach the limit exactly.
    for _ in 0..capacity {
        let blob_tx = MockTransaction::eip4844().inc_price_by(100).with_blob_fee(100);
        pool.add_transaction(factory.validated(blob_tx), U256::from(1_000), 0).unwrap();
    }
    assert_eq!(pool.size().blob, capacity);

    // Phase 2: every further insertion is followed by an eviction pass.
    for _ in 0..capacity {
        let blob_tx = MockTransaction::eip4844().inc_price_by(100).with_blob_fee(100);
        pool.add_transaction(factory.validated(blob_tx), U256::from(1_000), 0).unwrap();

        pool.discard_worst();
        pool.assert_invariants();
        assert!(pool.size().blob <= capacity);
    }
}
/// Pruning the first of two pending transactions and then adding a third leaves one pending and
/// one queued; an account update that bumps the state nonce to 1 makes both pending again.
#[test]
fn account_updates_nonce_gap() {
    let balance = U256::from(10_000);
    let mut factory = MockTransactionFactory::default();
    let mut pool = TxPool::new(MockOrdering::default(), Default::default());

    // A chain of three transactions built with `next()`.
    let first = MockTransaction::eip1559().set_gas_price(100).inc_limit();
    let second = first.next();
    let third = second.next();

    let first = factory.validated(first);
    let second = factory.validated(second);
    let third = factory.validated(third);
    let sender = first.sender_id();
    let first_hash = *first.hash();

    // Only the first two go in initially; both are pending.
    pool.add_transaction(first, balance, 0).unwrap();
    pool.add_transaction(second, balance, 0).unwrap();
    assert!(pool.queued_transactions().is_empty());
    assert_eq!(pool.pending_transactions().len(), 2);

    // Drop the first one by hash, then add the third against the unchanged state nonce.
    pool.prune_transaction_by_hash(&first_hash);
    pool.add_transaction(third, balance, 0).unwrap();
    assert_eq!(pool.queued_transactions().len(), 1);
    assert_eq!(pool.pending_transactions().len(), 1);

    // The account update advances the sender's state nonce to 1.
    let advanced = SenderInfo { state_nonce: 1, balance };
    pool.update_accounts(std::iter::once((sender, advanced)).collect());

    assert!(pool.queued_transactions().is_empty());
    assert_eq!(pool.pending_transactions().len(), 2);
}
/// After removing the first of two pending transactions by id, `best_transactions` yields only
/// the second one.
#[test]
fn test_transaction_removal() {
    let balance = U256::from(10_000);
    let mut factory = MockTransactionFactory::default();
    let mut pool = TxPool::new(MockOrdering::default(), Default::default());

    let first = MockTransaction::eip1559().set_gas_price(100).inc_limit();
    let second = first.next();
    let first = factory.validated(first);
    let second = factory.validated(second);
    let first_id = *first.id();
    let second_nonce = second.nonce();

    pool.add_transaction(first, balance, 0).unwrap();
    pool.add_transaction(second, balance, 0).unwrap();
    assert_eq!(pool.queued_transactions().len(), 0);
    assert_eq!(pool.pending_transactions().len(), 2);

    pool.remove_transaction(&first_id);

    let remaining: Vec<_> = pool.best_transactions().map(|tx| tx.id().nonce).collect();
    assert_eq!(remaining, vec![second_nonce]);
}
/// `remove_transactions` given the hashes of the two chain heads removes exactly those two;
/// their `next()` successors remain in the pool and stay pending.
#[test]
fn test_remove_transactions() {
    let balance = U256::from(10_000);
    let mut factory = MockTransactionFactory::default();
    let mut pool = TxPool::new(MockOrdering::default(), Default::default());

    // Two separately created base transactions, each followed by its `next()`.
    let a0 = MockTransaction::eip1559().set_gas_price(100).inc_limit();
    let a1 = a0.next();
    let b0 = MockTransaction::eip1559().set_gas_price(100).inc_limit();
    let b1 = b0.next();

    let a0 = factory.validated(a0);
    let a1 = factory.validated(a1);
    let b0 = factory.validated(b0);
    let b1 = factory.validated(b1);

    let to_remove = vec![*a0.hash(), *b0.hash()];
    let expected_left = [*a1.hash(), *b1.hash()];

    for tx in [a0, a1, b0, b1] {
        pool.add_transaction(tx, balance, 0).unwrap();
    }
    assert_eq!(pool.queued_transactions().len(), 0);
    assert_eq!(pool.pending_transactions().len(), 4);

    pool.remove_transactions(to_remove);

    assert_eq!(pool.queued_transactions().len(), 0);
    assert_eq!(pool.pending_transactions().len(), 2);
    for hash in &expected_left {
        assert!(pool.contains(hash));
    }
}
/// `remove_transactions_and_descendants` given the hashes of the two chain heads empties the
/// pool: the heads and every `next()` successor are gone.
#[test]
fn test_remove_transactions_and_descendants() {
    let balance = U256::from(10_000);
    let mut factory = MockTransactionFactory::default();
    let mut pool = TxPool::new(MockOrdering::default(), Default::default());

    // One chain of two transactions and one chain of three.
    let a0 = MockTransaction::eip1559().set_gas_price(100).inc_limit();
    let a1 = a0.next();
    let b0 = MockTransaction::eip1559().set_gas_price(100).inc_limit();
    let b1 = b0.next();
    let b2 = b1.next();

    let a0 = factory.validated(a0);
    let a1 = factory.validated(a1);
    let b0 = factory.validated(b0);
    let b1 = factory.validated(b1);
    let b2 = factory.validated(b2);

    let roots = vec![*a0.hash(), *b0.hash()];

    for tx in [a0, a1, b0, b1, b2] {
        pool.add_transaction(tx, balance, 0).unwrap();
    }
    assert_eq!(pool.queued_transactions().len(), 0);
    assert_eq!(pool.pending_transactions().len(), 5);

    pool.remove_transactions_and_descendants(roots);

    assert_eq!(pool.queued_transactions().len(), 0);
    assert_eq!(pool.pending_transactions().len(), 0);
}
/// Removing the head of a four-transaction chain and then calling `remove_descendants` on its
/// id empties the pool and reports the three successors in the output vector.
#[test]
fn test_remove_descendants() {
    let balance = U256::from(10_000);
    let mut factory = MockTransactionFactory::default();
    let mut pool = TxPool::new(MockOrdering::default(), Default::default());

    let t0 = MockTransaction::eip1559().set_gas_price(100).inc_limit();
    let t1 = t0.next();
    let t2 = t1.next();
    let t3 = t2.next();

    let t0 = factory.validated(t0);
    let t1 = factory.validated(t1);
    let t2 = factory.validated(t2);
    let t3 = factory.validated(t3);
    let root_id = *t0.id();

    for tx in [t0, t1, t2, t3] {
        pool.add_transaction(tx, balance, 0).unwrap();
    }
    assert_eq!(pool.queued_transactions().len(), 0);
    assert_eq!(pool.pending_transactions().len(), 4);

    // The head itself is removed first; `removed` only collects what `remove_descendants` adds.
    let mut removed = Vec::new();
    pool.remove_transaction(&root_id);
    pool.remove_descendants(&root_id, &mut removed);

    assert_eq!(pool.queued_transactions().len(), 0);
    assert_eq!(pool.pending_transactions().len(), 0);
    assert_eq!(removed.len(), 3);
}
/// `remove_transactions_by_sender` for the sender of the three-transaction chain leaves only the
/// two transactions of the other chain in the pool, both still pending.
#[test]
fn test_remove_transactions_by_sender() {
    let balance = U256::from(10_000);
    let mut factory = MockTransactionFactory::default();
    let mut pool = TxPool::new(MockOrdering::default(), Default::default());

    // One chain of two transactions and one chain of three.
    let a0 = MockTransaction::eip1559().set_gas_price(100).inc_limit();
    let a1 = a0.next();
    let b0 = MockTransaction::eip1559().set_gas_price(100).inc_limit();
    let b1 = b0.next();
    let b2 = b1.next();

    let a0 = factory.validated(a0);
    let a1 = factory.validated(a1);
    let b0 = factory.validated(b0);
    let b1 = factory.validated(b1);
    let b2 = factory.validated(b2);

    let evicted_sender = b0.sender_id();
    let expected_left = [*a0.hash(), *a1.hash()];

    for tx in [a0, a1, b0, b1, b2] {
        pool.add_transaction(tx, balance, 0).unwrap();
    }
    assert_eq!(pool.queued_transactions().len(), 0);
    assert_eq!(pool.pending_transactions().len(), 5);

    pool.remove_transactions_by_sender(evicted_sender);

    assert_eq!(pool.queued_transactions().len(), 0);
    assert_eq!(pool.pending_transactions().len(), 2);
    for hash in &expected_left {
        assert!(pool.contains(hash));
    }
}
/// After a removal, a late insertion and an account update, `best_transactions` must still
/// yield the remaining transactions in nonce order 1, 2, 3.
#[test]
fn wrong_best_order_of_transactions() {
    let balance = U256::from(10_000);
    let mut factory = MockTransactionFactory::default();
    let mut pool = TxPool::new(MockOrdering::default(), Default::default());

    let t0 = MockTransaction::eip1559().set_gas_price(100).inc_limit();
    let t1 = t0.next();
    let t2 = t1.next();
    let t3 = t2.next();

    let t0 = factory.validated(t0);
    let t1 = factory.validated(t1);
    let t2 = factory.validated(t2);
    let t3 = factory.validated(t3);
    let sender = t0.sender_id();
    let head_id = *t0.id();

    // The first two transactions are both pending.
    pool.add_transaction(t0, balance, 0).unwrap();
    pool.add_transaction(t1, balance, 0).unwrap();
    assert_eq!(pool.queued_transactions().len(), 0);
    assert_eq!(pool.pending_transactions().len(), 2);

    // Remove the head, then add the third against the unchanged state nonce 0.
    pool.remove_transaction(&head_id);
    pool.add_transaction(t2, balance, 0).unwrap();
    assert_eq!(pool.queued_transactions().len(), 1);
    assert_eq!(pool.pending_transactions().len(), 1);

    // The account update advances the sender's state nonce to 1.
    let bumped_nonce = 1;
    let advanced = SenderInfo { state_nonce: bumped_nonce, balance };
    pool.update_accounts(std::iter::once((sender, advanced)).collect());
    assert_eq!(pool.queued_transactions().len(), 0);
    assert_eq!(pool.pending_transactions().len(), 2);

    // The fourth transaction is added against the bumped state nonce.
    pool.add_transaction(t3, balance, bumped_nonce).unwrap();
    assert_eq!(pool.queued_transactions().len(), 0);
    assert_eq!(pool.pending_transactions().len(), 3);

    let order: Vec<_> = pool.best_transactions().map(|tx| tx.id().nonce).collect();
    assert_eq!(order, vec![1, 2, 3]);
}
/// A nonce-1 transaction added against on-chain nonce 0 is queued; once its successor is added
/// against on-chain nonce 1 both are pending, and the sender's independent pending transaction
/// is the nonce-1 one.
#[test]
fn test_pending_ordering() {
    let mut factory = MockTransactionFactory::default();
    let mut pool = TxPool::new(MockOrdering::default(), Default::default());

    let lower = MockTransaction::eip1559().with_nonce(1).set_gas_price(100).inc_limit();
    let higher = lower.next();
    let lower = factory.validated(lower);
    let higher = factory.validated(higher);
    let sender = lower.sender_id();
    let lower_nonce = lower.nonce();

    pool.add_transaction(lower, U256::MAX, 0).unwrap();
    assert_eq!(pool.queued_transactions().len(), 1);

    pool.add_transaction(higher, U256::MAX, 1).unwrap();
    assert_eq!(pool.pending_transactions().len(), 2);
    assert_eq!(pool.queued_transactions().len(), 0);

    let independent = pool.pending_pool.independent().get(&sender).unwrap();
    assert_eq!(independent.transaction.nonce(), lower_nonce);
}
/// A single sender submits nonces 40..48, the first two are removed, and nonces 48..52 are
/// submitted with a larger balance and state nonce 42. The best-transactions iterator must then
/// yield 10 entries while the pending pool tracks exactly one independent transaction.
#[test]
fn one_sender_one_independent_transaction() {
    let mut factory = MockTransactionFactory::default();
    let mut pool = TxPool::mock();
    let template =
        MockTransaction::eip1559().inc_price().inc_limit().with_value(U256::from(1_001));

    // First batch: eight transactions against balance 4_999 and state nonce 40.
    let (initial_balance, initial_nonce) = (U256::from(4_999), 40);
    let submitted: Vec<_> = (40..48)
        .map(|nonce| {
            let tx = factory.validated(template.clone().with_nonce(nonce).rng_hash());
            let id = *tx.id();
            pool.add_transaction(tx, initial_balance, initial_nonce).unwrap();
            id
        })
        .collect();

    // The two lowest-nonce transactions are removed from the pool.
    for id in &submitted[..2] {
        pool.remove_transaction(id);
    }

    // Second batch: four more transactions against balance 999_999 and state nonce 42.
    let (updated_balance, updated_nonce) = (U256::from(999_999), 42);
    for nonce in 48..52 {
        let tx = factory.validated(template.clone().with_nonce(nonce).rng_hash());
        pool.add_transaction(tx, updated_balance, updated_nonce).unwrap();
    }

    // 8 submitted - 2 removed + 4 submitted = 10.
    assert_eq!(pool.pending().best().count(), 10);
    assert_eq!(pool.pending_pool.independent().len(), 1);
}
}