use crate::{
identifier::{SenderId, TransactionId},
pool::size::SizeTracker,
PoolTransaction, SubPoolLimit, ValidPoolTransaction, TXPOOL_MAX_ACCOUNT_SLOTS_PER_SENDER,
};
use rustc_hash::FxHashMap;
use smallvec::SmallVec;
use std::{
cmp::Ordering,
collections::{hash_map::Entry, BTreeMap, BTreeSet},
ops::{Bound::Unbounded, Deref},
sync::Arc,
};
/// A pool of parked transactions, indexed by `TransactionId` and additionally kept in a set
/// sorted by the ordering `T`.
///
/// Every inserted transaction is tagged with a submission id taken from an incrementing
/// (wrapping) counter. For each sender the pool tracks how many of its transactions are present
/// and the submission id of its most recent insertion.
#[derive(Debug, Clone)]
pub struct ParkedPool<T: ParkedOrd> {
/// Submission id that will be assigned to the next inserted transaction.
submission_id: u64,
/// All transactions currently in the pool, keyed by their id.
by_id: BTreeMap<TransactionId, ParkedPoolTransaction<T>>,
/// The same transactions, sorted by the `Ord` impl of `ParkedPoolTransaction`.
best: BTreeSet<ParkedPoolTransaction<T>>,
/// Latest submission id per tracked sender, sorted by the `Ord` impl of `SubmissionSenderId`.
last_sender_submission: BTreeSet<SubmissionSenderId>,
/// Number of transactions per sender, together with that sender's latest submission id.
sender_transaction_count: FxHashMap<SenderId, SenderTransactionCount>,
/// Running total of `size()` over all transactions currently in the pool.
size_of: SizeTracker,
}
impl<T: ParkedOrd> ParkedPool<T> {
/// Inserts a transaction into the pool.
///
/// The transaction receives the next submission id, its size is added to the tracked pool
/// size and the per-sender bookkeeping is updated before it is stored in both indexes.
///
/// # Panics
///
/// Panics if a transaction with the same id is already in the pool.
pub fn add_transaction(&mut self, tx: Arc<ValidPoolTransaction<T::Transaction>>) {
    let id = *tx.id();
    if let Some(existing) = self.get(&id) {
        panic!("transaction already included {:?}", existing.transaction.transaction);
    }

    let submission_id = self.next_id();

    // Account for the new transaction's size.
    self.size_of += tx.size();

    // Create or bump the sender's entry.
    self.add_sender_count(tx.sender_id(), submission_id);

    let parked = ParkedPoolTransaction { submission_id, transaction: T::from(tx) };
    self.best.insert(parked.clone());
    self.by_id.insert(id, parked);
}
/// Records one more transaction for `sender` and makes `submission_id` its latest submission.
///
/// For a sender that is already tracked, the previously recorded latest submission is removed
/// from `last_sender_submission` before the new one is inserted.
fn add_sender_count(&mut self, sender: SenderId, submission_id: u64) {
    let latest_submissions = &mut self.last_sender_submission;

    self.sender_transaction_count
        .entry(sender)
        .and_modify(|tracked| {
            // Drop the submission id that is currently tracked for this sender.
            latest_submissions
                .remove(&SubmissionSenderId::new(sender, tracked.last_submission_id));
            tracked.count += 1;
            tracked.last_submission_id = submission_id;
        })
        .or_insert(SenderTransactionCount { count: 1, last_submission_id: submission_id });

    // Track the new latest submission for this sender.
    latest_submissions.insert(SubmissionSenderId::new(sender, submission_id));
}
fn remove_sender_count(&mut self, sender_id: SenderId) {
let removed_sender = match self.sender_transaction_count.entry(sender_id) {
Entry::Occupied(mut entry) => {
let value = entry.get_mut();
value.count -= 1;
if value.count == 0 {
entry.remove()
} else {
return
}
}
Entry::Vacant(_) => {
unreachable!("sender count not found {:?}", sender_id);
}
};
assert!(
self.last_sender_submission
.remove(&SubmissionSenderId::new(sender_id, removed_sender.last_submission_id)),
"last sender transaction not found {sender_id:?}"
);
}
pub(crate) fn all(
&self,
) -> impl Iterator<Item = Arc<ValidPoolTransaction<T::Transaction>>> + '_ {
self.by_id.values().map(|tx| tx.transaction.clone().into())
}
pub(crate) fn remove_transaction(
&mut self,
id: &TransactionId,
) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
let tx = self.by_id.remove(id)?;
self.best.remove(&tx);
self.remove_sender_count(tx.transaction.sender_id());
self.size_of -= tx.transaction.size();
Some(tx.transaction.into())
}
pub(crate) fn get_txs_by_sender(
&self,
sender: SenderId,
) -> SmallVec<[TransactionId; TXPOOL_MAX_ACCOUNT_SLOTS_PER_SENDER]> {
self.by_id
.range((sender.start_bound(), Unbounded))
.take_while(move |(other, _)| sender == other.sender)
.map(|(tx_id, _)| *tx_id)
.collect()
}
/// Returns the tracked `SubmissionSenderId` entries in the iteration order of
/// `last_sender_submission`; because that type's `Ord` is reversed on `submission_id`, the entry
/// with the highest submission id comes first.
#[cfg(test)]
pub(crate) fn get_senders_by_submission_id(
&self,
) -> impl Iterator<Item = SubmissionSenderId> + '_ {
self.last_sender_submission.iter().copied()
}
/// Removes transactions until the pool no longer exceeds `limit` and returns everything that
/// was removed.
///
/// Senders are processed starting with the last entry of `last_sender_submission`, i.e. the
/// sender whose latest submission id is the smallest. For each sender, transactions are removed
/// in reverse `by_id` order, stopping as soon as the pool is within the limit. Nothing is
/// removed if the limit is not exceeded to begin with.
pub fn truncate_pool(
    &mut self,
    limit: SubPoolLimit,
) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
    let mut evicted = Vec::new();

    while self.exceeds(&limit) {
        // Stop when no sender is tracked any more.
        let Some(oldest) = self.last_sender_submission.last() else { break };
        let sender_id = oldest.sender_id;

        // Drop this sender's transactions until the pool is back under the limit.
        for txid in self.get_txs_by_sender(sender_id).into_iter().rev() {
            if let Some(tx) = self.remove_transaction(&txid) {
                evicted.push(tx);
            }

            if !self.exceeds(&limit) {
                break
            }
        }
    }

    evicted
}
/// Returns the current submission id and advances the counter by one, wrapping on overflow.
fn next_id(&mut self) -> u64 {
    let current = self.submission_id;
    self.submission_id = current.wrapping_add(1);
    current
}
/// Returns the tracked total size of the pool, i.e. the value of `size_of` converted to `usize`.
pub(crate) fn size(&self) -> usize {
self.size_of.into()
}
/// Returns the number of transactions in the pool.
pub(crate) fn len(&self) -> usize {
self.by_id.len()
}
/// Returns whether the pool's current transaction count and size exceed the given limit, as
/// decided by `SubPoolLimit::is_exceeded`.
#[inline]
pub(crate) fn exceeds(&self, limit: &SubPoolLimit) -> bool {
limit.is_exceeded(self.len(), self.size())
}
/// Returns `true` if the pool holds no transactions.
#[cfg(test)]
#[allow(dead_code)]
pub(crate) fn is_empty(&self) -> bool {
self.by_id.is_empty()
}
/// Returns `true` if a transaction with the given id is in the pool.
pub(crate) fn contains(&self, id: &TransactionId) -> bool {
self.by_id.contains_key(id)
}
/// Looks up the pool entry for the given id, or `None` if it is not in the pool.
fn get(&self, id: &TransactionId) -> Option<&ParkedPoolTransaction<T>> {
self.by_id.get(id)
}
/// Asserts that the pool's internal collections agree in size.
///
/// # Panics
///
/// Panics if `by_id` and `best` differ in length, or if `last_sender_submission` and
/// `sender_transaction_count` differ in length.
#[cfg(any(test, feature = "test-utils"))]
pub(crate) fn assert_invariants(&self) {
    let indexed = self.by_id.len();
    let ordered = self.best.len();
    assert_eq!(indexed, ordered, "by_id.len() != best.len()");

    let tracked_submissions = self.last_sender_submission.len();
    let tracked_senders = self.sender_transaction_count.len();
    assert_eq!(
        tracked_submissions, tracked_senders,
        "last_sender_transaction.len() != sender_to_last_transaction.len()"
    );
}
}
impl<T: PoolTransaction> ParkedPool<BasefeeOrd<T>> {
/// Returns the transactions selected by `satisfy_base_fee_ids` for `basefee`, without removing
/// them from the pool.
///
/// # Panics
///
/// Panics if a selected id is not present in the pool.
#[allow(dead_code)]
pub(crate) fn satisfy_base_fee_transactions(
    &self,
    basefee: u64,
) -> Vec<Arc<ValidPoolTransaction<T>>> {
    self.satisfy_base_fee_ids(basefee)
        .iter()
        .map(|id| {
            let parked = self.get(id).expect("transaction exists");
            parked.transaction.clone().into()
        })
        .collect()
}
/// Returns the ids of all transactions whose `max_fee_per_gas` is at least `basefee`.
///
/// Entries are visited in `by_id` order. When a transaction's fee is below `basefee`, it and
/// all directly following entries of the same sender are skipped.
fn satisfy_base_fee_ids(&self, basefee: u64) -> Vec<TransactionId> {
    let mut satisfied = Vec::new();
    let mut entries = self.by_id.iter().peekable();

    while let Some((id, parked)) = entries.next() {
        let max_fee = parked.transaction.transaction.max_fee_per_gas();
        if max_fee >= basefee as u128 {
            satisfied.push(*id);
            continue
        }

        // Fee too low: skip the directly following entries of the same sender too.
        while entries.next_if(|(next_id, _)| next_id.sender == id.sender).is_some() {}
    }

    satisfied
}
/// Removes and returns every transaction selected by `satisfy_base_fee_ids` for `basefee`.
///
/// # Panics
///
/// Panics if a selected id is not present in the pool.
pub(crate) fn enforce_basefee(&mut self, basefee: u64) -> Vec<Arc<ValidPoolTransaction<T>>> {
    self.satisfy_base_fee_ids(basefee)
        .into_iter()
        .map(|id| self.remove_transaction(&id).expect("transaction exists"))
        .collect()
}
}
impl<T: ParkedOrd> Default for ParkedPool<T> {
fn default() -> Self {
Self {
submission_id: 0,
by_id: Default::default(),
best: Default::default(),
last_sender_submission: Default::default(),
sender_transaction_count: Default::default(),
size_of: Default::default(),
}
}
}
/// Per-sender bookkeeping kept by `ParkedPool`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
struct SenderTransactionCount {
/// Number of transactions recorded for the sender.
count: u64,
/// Submission id of the sender's most recently recorded transaction.
last_submission_id: u64,
}
/// A transaction stored in a `ParkedPool`, paired with the submission id it was inserted with.
#[derive(Debug)]
struct ParkedPoolTransaction<T: ParkedOrd> {
/// Id assigned by the pool when the transaction was inserted.
submission_id: u64,
/// The transaction, wrapped in the pool's ordering type.
transaction: T,
}
impl<T: ParkedOrd> Clone for ParkedPoolTransaction<T> {
    /// Clones the wrapped transaction and copies the submission id.
    fn clone(&self) -> Self {
        let Self { submission_id, transaction } = self;
        Self { submission_id: *submission_id, transaction: transaction.clone() }
    }
}
impl<T: ParkedOrd> Eq for ParkedPoolTransaction<T> {}

