1use crate::{
2 error::{Eip4844PoolTransactionError, InvalidPoolTransactionError},
3 identifier::{SenderId, TransactionId},
4 pool::pending::PendingTransaction,
5 PoolTransaction, Priority, TransactionOrdering, ValidPoolTransaction,
6};
7use alloy_consensus::Transaction;
8use alloy_primitives::map::AddressSet;
9use core::fmt;
10use reth_primitives_traits::transaction::error::InvalidTransactionError;
11use std::{
12 collections::{BTreeMap, BTreeSet, HashSet, VecDeque},
13 sync::Arc,
14};
15use tokio::sync::broadcast::{error::TryRecvError, Receiver};
16use tracing::debug;
17
/// Upper bound on how many incoming pending transactions
/// [`BestTransactions::add_new_transactions`] pulls from the broadcast receiver per call.
///
/// Bounding the batch keeps a single `next` call from draining an arbitrarily long backlog.
const MAX_NEW_TRANSACTIONS_PER_BATCH: usize = 16;
19
/// Iterator adapter around [`BestTransactions`] that only yields transactions whose fee caps
/// cover the configured base fee and blob base fee.
///
/// Its `Iterator` implementation reports every transaction that fails the fee check to the
/// wrapped iterator as invalid (`InvalidPoolTransactionError::Underpriced`).
pub(crate) struct BestTransactionsWithFees<T: TransactionOrdering> {
    /// The wrapped iterator that supplies candidate transactions.
    pub(crate) best: BestTransactions<T>,
    /// Minimum `max_fee_per_gas` a transaction must offer to be yielded.
    pub(crate) base_fee: u64,
    /// Minimum `max_fee_per_blob_gas` a transaction must offer, if it declares one at all.
    pub(crate) base_fee_per_blob_gas: u64,
}
32
33impl<T: TransactionOrdering> crate::traits::BestTransactions for BestTransactionsWithFees<T> {
34 fn mark_invalid(&mut self, tx: &Self::Item, kind: &InvalidPoolTransactionError) {
35 BestTransactions::mark_invalid(&mut self.best, tx, kind)
36 }
37
38 fn no_updates(&mut self) {
39 self.best.no_updates()
40 }
41
42 fn skip_blobs(&mut self) {
43 self.set_skip_blobs(true)
44 }
45
46 fn set_skip_blobs(&mut self, skip_blobs: bool) {
47 self.best.set_skip_blobs(skip_blobs)
48 }
49}
50
51impl<T: TransactionOrdering> Iterator for BestTransactionsWithFees<T> {
52 type Item = Arc<ValidPoolTransaction<T::Transaction>>;
53
54 fn next(&mut self) -> Option<Self::Item> {
55 loop {
57 let best = Iterator::next(&mut self.best)?;
58 if best.transaction.max_fee_per_gas() >= self.base_fee as u128 &&
61 best.transaction
62 .max_fee_per_blob_gas()
63 .is_none_or(|fee| fee >= self.base_fee_per_blob_gas as u128)
64 {
65 return Some(best);
66 }
67 crate::traits::BestTransactions::mark_invalid(
68 self,
69 &best,
70 &InvalidPoolTransactionError::Underpriced,
71 );
72 }
73 }
74}
75
/// Iterator state that yields pending transactions, always taking the greatest entry of the
/// `independent` set next.
///
/// Yielding a transaction may promote the transaction it unlocks into `independent`, and
/// transactions of senders recorded in `invalid` are skipped.
#[derive(Debug)]
pub struct BestTransactions<T: TransactionOrdering> {
    /// Every transaction still known to the iterator, keyed by id; entries are removed as they
    /// are popped and added as new transactions arrive.
    pub(crate) all: BTreeMap<TransactionId, PendingTransaction<T>>,
    /// Transactions that can be yielded right now; the last (greatest) element is popped first.
    pub(crate) independent: BTreeSet<PendingTransaction<T>>,
    /// Senders whose transactions must no longer be yielded.
    pub(crate) invalid: HashSet<SenderId>,
    /// Broadcast receiver for transactions that become pending while iterating; `None` once
    /// updates are disabled.
    pub(crate) new_transaction_receiver: Option<Receiver<PendingTransaction<T>>>,
    /// Priority of the most recently yielded transaction; only tracked while a receiver is set
    /// and used to decide whether an incoming transaction is stashed.
    pub(crate) last_priority: Option<Priority<T::PriorityValue>>,
    /// When `true`, EIP-4844 transactions are not yielded and their sender is marked invalid.
    pub(crate) skip_blobs: bool,
}
108
109impl<T: TransactionOrdering> BestTransactions<T> {
110 pub(crate) fn mark_invalid(
112 &mut self,
113 tx: &Arc<ValidPoolTransaction<T::Transaction>>,
114 _kind: &InvalidPoolTransactionError,
115 ) {
116 self.invalid.insert(tx.sender_id());
117 }
118
119 pub(crate) fn ancestor(&self, id: &TransactionId) -> Option<&PendingTransaction<T>> {
124 self.all.get(&id.unchecked_ancestor()?)
125 }
126
127 fn try_recv(&mut self) -> Option<IncomingTransaction<T>> {
129 loop {
130 match self.new_transaction_receiver.as_mut()?.try_recv() {
131 Ok(tx) => {
132 if let Some(last_priority) = &self.last_priority &&
133 &tx.priority > last_priority
134 {
135 return Some(IncomingTransaction::Stash(tx))
138 }
139 return Some(IncomingTransaction::Process(tx))
140 }
141 Err(TryRecvError::Lagged(_)) => {
147 }
150
151 Err(_) => return None,
154 }
155 }
156 }
157
158 fn pop_best(&mut self) -> Option<PendingTransaction<T>> {
161 self.independent.pop_last().inspect(|best| {
162 self.all.remove(best.transaction.id());
163 })
164 }
165
166 fn add_new_transactions(&mut self) {
169 for _ in 0..MAX_NEW_TRANSACTIONS_PER_BATCH {
170 if let Some(pending_tx) = self.try_recv() {
171 match pending_tx {
174 IncomingTransaction::Process(tx) => {
175 let tx_id = *tx.transaction.id();
176 if self.ancestor(&tx_id).is_none() {
177 self.independent.insert(tx.clone());
178 }
179 self.all.insert(tx_id, tx);
180 }
181 IncomingTransaction::Stash(tx) => {
182 let tx_id = *tx.transaction.id();
183 self.all.insert(tx_id, tx);
184 }
185 }
186 } else {
187 break;
188 }
189 }
190 }
191
192 #[allow(clippy::type_complexity)]
194 pub fn next_tx_and_priority(
195 &mut self,
196 ) -> Option<(Arc<ValidPoolTransaction<T::Transaction>>, Priority<T::PriorityValue>)> {
197 loop {
198 self.add_new_transactions();
199 let best = self.pop_best()?;
201 let sender_id = best.transaction.sender_id();
202
203 if self.invalid.contains(&sender_id) {
205 debug!(
206 target: "txpool",
207 "[{:?}] skipping invalid transaction",
208 best.transaction.hash()
209 );
210 continue
211 }
212
213 if let Some(unlocked) = self.all.get(&best.unlocks()) {
215 self.independent.insert(unlocked.clone());
216 }
217
218 if self.skip_blobs && best.transaction.is_eip4844() {
219 self.mark_invalid(
222 &best.transaction,
223 &InvalidPoolTransactionError::Eip4844(
224 Eip4844PoolTransactionError::NoEip4844Blobs,
225 ),
226 )
227 } else {
228 if self.new_transaction_receiver.is_some() {
229 self.last_priority = Some(best.priority.clone())
230 }
231 return Some((best.transaction, best.priority))
232 }
233 }
234 }
235}
236
/// Classification of a transaction received from the new-transaction channel while iterating.
enum IncomingTransaction<T: TransactionOrdering> {
    /// The transaction does not outrank the last yielded one: it is stored and, if it has no
    /// tracked ancestor, becomes eligible to be yielded.
    Process(PendingTransaction<T>),