impl<T: ParkedOrd> PartialEq<Self> for ParkedPoolTransaction<T> {
    /// Two entries are equal exactly when `Ord::cmp` reports them as equal.
    fn eq(&self, other: &Self) -> bool {
        matches!(self.cmp(other), Ordering::Equal)
    }
}
impl<T: ParkedOrd> PartialOrd<Self> for ParkedPoolTransaction<T> {
// The ordering is total, so this simply wraps `Ord::cmp`.
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl<T: ParkedOrd> Ord for ParkedPoolTransaction<T> {
    /// Orders by the wrapped transaction first. Only when the transactions compare equal is the
    /// `submission_id` used as a tie-breaker, in reverse: the entry with the lower submission
    /// id is the greater one.
    fn cmp(&self, other: &Self) -> Ordering {
        match self.transaction.cmp(&other.transaction) {
            Ordering::Equal => other.submission_id.cmp(&self.submission_id),
            by_transaction => by_transaction,
        }
    }
}
/// Pairs a sender with a submission id.
///
/// Note that the derived `PartialEq` compares both fields, whereas the `Ord` impl looks at
/// `submission_id` only.
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
pub(crate) struct SubmissionSenderId {
/// The sender this entry belongs to.
pub(crate) sender_id: SenderId,
/// The submission id recorded for the sender.
pub(crate) submission_id: u64,
}
impl SubmissionSenderId {
/// Creates a new entry from a sender id and a submission id.
const fn new(sender_id: SenderId, submission_id: u64) -> Self {
Self { sender_id, submission_id }
}
}
impl Ord for SubmissionSenderId {
    /// Reverse ordering on `submission_id`: the entry with the higher submission id sorts
    /// first. The sender id does not take part in the comparison.
    fn cmp(&self, other: &Self) -> Ordering {
        let (mine, theirs) = (self.submission_id, other.submission_id);
        theirs.cmp(&mine)
    }
}
impl PartialOrd for SubmissionSenderId {
// The ordering is total, so this simply wraps `Ord::cmp`.
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
/// Ordering wrapper used as the element type of a `ParkedPool`.
///
/// An implementor must be totally ordered and cloneable, must convert from and into an
/// `Arc<ValidPoolTransaction<Self::Transaction>>`, and must dereference to that `Arc`.
pub trait ParkedOrd:
Ord
+ Clone
+ From<Arc<ValidPoolTransaction<Self::Transaction>>>
+ Into<Arc<ValidPoolTransaction<Self::Transaction>>>
+ Deref<Target = Arc<ValidPoolTransaction<Self::Transaction>>>
{
/// The transaction type that is wrapped and ordered.
type Transaction: PoolTransaction;
}
/// Implements everything `ParkedOrd` requires except `Ord` for a tuple struct
/// `$name<T>(Arc<ValidPoolTransaction<T>>)`: `Clone`, `Eq`, `PartialEq`, `PartialOrd`, `Deref`,
/// `ParkedOrd` itself and the conversions from/to the wrapped `Arc`.
///
/// The caller must provide the `Ord` impl; the generated `eq` and `partial_cmp` delegate to it.
macro_rules! impl_ord_wrapper {
($name:ident) => {
// Cloning only clones the inner `Arc`.
impl<T: PoolTransaction> Clone for $name<T> {
fn clone(&self) -> Self {
Self(self.0.clone())
}
}
impl<T: PoolTransaction> Eq for $name<T> {}
// Equality is defined through the caller-provided `Ord::cmp`.
impl<T: PoolTransaction> PartialEq<Self> for $name<T> {
fn eq(&self, other: &Self) -> bool {
self.cmp(other) == Ordering::Equal
}
}
impl<T: PoolTransaction> PartialOrd<Self> for $name<T> {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
// Gives access to the wrapped `Arc<ValidPoolTransaction<T>>`.
impl<T: PoolTransaction> Deref for $name<T> {
type Target = Arc<ValidPoolTransaction<T>>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl<T: PoolTransaction> ParkedOrd for $name<T> {
type Transaction = T;
}
// Wrap an `Arc<ValidPoolTransaction<T>>`.
impl<T: PoolTransaction> From<Arc<ValidPoolTransaction<T>>> for $name<T> {
fn from(value: Arc<ValidPoolTransaction<T>>) -> Self {
Self(value)
}
}
// Unwrap back into the `Arc<ValidPoolTransaction<T>>`.
impl<T: PoolTransaction> From<$name<T>> for Arc<ValidPoolTransaction<T>> {
fn from(value: $name<T>) -> Arc<ValidPoolTransaction<T>> {
value.0
}
}
};
}
/// Ordering wrapper that sorts transactions by their `max_fee_per_gas` (see its `Ord` impl).
#[derive(Debug)]
pub struct BasefeeOrd<T: PoolTransaction>(Arc<ValidPoolTransaction<T>>);
// Generates the remaining trait impls required by `ParkedOrd`.
impl_ord_wrapper!(BasefeeOrd);
impl<T: PoolTransaction> Ord for BasefeeOrd<T> {
    /// Compares two transactions by `max_fee_per_gas`; the higher fee is the greater one.
    fn cmp(&self, other: &Self) -> Ordering {
        let own_fee = self.0.transaction.max_fee_per_gas();
        let other_fee = other.0.transaction.max_fee_per_gas();
        own_fee.cmp(&other_fee)
    }
}
/// Ordering wrapper that sorts transactions by `max_fee_per_gas` and `timestamp` (see its `Ord`
/// impl).
#[derive(Debug)]
pub struct QueuedOrd<T: PoolTransaction>(Arc<ValidPoolTransaction<T>>);
// Generates the remaining trait impls required by `ParkedOrd`.
impl_ord_wrapper!(QueuedOrd);
impl<T: PoolTransaction> Ord for QueuedOrd<T> {
    /// Compares two transactions by `max_fee_per_gas` first, where the higher fee is the greater
    /// one. When the fees are equal, the transaction with the lower `timestamp` is the greater
    /// one.
    fn cmp(&self, other: &Self) -> Ordering {
        // The fee must be compared against `other`. Comparing `self` with itself always yields
        // `Equal`, which would leave the timestamp as the only sort key.
        self.max_fee_per_gas()
            .cmp(&other.max_fee_per_gas())
            // Reversed on purpose: a lower timestamp ranks higher.
            .then_with(|| other.timestamp.cmp(&self.timestamp))
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::test_utils::{MockTransaction, MockTransactionFactory, MockTransactionSet};
use alloy_primitives::address;
use reth_primitives::TxType;
use std::collections::HashSet;
// A single transaction stays in the pool while the base fee is above its max fee and is
// removed once the base fee drops below it.
#[test]
fn test_enforce_parked_basefee() {
let mut f = MockTransactionFactory::default();
let mut pool = ParkedPool::<BasefeeOrd<_>>::default();
let tx = f.validated_arc(MockTransaction::eip1559().inc_price());
pool.add_transaction(tx.clone());
assert!(pool.contains(tx.id()));
assert_eq!(pool.len(), 1);
// Base fee far above the transaction's max fee: nothing is removed.
let removed = pool.enforce_basefee(u64::MAX);
assert!(removed.is_empty());
// Base fee just below the transaction's max fee: the transaction is removed.
let removed = pool.enforce_basefee((tx.max_fee_per_gas() - 1) as u64);
assert_eq!(removed.len(), 1);
assert!(pool.is_empty());
}
// Two transactions of one sender, the second with the next nonce and a lower price: the
// descendant is only removed when the base fee is low enough for it as well.
#[test]
fn test_enforce_parked_basefee_descendant() {
let mut f = MockTransactionFactory::default();
let mut pool = ParkedPool::<BasefeeOrd<_>>::default();
let t = MockTransaction::eip1559().inc_price_by(10);
let root_tx = f.validated_arc(t.clone());
pool.add_transaction(root_tx.clone());
let descendant_tx = f.validated_arc(t.inc_nonce().decr_price());
pool.add_transaction(descendant_tx.clone());
assert!(pool.contains(root_tx.id()));
assert!(pool.contains(descendant_tx.id()));
assert_eq!(pool.len(), 2);
// Base fee above both max fees: nothing is removed.
let removed = pool.enforce_basefee(u64::MAX);
assert!(removed.is_empty());
assert_eq!(pool.len(), 2);
{
// Work on a copy so the original pool can be reused below.
let mut pool2 = pool.clone();
// Base fee equal to the root's max fee: only the root is removed.
let removed = pool2.enforce_basefee(root_tx.max_fee_per_gas() as u64);
assert_eq!(removed.len(), 1);
assert_eq!(pool2.len(), 1);
assert!(!pool2.contains(root_tx.id()));
assert!(pool2.contains(descendant_tx.id()));
}
// Base fee equal to the descendant's max fee: both transactions are removed.
let removed = pool.enforce_basefee(descendant_tx.max_fee_per_gas() as u64);
assert_eq!(removed.len(), 2);
assert!(pool.is_empty());
}
// Truncating to four transactions removes all transactions of the senders that were submitted
// first (a, then b) and keeps those of c and d.
#[test]
fn truncate_parked_by_submission_id() {
let mut f = MockTransactionFactory::default();
let mut pool = ParkedPool::<BasefeeOrd<_>>::default();
let a_sender = address!("000000000000000000000000000000000000000a");
let b_sender = address!("000000000000000000000000000000000000000b");
let c_sender = address!("000000000000000000000000000000000000000c");
let d_sender = address!("000000000000000000000000000000000000000d");
// Build one combined set: 4 txs from a, 3 from b, 3 from c and 1 from d.
let mut tx_set = MockTransactionSet::dependent(a_sender, 0, 4, TxType::Eip1559);
let a = tx_set.clone().into_vec();
let b = MockTransactionSet::dependent(b_sender, 0, 3, TxType::Eip1559).into_vec();
tx_set.extend(b.clone());
let c = MockTransactionSet::dependent(c_sender, 0, 3, TxType::Eip1559).into_vec();
tx_set.extend(c.clone());
let d = MockTransactionSet::dependent(d_sender, 0, 1, TxType::Eip1559).into_vec();
tx_set.extend(d.clone());
let all_txs = tx_set.into_vec();
// Transactions expected to remain in the pool after truncation.
let expected_parked = vec![c[0].clone(), c[1].clone(), c[2].clone(), d[0].clone()]
.into_iter()
.map(|tx| (tx.sender(), tx.nonce()))
.collect::<HashSet<_>>();
// Transactions expected to be removed by truncation.
let expected_removed = vec![
a[0].clone(),
a[1].clone(),
a[2].clone(),
a[3].clone(),
b[0].clone(),
b[1].clone(),
b[2].clone(),
]
.into_iter()
.map(|tx| (tx.sender(), tx.nonce()))
.collect::<HashSet<_>>();
for tx in all_txs {
pool.add_transaction(f.validated_arc(tx));
}
// Only the transaction count is limited; the size limit is effectively disabled.
let pool_limit = SubPoolLimit { max_txs: 4, max_size: usize::MAX };
let removed = pool.truncate_pool(pool_limit);
assert_eq!(removed.len(), expected_removed.len());
// Compare as sets of (sender, nonce), ignoring order.
let removed =
removed.into_iter().map(|tx| (tx.sender(), tx.nonce())).collect::<HashSet<_>>();
assert_eq!(removed, expected_removed);
let parked = pool.all().collect::<Vec<_>>();
assert_eq!(parked.len(), expected_parked.len());
let parked = parked.into_iter().map(|tx| (tx.sender(), tx.nonce())).collect::<HashSet<_>>();
assert_eq!(parked, expected_parked);
}
// Two transactions that together exceed the default size limit: truncation removes exactly one.
#[test]
fn test_truncate_parked_with_large_tx() {
let mut f = MockTransactionFactory::default();
let mut pool = ParkedPool::<BasefeeOrd<_>>::default();
let default_limits = SubPoolLimit::default();
let a_sender = address!("000000000000000000000000000000000000000a");
// Each transaction is sized at just over half of the maximum pool size.
let a_txs = MockTransactionSet::dependent(a_sender, 0, 2, TxType::Eip1559)
.into_iter()
.map(|mut tx| {
tx.set_size(default_limits.max_size / 2 + 1);
tx
})
.collect::<Vec<_>>();
for tx in a_txs {
pool.add_transaction(f.validated_arc(tx));
}
let removed = pool.truncate_pool(default_limits);
assert_eq!(removed.len(), 1);
}
// `get_senders_by_submission_id` yields senders starting with the most recently submitted one.
#[test]
fn test_senders_by_submission_id() {
let mut f = MockTransactionFactory::default();
let mut pool = ParkedPool::<BasefeeOrd<_>>::default();
let a_sender = address!("000000000000000000000000000000000000000a");
let b_sender = address!("000000000000000000000000000000000000000b");
let c_sender = address!("000000000000000000000000000000000000000c");
let d_sender = address!("000000000000000000000000000000000000000d");
// Build one combined set: 4 txs from a, 3 from b, 3 from c and 1 from d.
let mut tx_set =
MockTransactionSet::dependent(a_sender, 0, 4, reth_primitives::TxType::Eip1559);
let a = tx_set.clone().into_vec();
let b = MockTransactionSet::dependent(b_sender, 0, 3, reth_primitives::TxType::Eip1559)
.into_vec();
tx_set.extend(b.clone());
let c = MockTransactionSet::dependent(c_sender, 0, 3, reth_primitives::TxType::Eip1559)
.into_vec();
tx_set.extend(c.clone());
let d = MockTransactionSet::dependent(d_sender, 0, 1, reth_primitives::TxType::Eip1559)
.into_vec();
tx_set.extend(d.clone());
let all_txs = tx_set.into_vec();
// Inserted in the order a, b, c, d.
for tx in all_txs {
pool.add_transaction(f.validated_arc(tx));
}
let senders = pool.get_senders_by_submission_id().map(|s| s.sender_id).collect::<Vec<_>>();
assert_eq!(senders.len(), 4);
// The sender submitted last (d) comes first.
let expected_senders = vec![d_sender, c_sender, b_sender, a_sender]
.into_iter()
.map(|s| f.ids.sender_id(&s).unwrap())
.collect::<Vec<_>>();
assert_eq!(senders, expected_senders);
// Second round with a fresh pool and a different insertion order: d, b, c, a.
let mut pool = ParkedPool::<BasefeeOrd<_>>::default();
let all_txs = vec![d[0].clone(), b[0].clone(), c[0].clone(), a[0].clone()];
for tx in all_txs {
pool.add_transaction(f.validated_arc(tx));
}
let senders = pool.get_senders_by_submission_id().map(|s| s.sender_id).collect::<Vec<_>>();
assert_eq!(senders.len(), 4);
// Again the reverse of the insertion order.
let expected_senders = vec![a_sender, c_sender, b_sender, d_sender]
.into_iter()
.map(|s| f.ids.sender_id(&s).unwrap())
.collect::<Vec<_>>();
assert_eq!(senders, expected_senders);
}
// Adding a count for a sender that is not yet tracked creates a fresh entry with count 1.
#[test]
fn test_add_sender_count_new_sender() {
let mut f = MockTransactionFactory::default();
let mut pool = ParkedPool::<BasefeeOrd<_>>::default();
// Seed the pool with one transaction; it takes submission id 0.
let tx = f.validated_arc(MockTransaction::eip1559().inc_price());
pool.add_transaction(tx);
let sender: SenderId = 11.into();
let submission_id = 1;
pool.add_sender_count(sender, submission_id);
// One entry for the seeded transaction's sender plus one for the new sender.
assert_eq!(pool.sender_transaction_count.len(), 2);
let sender_info = pool.sender_transaction_count.get(&sender).unwrap();
assert_eq!(sender_info.count, 1);
assert_eq!(sender_info.last_submission_id, submission_id);
assert_eq!(pool.last_sender_submission.len(), 2);
// The first element of the set is the entry with the highest submission id.
let submission_info = pool.last_sender_submission.iter().next().unwrap();
assert_eq!(submission_info.sender_id, sender);
assert_eq!(submission_info.submission_id, submission_id);
}
// Adding a count for an already tracked sender bumps its count and replaces its latest
// submission id.
#[test]
fn test_add_sender_count_existing_sender() {
let mut f = MockTransactionFactory::default();
let mut pool = ParkedPool::<BasefeeOrd<_>>::default();
// Seed the pool with one transaction; it takes submission id 0.
let tx = f.validated_arc(MockTransaction::eip1559().inc_price());
pool.add_transaction(tx);
let sender: SenderId = 11.into();
let initial_submission_id = 1;
pool.add_sender_count(sender, initial_submission_id);
let new_submission_id = 2;
pool.add_sender_count(sender, new_submission_id);
// Still two senders: the seeded transaction's sender and `sender`.
assert_eq!(pool.sender_transaction_count.len(), 2);
let sender_info = pool.sender_transaction_count.get(&sender).unwrap();
assert_eq!(sender_info.count, 2);
assert_eq!(sender_info.last_submission_id, new_submission_id);
// The old submission entry was replaced rather than kept alongside the new one.
assert_eq!(pool.last_sender_submission.len(), 2);
let submission_info = pool.last_sender_submission.iter().next().unwrap();
assert_eq!(submission_info.sender_id, sender);
assert_eq!(submission_info.submission_id, new_submission_id);
}
// Adds counts for two extra senders on top of a pool that already holds two transactions.
#[test]
fn test_add_sender_count_multiple_senders() {
let mut f = MockTransactionFactory::default();
let mut pool = ParkedPool::<BasefeeOrd<_>>::default();
// The two seeded transactions take submission ids 0 and 1.
let tx1 = f.validated_arc(MockTransaction::eip1559().inc_price());
let tx2 = f.validated_arc(MockTransaction::eip1559().inc_price());
pool.add_transaction(tx1);
pool.add_transaction(tx2);
let sender1: SenderId = 11.into();
let sender2: SenderId = 22.into();
// Submission id 1 is already in use by the second seeded transaction.
pool.add_sender_count(sender1, 1);
pool.add_sender_count(sender2, 2);
assert_eq!(pool.sender_transaction_count.len(), 4);
let sender1_info = pool.sender_transaction_count.get(&sender1).unwrap();
assert_eq!(sender1_info.count, 1);
assert_eq!(sender1_info.last_submission_id, 1);
let sender2_info = pool.sender_transaction_count.get(&sender2).unwrap();
assert_eq!(sender2_info.count, 1);
assert_eq!(sender2_info.last_submission_id, 2);
// Only three entries: `SubmissionSenderId` is ordered by submission id alone, so the set
// treats sender1's entry (id 1) as a duplicate of the existing id-1 entry and does not
// insert it.
assert_eq!(pool.last_sender_submission.len(), 3);
let submission_info1 =
pool.last_sender_submission.iter().find(|info| info.sender_id == sender1);
assert!(submission_info1.is_none());
let submission_info2 =
pool.last_sender_submission.iter().find(|info| info.sender_id == sender2).unwrap();
assert_eq!(submission_info2.sender_id, sender2);
assert_eq!(submission_info2.submission_id, 2);
}
// Removing a count drops the sender once its count reaches zero and otherwise only decrements.
#[test]
fn test_remove_sender_count() {
let mut f = MockTransactionFactory::default();
let mut pool = ParkedPool::<BasefeeOrd<_>>::default();
// The two seeded transactions take submission ids 0 and 1.
let tx1 = f.validated_arc(MockTransaction::eip1559().inc_price());
let tx2 = f.validated_arc(MockTransaction::eip1559().inc_price());
pool.add_transaction(tx1);
pool.add_transaction(tx2);
let sender1: SenderId = 11.into();
let sender2: SenderId = 22.into();
pool.add_sender_count(sender1, 1);
// sender2 is added twice so that it survives a single removal.
pool.add_sender_count(sender2, 2);
pool.add_sender_count(sender2, 3);
assert_eq!(pool.sender_transaction_count.len(), 4);
assert!(pool.sender_transaction_count.contains_key(&sender1));
assert_eq!(pool.sender_transaction_count.get(&sender1).unwrap().count, 1);
// sender1 had a single transaction, so it is removed completely.
pool.remove_sender_count(sender1);
assert_eq!(pool.sender_transaction_count.len(), 3);
assert!(!pool.sender_transaction_count.contains_key(&sender1));
assert_eq!(
*pool.sender_transaction_count.get(&sender2).unwrap(),
SenderTransactionCount { count: 2, last_submission_id: 3 }
);
// sender2 still has one transaction left, so its entry stays with a decremented count.
pool.remove_sender_count(sender2);
assert_eq!(pool.sender_transaction_count.len(), 3);
assert!(pool.sender_transaction_count.contains_key(&sender2));
assert_eq!(
*pool.sender_transaction_count.get(&sender2).unwrap(),
SenderTransactionCount { count: 1, last_submission_id: 3 }
);
}
}