    /// The transaction has a strictly higher priority than the last yielded one: it is only
    /// stored in `all` and not made independent directly.
    Stash(PendingTransaction<T>),
}
263
264impl<T: TransactionOrdering> crate::traits::BestTransactions for BestTransactions<T> {
265 fn mark_invalid(&mut self, tx: &Self::Item, kind: &InvalidPoolTransactionError) {
266 Self::mark_invalid(self, tx, kind)
267 }
268
269 fn no_updates(&mut self) {
270 self.new_transaction_receiver.take();
271 self.last_priority.take();
272 }
273
274 fn skip_blobs(&mut self) {
275 self.set_skip_blobs(true);
276 }
277
278 fn set_skip_blobs(&mut self, skip_blobs: bool) {
279 self.skip_blobs = skip_blobs;
280 }
281}
282
283impl<T: TransactionOrdering> Iterator for BestTransactions<T> {
284 type Item = Arc<ValidPoolTransaction<T::Transaction>>;
285
286 fn next(&mut self) -> Option<Self::Item> {
287 self.next_tx_and_priority().map(|(tx, _)| tx)
288 }
289}
290
/// Iterator adapter that only yields the items of `best` accepted by `predicate`.
///
/// Its `Iterator` implementation reports rejected items back to the wrapped iterator through
/// `mark_invalid`.
pub struct BestTransactionFilter<I, P> {
    /// The wrapped iterator supplying candidates.
    pub(crate) best: I,
    /// Returns `true` for items that should be yielded.
    pub(crate) predicate: P,
}
300
301impl<I, P> BestTransactionFilter<I, P> {
302 pub const fn new(best: I, predicate: P) -> Self {
304 Self { best, predicate }
305 }
306}
307
308impl<I, P> Iterator for BestTransactionFilter<I, P>
309where
310 I: crate::traits::BestTransactions,
311 P: FnMut(&<I as Iterator>::Item) -> bool,
312{
313 type Item = <I as Iterator>::Item;
314
315 fn next(&mut self) -> Option<Self::Item> {
316 loop {
317 let best = self.best.next()?;
318 if (self.predicate)(&best) {
319 return Some(best)
320 }
321 self.best.mark_invalid(
322 &best,
323 &InvalidPoolTransactionError::Consensus(
324 InvalidTransactionError::TxTypeNotSupported,
325 ),
326 );
327 }
328 }
329}
330
331impl<I, P> crate::traits::BestTransactions for BestTransactionFilter<I, P>
332where
333 I: crate::traits::BestTransactions,
334 P: FnMut(&<I as Iterator>::Item) -> bool + Send,
335{
336 fn mark_invalid(&mut self, tx: &Self::Item, kind: &InvalidPoolTransactionError) {
337 crate::traits::BestTransactions::mark_invalid(&mut self.best, tx, kind)
338 }
339
340 fn no_updates(&mut self) {
341 self.best.no_updates()
342 }
343
344 fn skip_blobs(&mut self) {
345 self.set_skip_blobs(true)
346 }
347
348 fn set_skip_blobs(&mut self, skip_blobs: bool) {
349 self.best.set_skip_blobs(skip_blobs)
350 }
351}
352
353impl<I: fmt::Debug, P> fmt::Debug for BestTransactionFilter<I, P> {
354 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
355 f.debug_struct("BestTransactionFilter").field("best", &self.best).finish()
356 }
357}
358
/// Iterator adapter that yields transactions from a set of prioritized senders first, up to a
/// gas budget, and everything else afterwards.
///
/// Items that are passed over while searching for a prioritized transaction are kept in
/// `buffer` and yielded later in the order they were seen.
#[derive(Debug)]
pub struct BestTransactionsWithPrioritizedSenders<I: Iterator> {
    /// The wrapped iterator supplying transactions.
    inner: I,
    /// Senders whose transactions are moved to the front.
    prioritized_senders: AddressSet,
    /// Upper bound on the summed gas limit of transactions yielded with priority.
    max_prioritized_gas: u64,
    /// Items pulled from `inner` that were not yielded with priority, in arrival order.
    buffer: VecDeque<I::Item>,
    /// Summed gas limit of the transactions yielded with priority so far.
    prioritized_gas: u64,
}
376
377impl<I: Iterator> BestTransactionsWithPrioritizedSenders<I> {
378 pub fn new(prioritized_senders: AddressSet, max_prioritized_gas: u64, inner: I) -> Self {
380 Self {
381 inner,
382 prioritized_senders,
383 max_prioritized_gas,
384 buffer: Default::default(),
385 prioritized_gas: Default::default(),
386 }
387 }
388}
389
390impl<I, T> Iterator for BestTransactionsWithPrioritizedSenders<I>
391where
392 I: crate::traits::BestTransactions<Item = Arc<ValidPoolTransaction<T>>>,
393 T: PoolTransaction,
394{
395 type Item = <I as Iterator>::Item;
396
397 fn next(&mut self) -> Option<Self::Item> {
398 if self.prioritized_gas < self.max_prioritized_gas {
400 for item in &mut self.inner {
401 if self.prioritized_senders.contains(&item.transaction.sender()) &&
402 self.prioritized_gas + item.transaction.gas_limit() <=
403 self.max_prioritized_gas
404 {
405 self.prioritized_gas += item.transaction.gas_limit();
406 return Some(item)
407 }
408 self.buffer.push_back(item);
409 }
410 }
411
412 if let Some(item) = self.buffer.pop_front() {
413 Some(item)
414 } else {
415 self.inner.next()
416 }
417 }
418}
419
420impl<I, T> crate::traits::BestTransactions for BestTransactionsWithPrioritizedSenders<I>
421where
422 I: crate::traits::BestTransactions<Item = Arc<ValidPoolTransaction<T>>>,
423 T: PoolTransaction,
424{
425 fn mark_invalid(&mut self, tx: &Self::Item, kind: &InvalidPoolTransactionError) {
426 self.inner.mark_invalid(tx, kind)
427 }
428
429 fn no_updates(&mut self) {
430 self.inner.no_updates()
431 }
432
433 fn set_skip_blobs(&mut self, skip_blobs: bool) {
434 if skip_blobs {
435 self.buffer.retain(|tx| !tx.transaction.is_eip4844())
436 }
437 self.inner.set_skip_blobs(skip_blobs)
438 }
439}
440
441#[cfg(test)]
442mod tests {
443 use super::*;
444 use crate::{
445 pool::pending::PendingPool,
446 test_utils::{MockOrdering, MockTransaction, MockTransactionFactory},
447 BestTransactions, Priority,
448 };
449
    /// Transactions queued under consecutive nonces are yielded in ascending nonce order.
    #[test]
    fn test_best_iter() {
        let mut pool = PendingPool::new(MockOrdering::default());
        let mut f = MockTransactionFactory::default();

        let num_tx = 10;
        // Clone one mock so that only hash and nonce differ between the queued transactions.
        let tx = MockTransaction::eip1559();
        for nonce in 0..num_tx {
            let tx = tx.clone().rng_hash().with_nonce(nonce);
            let valid_tx = f.validated(tx);
            pool.add_transaction(Arc::new(valid_tx), 0);
        }

        let mut best = pool.best();
        assert_eq!(best.all.len(), num_tx as usize);
        // Only one transaction is independent at the start.
        assert_eq!(best.independent.len(), 1);

        for nonce in 0..num_tx {
            // Exactly one transaction is ready before each pull.
            assert_eq!(best.independent.len(), 1);
            let tx = best.next().unwrap();
            assert_eq!(tx.nonce(), nonce);
        }
    }
475
    /// Marking the only independent transaction invalid leaves nothing to yield.
    #[test]
    fn test_best_iter_invalid() {
        let mut pool = PendingPool::new(MockOrdering::default());
        let mut f = MockTransactionFactory::default();

        let num_tx = 10;
        // Clone one mock so that only hash and nonce differ between the queued transactions.
        let tx = MockTransaction::eip1559();
        for nonce in 0..num_tx {
            let tx = tx.clone().rng_hash().with_nonce(nonce);
            let valid_tx = f.validated(tx);
            pool.add_transaction(Arc::new(valid_tx), 0);
        }

        let mut best = pool.best();

        // Invalidate the first independent transaction before anything was yielded.
        let invalid = best.independent.iter().next().unwrap();
        best.mark_invalid(
            &invalid.transaction.clone(),
            &InvalidPoolTransactionError::Consensus(InvalidTransactionError::TxTypeNotSupported),
        );

        // `mark_invalid` records the sender, so the iterator comes up empty.
        assert!(best.next().is_none());
    }
502
    /// Same invalidation scenario as above, but driven through a boxed trait object and after
    /// one transaction has already been yielded.
    #[test]
    fn test_best_transactions_iter_invalid() {
        let mut pool = PendingPool::new(MockOrdering::default());
        let mut f = MockTransactionFactory::default();

        let num_tx = 10;
        // Clone one mock so that only hash and nonce differ between the queued transactions.
        let tx = MockTransaction::eip1559();
        for nonce in 0..num_tx {
            let tx = tx.clone().rng_hash().with_nonce(nonce);
            let valid_tx = f.validated(tx);
            pool.add_transaction(Arc::new(valid_tx), 0);
        }

        let mut best: Box<
            dyn crate::traits::BestTransactions<Item = Arc<ValidPoolTransaction<MockTransaction>>>,
        > = Box::new(pool.best());

        // Pull one transaction, then report it as invalid through the trait method.
        let tx = Iterator::next(&mut best).unwrap();
        crate::traits::BestTransactions::mark_invalid(
            &mut *best,
            &tx,
            &InvalidPoolTransactionError::Consensus(InvalidTransactionError::TxTypeNotSupported),
        );
        // The remaining transactions must not be yielded anymore.
        assert!(Iterator::next(&mut best).is_none());
    }
529
    /// Transactions whose max fee exceeds the base fee are all yielded, in nonce order.
    #[test]
    fn test_best_with_fees_iter_base_fee_satisfied() {
        let mut pool = PendingPool::new(MockOrdering::default());
        let mut f = MockTransactionFactory::default();

        let num_tx = 5;
        let base_fee: u64 = 10;
        let base_fee_per_blob_gas: u64 = 15;

        for nonce in 0..num_tx {
            // Max fee sits above the base fee.
            let tx = MockTransaction::eip1559()
                .rng_hash()
                .with_nonce(nonce)
                .with_max_fee(base_fee as u128 + 5);
            let valid_tx = f.validated(tx);
            pool.add_transaction(Arc::new(valid_tx), 0);
        }

        let mut best = pool.best_with_basefee_and_blobfee(base_fee, base_fee_per_blob_gas);

        for nonce in 0..num_tx {
            let tx = best.next().expect("Transaction should be returned");
            assert_eq!(tx.nonce(), nonce);
            assert!(tx.transaction.max_fee_per_gas() >= base_fee as u128);
        }
    }
558
    /// Transactions whose max fee is below the base fee are never yielded.
    #[test]
    fn test_best_with_fees_iter_base_fee_violated() {
        let mut pool = PendingPool::new(MockOrdering::default());
        let mut f = MockTransactionFactory::default();

        let num_tx = 5;
        let base_fee: u64 = 20;
        let base_fee_per_blob_gas: u64 = 15;

        for nonce in 0..num_tx {
            // Max fee sits below the base fee.
            let tx = MockTransaction::eip1559()
                .rng_hash()
                .with_nonce(nonce)
                .with_max_fee(base_fee as u128 - 5);
            let valid_tx = f.validated(tx);
            pool.add_transaction(Arc::new(valid_tx), 0);
        }

        let mut best = pool.best_with_basefee_and_blobfee(base_fee, base_fee_per_blob_gas);

        // Every transaction is underpriced, so nothing is returned.
        assert!(best.next().is_none());
    }
583
    /// Blob transactions whose fee caps exceed both floors are all yielded, in nonce order.
    #[test]
    fn test_best_with_fees_iter_blob_fee_satisfied() {
        let mut pool = PendingPool::new(MockOrdering::default());
        let mut f = MockTransactionFactory::default();

        let num_tx = 5;
        let base_fee: u64 = 10;
        let base_fee_per_blob_gas: u64 = 20;

        for nonce in 0..num_tx {
            // Both the max fee and the blob fee sit above their respective floors.
            let tx = MockTransaction::eip4844()
                .rng_hash()
                .with_nonce(nonce)
                .with_max_fee(base_fee as u128 + 5)
                .with_blob_fee(base_fee_per_blob_gas as u128 + 5);
            let valid_tx = f.validated(tx);
            pool.add_transaction(Arc::new(valid_tx), 0);
        }

        let mut best = pool.best_with_basefee_and_blobfee(base_fee, base_fee_per_blob_gas);

        for nonce in 0..num_tx {
            let tx = best.next().expect("Transaction should be returned");
            assert_eq!(tx.nonce(), nonce);
            assert!(tx.transaction.max_fee_per_gas() >= base_fee as u128);
            assert!(
                tx.transaction.max_fee_per_blob_gas().unwrap() >= base_fee_per_blob_gas as u128
            );
        }

        // Nothing is left after all queued transactions were yielded.
        assert!(best.next().is_none());
    }
621
    /// Blob transactions whose blob fee cap is below the blob base fee are never yielded, even
    /// though their regular max fee is sufficient.
    #[test]
    fn test_best_with_fees_iter_blob_fee_violated() {
        let mut pool = PendingPool::new(MockOrdering::default());
        let mut f = MockTransactionFactory::default();

        let num_tx = 5;
        let base_fee: u64 = 10;
        let base_fee_per_blob_gas: u64 = 20;

        for nonce in 0..num_tx {
            // Max fee is sufficient, blob fee is not.
            let tx = MockTransaction::eip4844()
                .rng_hash()
                .with_nonce(nonce)
                .with_max_fee(base_fee as u128 + 5)
                .with_blob_fee(base_fee_per_blob_gas as u128 - 5);
            let valid_tx = f.validated(tx);
            pool.add_transaction(Arc::new(valid_tx), 0);
        }

        let mut best = pool.best_with_basefee_and_blobfee(base_fee, base_fee_per_blob_gas);

        // Every transaction fails the blob fee check, so nothing is returned.
        assert!(best.next().is_none());
    }
647
    /// With a mix of sufficient and insufficient fees, iteration stops at the first
    /// transaction that fails a fee check.
    #[test]
    fn test_best_with_fees_iter_mixed_fees() {
        let mut pool = PendingPool::new(MockOrdering::default());
        let mut f = MockTransactionFactory::default();

        let base_fee: u64 = 10;
        let base_fee_per_blob_gas: u64 = 20;

        // tx1: regular transaction, max fee sufficient.
        let tx1 =
            MockTransaction::eip1559().rng_hash().with_nonce(0).with_max_fee(base_fee as u128 + 5);
        // tx2: blob transaction, both fees sufficient.
        let tx2 = MockTransaction::eip4844()
            .rng_hash()
            .with_nonce(1)
            .with_max_fee(base_fee as u128 + 5)
            .with_blob_fee(base_fee_per_blob_gas as u128 + 5);
        // tx3: blob transaction, blob fee too low.
        let tx3 = MockTransaction::eip4844()
            .rng_hash()
            .with_nonce(2)
            .with_max_fee(base_fee as u128 + 5)
            .with_blob_fee(base_fee_per_blob_gas as u128 - 5);
        // tx4: regular transaction, max fee too low.
        let tx4 =
            MockTransaction::eip1559().rng_hash().with_nonce(3).with_max_fee(base_fee as u128 - 5);

        pool.add_transaction(Arc::new(f.validated(tx1.clone())), 0);
        pool.add_transaction(Arc::new(f.validated(tx2.clone())), 0);
        pool.add_transaction(Arc::new(f.validated(tx3)), 0);
        pool.add_transaction(Arc::new(f.validated(tx4)), 0);

        let mut best = pool.best_with_basefee_and_blobfee(base_fee, base_fee_per_blob_gas);

        // Only the two transactions with sufficient fees are expected, in insertion order.
        let expected_order = vec![tx1, tx2];
        for expected_tx in expected_order {
            let tx = best.next().expect("Transaction should be returned");
            assert_eq!(tx.transaction, expected_tx);
        }

        // tx3 and tx4 must not be yielded.
        assert!(best.next().is_none());
    }
688
    /// A transaction announced over the channel is tracked in `all` and, since its ancestor is
    /// not tracked, also becomes independent.
    #[test]
    fn test_best_add_transaction_with_next_nonce() {
        let mut pool = PendingPool::new(MockOrdering::default());
        let mut f = MockTransactionFactory::default();

        let num_tx = 5;
        // Clone one mock so that only hash and nonce differ between the queued transactions.
        let tx = MockTransaction::eip1559();
        for nonce in 0..num_tx {
            let tx = tx.clone().rng_hash().with_nonce(nonce);
            let valid_tx = f.validated(tx);
            pool.add_transaction(Arc::new(valid_tx), 0);
        }

        let mut best = pool.best();

        // Install a broadcast channel the iterator reads new pending transactions from.
        let (tx_sender, tx_receiver) =
            tokio::sync::broadcast::channel::<PendingTransaction<MockOrdering>>(1000);
        best.new_transaction_receiver = Some(tx_receiver);

        // A freshly created mock with nonce 5.
        let new_tx = MockTransaction::eip1559().rng_hash().with_nonce(5);
        let valid_new_tx = f.validated(new_tx);

        // Announce it as a pending transaction.
        let pending_tx = PendingTransaction {
            submission_id: 10,
            transaction: Arc::new(valid_new_tx.clone()),
            priority: Priority::Value(1000),
        };
        tx_sender.send(pending_tx.clone()).unwrap();

        // Ingest the announced transaction.
        best.add_new_transactions();

        // It is now tracked alongside the five original transactions.
        assert_eq!(best.all.len(), 6);
        assert!(best.all.contains_key(valid_new_tx.id()));

        // It also joined the independent set.
        assert_eq!(best.independent.len(), 2);
        assert!(best.independent.contains(&pending_tx));
    }
734
    /// An announced transaction whose ancestor is already tracked is stored in `all` but does
    /// not become independent.
    #[test]
    fn test_best_add_transaction_with_ancestor() {
        let mut pool = PendingPool::new(MockOrdering::default());
        let mut f = MockTransactionFactory::default();

        let num_tx = 5;
        // Clone one mock so that only hash and nonce differ between the queued transactions.
        let tx = MockTransaction::eip1559();
        for nonce in 0..num_tx {
            let tx = tx.clone().rng_hash().with_nonce(nonce);
            let valid_tx = f.validated(tx);
            pool.add_transaction(Arc::new(valid_tx), 0);
        }

        let mut best = pool.best();

        // Install a broadcast channel the iterator reads new pending transactions from.
        let (tx_sender, tx_receiver) =
            tokio::sync::broadcast::channel::<PendingTransaction<MockOrdering>>(1000);
        best.new_transaction_receiver = Some(tx_receiver);

        // First announced transaction: a freshly created mock with nonce 5.
        let base_tx1 = MockTransaction::eip1559().rng_hash().with_nonce(5);
        let valid_new_tx1 = f.validated(base_tx1.clone());

        let pending_tx1 = PendingTransaction {
            submission_id: 10,
            transaction: Arc::new(valid_new_tx1.clone()),
            priority: Priority::Value(1000),
        };
        tx_sender.send(pending_tx1.clone()).unwrap();

        // Ingest the first announced transaction.
        best.add_new_transactions();

        // It is tracked ...
        assert_eq!(best.all.len(), 6);
        assert!(best.all.contains_key(valid_new_tx1.id()));

        // ... and independent, because its ancestor is not tracked.
        assert_eq!(best.independent.len(), 2);
        assert!(best.independent.contains(&pending_tx1));

        // Second announced transaction: derived from the first mock, with the following nonce.
        let base_tx2 = base_tx1.with_nonce(6);
        let valid_new_tx2 = f.validated(base_tx2);

        let pending_tx2 = PendingTransaction {
            submission_id: 11,
            transaction: Arc::new(valid_new_tx2.clone()),
            priority: Priority::Value(1000),
        };
        tx_sender.send(pending_tx2.clone()).unwrap();

        // Ingest the second announced transaction.
        best.add_new_transactions();

        // It is tracked as well ...
        assert_eq!(best.all.len(), 7);
        assert!(best.all.contains_key(valid_new_tx2.id()));

        // ... but not independent, since its ancestor (the first one) is still in `all`.
        assert_eq!(best.independent.len(), 2);
        assert!(!best.independent.contains(&pending_tx2));
    }
804
    /// `BestTransactionFilter` works on top of a boxed trait object and only yields items
    /// accepted by the predicate.
    #[test]
    fn test_best_transactions_filter_trait_object() {
        let mut pool = PendingPool::new(MockOrdering::default());
        let mut f = MockTransactionFactory::default();

        let num_tx = 5;
        // Clone one mock so that only hash and nonce differ between the queued transactions.
        let tx = MockTransaction::eip1559();
        for nonce in 0..num_tx {
            let tx = tx.clone().rng_hash().with_nonce(nonce);
            let valid_tx = f.validated(tx);
            pool.add_transaction(Arc::new(valid_tx), 0);
        }

        // Erase the concrete iterator type behind a trait object.
        let best: Box<dyn crate::traits::BestTransactions<Item = _>> = Box::new(pool.best());

        // Accept only transactions with an even nonce.
        let filter =
            BestTransactionFilter::new(best, |tx: &Arc<ValidPoolTransaction<MockTransaction>>| {
                tx.nonce().is_multiple_of(2)
            });

        // Whatever is yielded must satisfy the predicate.
        for tx in filter {
            assert_eq!(tx.nonce() % 2, 0);
        }
    }
834
    /// A prioritized sender's transaction is yielded first while it fits the gas budget; once
    /// the budget is used up, remaining prioritized transactions come in regular order.
    #[test]
    fn test_best_transactions_prioritized_senders() {
        let mut pool = PendingPool::new(MockOrdering::default());
        let mut f = MockTransactionFactory::default();

        // Five regular transactions with gas prices 10, 20, ..., 50 (presumably each mock gets
        // its own sender; confirm against `MockTransaction`).
        for gas_price in 0..5 {
            let tx = MockTransaction::eip1559().with_gas_price((gas_price + 1) * 10);
            let valid_tx = f.validated(tx);
            pool.add_transaction(Arc::new(valid_tx), 0);
        }

        // A low-priced transaction whose gas limit equals the whole prioritized budget.
        let prioritized_tx = MockTransaction::eip1559().with_gas_price(5).with_gas_limit(200);
        let valid_prioritized_tx = f.validated(prioritized_tx.clone());
        pool.add_transaction(Arc::new(valid_prioritized_tx), 0);

        // A second, even lower-priced transaction from another prioritized sender; it cannot
        // be prioritized once the first one consumed the budget.
        let prioritized_tx2 = MockTransaction::eip1559().with_gas_price(3);
        let valid_prioritized_tx2 = f.validated(prioritized_tx2.clone());
        pool.add_transaction(Arc::new(valid_prioritized_tx2), 0);

        let prioritized_senders =
            AddressSet::from_iter([prioritized_tx.sender(), prioritized_tx2.sender()]);
        let best =
            BestTransactionsWithPrioritizedSenders::new(prioritized_senders, 200, pool.best());

        let mut iter = best.into_iter();
        // The first prioritized transaction jumps the queue despite its low price.
        let top_of_block_tx = iter.next().unwrap();
        assert_eq!(top_of_block_tx.max_fee_per_gas(), 5);
        assert_eq!(top_of_block_tx.sender(), prioritized_tx.sender());
        // The regular transactions follow in descending gas price.
        for gas_price in (0..5).rev() {
            assert_eq!(iter.next().unwrap().max_fee_per_gas(), (gas_price + 1) * 10);
        }

        // The second prioritized transaction comes last, in its regular position.
        let top_of_block_tx2 = iter.next().unwrap();
        assert_eq!(top_of_block_tx2.max_fee_per_gas(), 3);
        assert_eq!(top_of_block_tx2.sender(), prioritized_tx2.sender());
    }
879
    /// With a blob base fee of zero, regular transactions with a sufficient max fee are all
    /// yielded, in nonce order.
    #[test]
    fn test_best_with_fees_iter_no_blob_fee_required() {
        let mut pool = PendingPool::new(MockOrdering::default());
        let mut f = MockTransactionFactory::default();

        let base_fee: u64 = 10;
        // No blob fee floor.
        let base_fee_per_blob_gas: u64 = 0;
        for nonce in 0..5 {
            // Max fee sits above the base fee.
            let tx = MockTransaction::eip1559()
                .rng_hash()
                .with_nonce(nonce)
                .with_max_fee(base_fee as u128 + 5);
            let valid_tx = f.validated(tx);
            pool.add_transaction(Arc::new(valid_tx), 0);
        }

        let mut best = pool.best_with_basefee_and_blobfee(base_fee, base_fee_per_blob_gas);

        for nonce in 0..5 {
            let tx = best.next().expect("Transaction should be returned");
            assert_eq!(tx.nonce(), nonce);
        }

        // Nothing is left after all queued transactions were yielded.
        assert!(best.next().is_none());
    }
910
    /// A regular and a blob transaction that both satisfy their fee floors are yielded in
    /// insertion order.
    #[test]
    fn test_best_with_fees_iter_mix_of_blob_and_non_blob_transactions() {
        let mut pool = PendingPool::new(MockOrdering::default());
        let mut f = MockTransactionFactory::default();

        let base_fee: u64 = 10;
        let base_fee_per_blob_gas: u64 = 15;

        // Regular transaction with a sufficient max fee.
        let tx_non_blob =
            MockTransaction::eip1559().rng_hash().with_nonce(0).with_max_fee(base_fee as u128 + 5);
        pool.add_transaction(Arc::new(f.validated(tx_non_blob.clone())), 0);

        // Blob transaction with sufficient max fee and blob fee.
        let tx_blob = MockTransaction::eip4844()
            .rng_hash()
            .with_nonce(1)
            .with_max_fee(base_fee as u128 + 5)
            .with_blob_fee(base_fee_per_blob_gas as u128 + 5);
        pool.add_transaction(Arc::new(f.validated(tx_blob.clone())), 0);

        let mut best = pool.best_with_basefee_and_blobfee(base_fee, base_fee_per_blob_gas);

        // The regular transaction comes first.
        let tx = best.next().expect("Transaction should be returned");
        assert_eq!(tx.transaction, tx_non_blob);

        let tx = best.next().expect("Transaction should be returned");
        assert_eq!(tx.transaction, tx_blob);

        // Nothing else was queued.
        assert!(best.next().is_none());
    }
945
    /// With blob skipping enabled, the blob transaction is not yielded while the regular one
    /// still is.
    #[test]
    fn test_best_transactions_with_skipping_blobs() {
        let mut pool = PendingPool::new(MockOrdering::default());
        let mut f = MockTransactionFactory::default();

        // A blob transaction.
        let tx_blob = MockTransaction::eip4844().rng_hash().with_nonce(0).with_blob_fee(100);
        let valid_blob_tx = f.validated(tx_blob);
        pool.add_transaction(Arc::new(valid_blob_tx), 0);

        // A regular transaction.
        let tx_non_blob = MockTransaction::eip1559().rng_hash().with_nonce(1).with_max_fee(200);
        let valid_non_blob_tx = f.validated(tx_non_blob.clone());
        pool.add_transaction(Arc::new(valid_non_blob_tx), 0);

        let mut best = pool.best();
        best.skip_blobs();

        // Only the regular transaction is returned.
        let tx = best.next().expect("Transaction should be returned");
        assert_eq!(tx.transaction, tx_non_blob);

        // The blob transaction was skipped, so the iterator is exhausted.
        assert!(best.next().is_none());
    }
972
    /// `no_updates` removes the new-transaction receiver from the iterator.
    #[test]
    fn test_best_transactions_no_updates() {
        let mut pool = PendingPool::new(MockOrdering::default());
        let mut f = MockTransactionFactory::default();

        // One transaction is enough for this scenario.
        let tx = MockTransaction::eip1559().rng_hash().with_nonce(0).with_max_fee(100);
        let valid_tx = f.validated(tx);
        pool.add_transaction(Arc::new(valid_tx), 0);

        let mut best = pool.best();

        // Install a receiver; the sender half is kept alive but otherwise unused.
        let (_tx_sender, tx_receiver) =
            tokio::sync::broadcast::channel::<PendingTransaction<MockOrdering>>(1000);
        best.new_transaction_receiver = Some(tx_receiver);

        // Sanity check: the receiver is in place.
        assert!(best.new_transaction_receiver.is_some());

        // Disable updates.
        best.no_updates();

        // The receiver is gone.
        assert!(best.new_transaction_receiver.is_none());
    }
1001
    /// A transaction announced with a higher priority than the one already yielded must not
    /// show up in the remaining output.
    #[test]
    fn test_best_update_transaction_priority() {
        let mut pool = PendingPool::new(MockOrdering::default());
        let mut f = MockTransactionFactory::default();

        let num_tx = 5;
        // Clone one mock so that only hash and nonce differ between the queued transactions.
        let tx = MockTransaction::eip1559();
        for nonce in 0..num_tx {
            let tx = tx.clone().rng_hash().with_nonce(nonce);
            let valid_tx = f.validated(tx);
            pool.add_transaction(Arc::new(valid_tx), 0);
        }

        let mut best = pool.best();

        // Install a broadcast channel the iterator reads new pending transactions from.
        let (tx_sender, tx_receiver) =
            tokio::sync::broadcast::channel::<PendingTransaction<MockOrdering>>(1000);
        best.new_transaction_receiver = Some(tx_receiver);

        // Yield one transaction so that the iterator records a last priority.
        let first = best.next().unwrap();

        // A freshly created mock, announced with the highest possible priority.
        let new_higher_fee_tx = MockTransaction::eip1559().with_nonce(0);
        let valid_new_higher_fee_tx = f.validated(new_higher_fee_tx);

        let pending_tx = PendingTransaction {
            submission_id: 10,
            transaction: Arc::new(valid_new_higher_fee_tx.clone()),
            priority: Priority::Value(u128::MAX),
        };
        tx_sender.send(pending_tx).unwrap();

        // Everything yielded from here on belongs to the original sender only.
        for tx in best {
            assert_eq!(tx.sender_id(), first.sender_id());
            assert_ne!(tx.sender_id(), valid_new_higher_fee_tx.sender_id());
        }
    }
1045
    /// Simulates a block builder with a blob limit of six pulling from two senders, one with
    /// 5-blob and one with 1-blob transactions, and expects the block to end up exactly full
    /// with one transaction of each kind.
    #[test]
    fn test_blob_transaction_ordering_multiple_clients_shape() {
        let mut pool = PendingPool::new(MockOrdering::default());
        let mut f = MockTransactionFactory::default();

        let base_fee: u64 = 10;
        let base_fee_per_blob_gas: u64 = 1;
        let max_blob_count: u64 = 6;

        // Sender A: five transactions carrying five blobs each.
        let sender_a = MockTransaction::eip4844()
            .with_blob_hashes(5)
            .with_max_fee(base_fee as u128 + 20)
            .with_priority_fee(base_fee as u128 + 20)
            .with_blob_fee(120);
        for nonce in 0..5u64 {
            let tx = sender_a.clone().rng_hash().with_nonce(nonce);
            pool.add_transaction(Arc::new(f.validated(tx)), 0);
        }

        // Sender B: five transactions carrying one blob each.
        let sender_b = MockTransaction::eip4844()
            .with_blob_hashes(1)
            .with_max_fee(base_fee as u128 + 20)
            .with_priority_fee(base_fee as u128 + 20)
            .with_blob_fee(100);
        for nonce in 0..5u64 {
            let tx = sender_b.clone().rng_hash().with_nonce(nonce);
            pool.add_transaction(Arc::new(f.validated(tx)), 0);
        }

        let mut best = pool.best_with_basefee_and_blobfee(base_fee, base_fee_per_blob_gas);
        let mut block_blob_count = 0u64;
        let mut included_txs = 0u64;

        // `while let` rather than `for`: the iterator is mutated inside the loop body.
        while let Some(tx) = best.next() {
            if let Some(blob_hashes) = tx.transaction.blob_versioned_hashes() {
                let tx_blob_count = blob_hashes.len() as u64;

                // The transaction would exceed the blob limit: reject it and move on.
                if block_blob_count + tx_blob_count > max_blob_count {
                    crate::traits::BestTransactions::mark_invalid(
                        &mut best,
                        &tx,
                        &InvalidPoolTransactionError::Eip4844(
                            Eip4844PoolTransactionError::TooManyEip4844Blobs {
                                have: block_blob_count + tx_blob_count,
                                permitted: max_blob_count,
                            },
                        ),
                    );
                    continue;
                }

                block_blob_count += tx_blob_count;
                included_txs += 1;

                // The block is full: no further blob transactions are wanted.
                if block_blob_count == max_blob_count {
                    best.skip_blobs();
                    break;
                }
            }
        }

        assert_eq!(
            block_blob_count, max_blob_count,
            "expected a full blob block (5+1 blobs across senders)"
        );
        assert_eq!(included_txs, 2, "expected one 5-blob tx and one 1-blob tx in the block");
    }
1117}