1use crate::{
2 error::{Eip4844PoolTransactionError, InvalidPoolTransactionError},
3 identifier::{SenderId, TransactionId},
4 pool::pending::PendingTransaction,
5 PoolTransaction, Priority, TransactionOrdering, ValidPoolTransaction,
6};
7use alloy_consensus::Transaction;
8use alloy_primitives::map::AddressSet;
9use core::fmt;
10use imbl::OrdMap;
11use reth_primitives_traits::transaction::error::InvalidTransactionError;
12use std::{
13 collections::{BTreeSet, HashSet, VecDeque},
14 sync::Arc,
15};
16use tokio::sync::broadcast::{error::TryRecvError, Receiver};
17use tracing::debug;
18
19const MAX_NEW_TRANSACTIONS_PER_BATCH: usize = 16;
20
21pub(crate) struct BestTransactionsWithFees<T: TransactionOrdering> {
29 pub(crate) best: BestTransactions<T>,
30 pub(crate) base_fee: u64,
31 pub(crate) base_fee_per_blob_gas: u64,
32}
33
34impl<T: TransactionOrdering> crate::traits::BestTransactions for BestTransactionsWithFees<T> {
35 fn mark_invalid(&mut self, tx: &Self::Item, kind: InvalidPoolTransactionError) {
36 BestTransactions::mark_invalid(&mut self.best, tx, kind)
37 }
38
39 fn no_updates(&mut self) {
40 self.best.no_updates()
41 }
42
43 fn skip_blobs(&mut self) {
44 self.set_skip_blobs(true)
45 }
46
47 fn set_skip_blobs(&mut self, skip_blobs: bool) {
48 self.best.set_skip_blobs(skip_blobs)
49 }
50}
51
52impl<T: TransactionOrdering> Iterator for BestTransactionsWithFees<T> {
53 type Item = Arc<ValidPoolTransaction<T::Transaction>>;
54
55 fn next(&mut self) -> Option<Self::Item> {
56 loop {
58 let best = Iterator::next(&mut self.best)?;
59 if best.transaction.max_fee_per_gas() >= self.base_fee as u128 &&
62 best.transaction
63 .max_fee_per_blob_gas()
64 .is_none_or(|fee| fee >= self.base_fee_per_blob_gas as u128)
65 {
66 return Some(best);
67 }
68 crate::traits::BestTransactions::mark_invalid(
69 self,
70 &best,
71 InvalidPoolTransactionError::Underpriced,
72 );
73 }
74 }
75}
76
77#[derive(Debug)]
85pub struct BestTransactions<T: TransactionOrdering> {
86 pub(crate) all: OrdMap<TransactionId, PendingTransaction<T>>,
89 pub(crate) independent: BTreeSet<PendingTransaction<T>>,
94 pub(crate) invalid: HashSet<SenderId>,
96 pub(crate) new_transaction_receiver: Option<Receiver<PendingTransaction<T>>>,
102 pub(crate) last_priority: Option<Priority<T::PriorityValue>>,
106 pub(crate) skip_blobs: bool,
108}
109
110impl<T: TransactionOrdering> BestTransactions<T> {
111 pub(crate) fn mark_invalid(
113 &mut self,
114 tx: &Arc<ValidPoolTransaction<T::Transaction>>,
115 _kind: InvalidPoolTransactionError,
116 ) {
117 self.invalid.insert(tx.sender_id());
118 }
119
120 pub(crate) fn ancestor(&self, id: &TransactionId) -> Option<&PendingTransaction<T>> {
125 self.all.get(&id.unchecked_ancestor()?)
126 }
127
128 fn try_recv(&mut self) -> Option<IncomingTransaction<T>> {
130 loop {
131 match self.new_transaction_receiver.as_mut()?.try_recv() {
132 Ok(tx) => {
133 if let Some(last_priority) = &self.last_priority &&
134 &tx.priority > last_priority
135 {
136 return Some(IncomingTransaction::Stash(tx))
139 }
140 return Some(IncomingTransaction::Process(tx))
141 }
142 Err(TryRecvError::Lagged(_)) => {
148 }
151
152 Err(_) => return None,
155 }
156 }
157 }
158
159 fn pop_best(&mut self) -> Option<PendingTransaction<T>> {
162 self.independent.pop_last().inspect(|best| {
163 self.all.remove(best.transaction.id());
164 })
165 }
166
167 fn add_new_transactions(&mut self) {
170 for _ in 0..MAX_NEW_TRANSACTIONS_PER_BATCH {
171 if let Some(pending_tx) = self.try_recv() {
172 match pending_tx {
175 IncomingTransaction::Process(tx) => {
176 let tx_id = *tx.transaction.id();
177 if self.ancestor(&tx_id).is_none() {
178 self.independent.insert(tx.clone());
179 }
180 self.all.insert(tx_id, tx);
181 }
182 IncomingTransaction::Stash(tx) => {
183 let tx_id = *tx.transaction.id();
184 self.all.insert(tx_id, tx);
185 }
186 }
187 } else {
188 break;
189 }
190 }
191 }
192
193 #[expect(clippy::type_complexity)]
195 pub fn next_tx_and_priority(
196 &mut self,
197 ) -> Option<(Arc<ValidPoolTransaction<T::Transaction>>, Priority<T::PriorityValue>)> {
198 loop {
199 self.add_new_transactions();
200 let best = self.pop_best()?;
202 let sender_id = best.transaction.sender_id();
203
204 if self.invalid.contains(&sender_id) {
206 debug!(
207 target: "txpool",
208 "[{:?}] skipping invalid transaction",
209 best.transaction.hash()
210 );
211 continue
212 }
213
214 if let Some(unlocked) = self.all.get(&best.unlocks()) {
216 self.independent.insert(unlocked.clone());
217 }
218
219 if self.skip_blobs && best.transaction.is_eip4844() {
220 self.mark_invalid(
223 &best.transaction,
224 InvalidPoolTransactionError::Eip4844(
225 Eip4844PoolTransactionError::NoEip4844Blobs,
226 ),
227 )
228 } else {
229 if self.new_transaction_receiver.is_some() {
230 self.last_priority = Some(best.priority.clone())
231 }
232 return Some((best.transaction, best.priority))
233 }
234 }
235 }
236}
237
238enum IncomingTransaction<T: TransactionOrdering> {
243 Process(PendingTransaction<T>),
250
251 Stash(PendingTransaction<T>),
263}
264
265impl<T: TransactionOrdering> crate::traits::BestTransactions for BestTransactions<T> {
266 fn mark_invalid(&mut self, tx: &Self::Item, kind: InvalidPoolTransactionError) {
267 Self::mark_invalid(self, tx, kind)
268 }
269
270 fn no_updates(&mut self) {
271 self.new_transaction_receiver.take();
272 self.last_priority.take();
273 }
274
275 fn skip_blobs(&mut self) {
276 self.set_skip_blobs(true);
277 }
278
279 fn set_skip_blobs(&mut self, skip_blobs: bool) {
280 self.skip_blobs = skip_blobs;
281 }
282}
283
284impl<T: TransactionOrdering> Iterator for BestTransactions<T> {
285 type Item = Arc<ValidPoolTransaction<T::Transaction>>;
286
287 fn next(&mut self) -> Option<Self::Item> {
288 self.next_tx_and_priority().map(|(tx, _)| tx)
289 }
290}
291
292pub struct BestTransactionFilter<I, P> {
298 pub(crate) best: I,
299 pub(crate) predicate: P,
300}
301
302impl<I, P> BestTransactionFilter<I, P> {
303 pub const fn new(best: I, predicate: P) -> Self {
305 Self { best, predicate }
306 }
307}
308
309impl<I, P> Iterator for BestTransactionFilter<I, P>
310where
311 I: crate::traits::BestTransactions,
312 P: FnMut(&<I as Iterator>::Item) -> bool,
313{
314 type Item = <I as Iterator>::Item;
315
316 fn next(&mut self) -> Option<Self::Item> {
317 loop {
318 let best = self.best.next()?;
319 if (self.predicate)(&best) {
320 return Some(best)
321 }
322 self.best.mark_invalid(
323 &best,
324 InvalidPoolTransactionError::Consensus(InvalidTransactionError::TxTypeNotSupported),
325 );
326 }
327 }
328}
329
330impl<I, P> crate::traits::BestTransactions for BestTransactionFilter<I, P>
331where
332 I: crate::traits::BestTransactions,
333 P: FnMut(&<I as Iterator>::Item) -> bool + Send,
334{
335 fn mark_invalid(&mut self, tx: &Self::Item, kind: InvalidPoolTransactionError) {
336 crate::traits::BestTransactions::mark_invalid(&mut self.best, tx, kind)
337 }
338
339 fn no_updates(&mut self) {
340 self.best.no_updates()
341 }
342
343 fn skip_blobs(&mut self) {
344 self.set_skip_blobs(true)
345 }
346
347 fn set_skip_blobs(&mut self, skip_blobs: bool) {
348 self.best.set_skip_blobs(skip_blobs)
349 }
350}
351
352impl<I: fmt::Debug, P> fmt::Debug for BestTransactionFilter<I, P> {
353 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
354 f.debug_struct("BestTransactionFilter").field("best", &self.best).finish()
355 }
356}
357
358#[derive(Debug)]
361pub struct BestTransactionsWithPrioritizedSenders<I: Iterator> {
362 inner: I,
364 prioritized_senders: AddressSet,
366 max_prioritized_gas: u64,
368 buffer: VecDeque<I::Item>,
371 prioritized_gas: u64,
374}
375
376impl<I: Iterator> BestTransactionsWithPrioritizedSenders<I> {
377 pub fn new(prioritized_senders: AddressSet, max_prioritized_gas: u64, inner: I) -> Self {
379 Self {
380 inner,
381 prioritized_senders,
382 max_prioritized_gas,
383 buffer: Default::default(),
384 prioritized_gas: Default::default(),
385 }
386 }
387}
388
389impl<I, T> Iterator for BestTransactionsWithPrioritizedSenders<I>
390where
391 I: crate::traits::BestTransactions<Item = Arc<ValidPoolTransaction<T>>>,
392 T: PoolTransaction,
393{
394 type Item = <I as Iterator>::Item;
395
396 fn next(&mut self) -> Option<Self::Item> {
397 if self.prioritized_gas < self.max_prioritized_gas {
399 for item in &mut self.inner {
400 if self.prioritized_senders.contains(&item.transaction.sender()) &&
401 self.prioritized_gas + item.transaction.gas_limit() <=
402 self.max_prioritized_gas
403 {
404 self.prioritized_gas += item.transaction.gas_limit();
405 return Some(item)
406 }
407 self.buffer.push_back(item);
408 }
409 }
410
411 if let Some(item) = self.buffer.pop_front() {
412 Some(item)
413 } else {
414 self.inner.next()
415 }
416 }
417}
418
419impl<I, T> crate::traits::BestTransactions for BestTransactionsWithPrioritizedSenders<I>
420where
421 I: crate::traits::BestTransactions<Item = Arc<ValidPoolTransaction<T>>>,
422 T: PoolTransaction,
423{
424 fn mark_invalid(&mut self, tx: &Self::Item, kind: InvalidPoolTransactionError) {
425 self.inner.mark_invalid(tx, kind)
426 }
427
428 fn no_updates(&mut self) {
429 self.inner.no_updates()
430 }
431
432 fn set_skip_blobs(&mut self, skip_blobs: bool) {
433 if skip_blobs {
434 self.buffer.retain(|tx| !tx.transaction.is_eip4844())
435 }
436 self.inner.set_skip_blobs(skip_blobs)
437 }
438}
439
440#[cfg(test)]
441mod tests {
442 use super::*;
443 use crate::{
444 pool::pending::PendingPool,
445 test_utils::{MockOrdering, MockTransaction, MockTransactionFactory},
446 BestTransactions, Priority,
447 };
448
449 #[test]
450 fn test_best_iter() {
451 let mut pool = PendingPool::new(MockOrdering::default());
452 let mut f = MockTransactionFactory::default();
453
454 let num_tx = 10;
455 let tx = MockTransaction::eip1559();
457 for nonce in 0..num_tx {
458 let tx = tx.clone().rng_hash().with_nonce(nonce);
459 let valid_tx = f.validated(tx);
460 pool.add_transaction(Arc::new(valid_tx), 0);
461 }
462
463 let mut best = pool.best();
464 assert_eq!(best.all.len(), num_tx as usize);
465 assert_eq!(best.independent.len(), 1);
466
467 for nonce in 0..num_tx {
469 assert_eq!(best.independent.len(), 1);
470 let tx = best.next().unwrap();
471 assert_eq!(tx.nonce(), nonce);
472 }
473 }
474
475 #[test]
476 fn test_best_iter_invalid() {
477 let mut pool = PendingPool::new(MockOrdering::default());
478 let mut f = MockTransactionFactory::default();
479
480 let num_tx = 10;
481 let tx = MockTransaction::eip1559();
483 for nonce in 0..num_tx {
484 let tx = tx.clone().rng_hash().with_nonce(nonce);
485 let valid_tx = f.validated(tx);
486 pool.add_transaction(Arc::new(valid_tx), 0);
487 }
488
489 let mut best = pool.best();
490
491 let invalid = best.independent.iter().next().unwrap();
493 best.mark_invalid(
494 &invalid.transaction.clone(),
495 InvalidPoolTransactionError::Consensus(InvalidTransactionError::TxTypeNotSupported),
496 );
497
498 assert!(best.next().is_none());
500 }
501
502 #[test]
503 fn test_best_transactions_iter_invalid() {
504 let mut pool = PendingPool::new(MockOrdering::default());
505 let mut f = MockTransactionFactory::default();
506
507 let num_tx = 10;
508 let tx = MockTransaction::eip1559();
510 for nonce in 0..num_tx {
511 let tx = tx.clone().rng_hash().with_nonce(nonce);
512 let valid_tx = f.validated(tx);
513 pool.add_transaction(Arc::new(valid_tx), 0);
514 }
515
516 let mut best: Box<
517 dyn crate::traits::BestTransactions<Item = Arc<ValidPoolTransaction<MockTransaction>>>,
518 > = Box::new(pool.best());
519
520 let tx = Iterator::next(&mut best).unwrap();
521 crate::traits::BestTransactions::mark_invalid(
522 &mut *best,
523 &tx,
524 InvalidPoolTransactionError::Consensus(InvalidTransactionError::TxTypeNotSupported),
525 );
526 assert!(Iterator::next(&mut best).is_none());
527 }
528
529 #[test]
530 fn test_best_with_fees_iter_base_fee_satisfied() {
531 let mut pool = PendingPool::new(MockOrdering::default());
532 let mut f = MockTransactionFactory::default();
533
534 let num_tx = 5;
535 let base_fee: u64 = 10;
536 let base_fee_per_blob_gas: u64 = 15;
537
538 for nonce in 0..num_tx {
541 let tx = MockTransaction::eip1559()
542 .rng_hash()
543 .with_nonce(nonce)
544 .with_max_fee(base_fee as u128 + 5);
545 let valid_tx = f.validated(tx);
546 pool.add_transaction(Arc::new(valid_tx), 0);
547 }
548
549 let mut best = pool.best_with_basefee_and_blobfee(base_fee, base_fee_per_blob_gas);
550
551 for nonce in 0..num_tx {
552 let tx = best.next().expect("Transaction should be returned");
553 assert_eq!(tx.nonce(), nonce);
554 assert!(tx.transaction.max_fee_per_gas() >= base_fee as u128);
555 }
556 }
557
558 #[test]
559 fn test_best_with_fees_iter_base_fee_violated() {
560 let mut pool = PendingPool::new(MockOrdering::default());
561 let mut f = MockTransactionFactory::default();
562
563 let num_tx = 5;
564 let base_fee: u64 = 20;
565 let base_fee_per_blob_gas: u64 = 15;
566
567 for nonce in 0..num_tx {
569 let tx = MockTransaction::eip1559()
570 .rng_hash()
571 .with_nonce(nonce)
572 .with_max_fee(base_fee as u128 - 5);
573 let valid_tx = f.validated(tx);
574 pool.add_transaction(Arc::new(valid_tx), 0);
575 }
576
577 let mut best = pool.best_with_basefee_and_blobfee(base_fee, base_fee_per_blob_gas);
578
579 assert!(best.next().is_none());
581 }
582
583 #[test]
584 fn test_best_with_fees_iter_blob_fee_satisfied() {
585 let mut pool = PendingPool::new(MockOrdering::default());
586 let mut f = MockTransactionFactory::default();
587
588 let num_tx = 5;
589 let base_fee: u64 = 10;
590 let base_fee_per_blob_gas: u64 = 20;
591
592 for nonce in 0..num_tx {
595 let tx = MockTransaction::eip4844()
596 .rng_hash()
597 .with_nonce(nonce)
598 .with_max_fee(base_fee as u128 + 5)
599 .with_blob_fee(base_fee_per_blob_gas as u128 + 5);
600 let valid_tx = f.validated(tx);
601 pool.add_transaction(Arc::new(valid_tx), 0);
602 }
603
604 let mut best = pool.best_with_basefee_and_blobfee(base_fee, base_fee_per_blob_gas);
605
606 for nonce in 0..num_tx {
609 let tx = best.next().expect("Transaction should be returned");
610 assert_eq!(tx.nonce(), nonce);
611 assert!(tx.transaction.max_fee_per_gas() >= base_fee as u128);
612 assert!(
613 tx.transaction.max_fee_per_blob_gas().unwrap() >= base_fee_per_blob_gas as u128
614 );
615 }
616
617 assert!(best.next().is_none());
619 }
620
621 #[test]
622 fn test_best_with_fees_iter_blob_fee_violated() {
623 let mut pool = PendingPool::new(MockOrdering::default());
624 let mut f = MockTransactionFactory::default();
625
626 let num_tx = 5;
627 let base_fee: u64 = 10;
628 let base_fee_per_blob_gas: u64 = 20;
629
630 for nonce in 0..num_tx {
632 let tx = MockTransaction::eip4844()
633 .rng_hash()
634 .with_nonce(nonce)
635 .with_max_fee(base_fee as u128 + 5)
636 .with_blob_fee(base_fee_per_blob_gas as u128 - 5);
637 let valid_tx = f.validated(tx);
638 pool.add_transaction(Arc::new(valid_tx), 0);
639 }
640
641 let mut best = pool.best_with_basefee_and_blobfee(base_fee, base_fee_per_blob_gas);
642
643 assert!(best.next().is_none());
645 }
646
647 #[test]
648 fn test_best_with_fees_iter_mixed_fees() {
649 let mut pool = PendingPool::new(MockOrdering::default());
650 let mut f = MockTransactionFactory::default();
651
652 let base_fee: u64 = 10;
653 let base_fee_per_blob_gas: u64 = 20;
654
655 let tx1 =
657 MockTransaction::eip1559().rng_hash().with_nonce(0).with_max_fee(base_fee as u128 + 5);
658 let tx2 = MockTransaction::eip4844()
659 .rng_hash()
660 .with_nonce(1)
661 .with_max_fee(base_fee as u128 + 5)
662 .with_blob_fee(base_fee_per_blob_gas as u128 + 5);
663 let tx3 = MockTransaction::eip4844()
664 .rng_hash()
665 .with_nonce(2)
666 .with_max_fee(base_fee as u128 + 5)
667 .with_blob_fee(base_fee_per_blob_gas as u128 - 5);
668 let tx4 =
669 MockTransaction::eip1559().rng_hash().with_nonce(3).with_max_fee(base_fee as u128 - 5);
670
671 pool.add_transaction(Arc::new(f.validated(tx1.clone())), 0);
672 pool.add_transaction(Arc::new(f.validated(tx2.clone())), 0);
673 pool.add_transaction(Arc::new(f.validated(tx3)), 0);
674 pool.add_transaction(Arc::new(f.validated(tx4)), 0);
675
676 let mut best = pool.best_with_basefee_and_blobfee(base_fee, base_fee_per_blob_gas);
677
678 let expected_order = vec![tx1, tx2];
679 for expected_tx in expected_order {
680 let tx = best.next().expect("Transaction should be returned");
681 assert_eq!(tx.transaction, expected_tx);
682 }
683
684 assert!(best.next().is_none());
686 }
687
688 #[test]
689 fn test_best_add_transaction_with_next_nonce() {
690 let mut pool = PendingPool::new(MockOrdering::default());
691 let mut f = MockTransactionFactory::default();
692
693 let num_tx = 5;
695 let tx = MockTransaction::eip1559();
696 for nonce in 0..num_tx {
697 let tx = tx.clone().rng_hash().with_nonce(nonce);
698 let valid_tx = f.validated(tx);
699 pool.add_transaction(Arc::new(valid_tx), 0);
700 }
701
702 let mut best = pool.best();
704
705 let (tx_sender, tx_receiver) =
707 tokio::sync::broadcast::channel::<PendingTransaction<MockOrdering>>(1000);
708 best.new_transaction_receiver = Some(tx_receiver);
709
710 let new_tx = MockTransaction::eip1559().rng_hash().with_nonce(5);
712 let valid_new_tx = f.validated(new_tx);
713
714 let pending_tx = PendingTransaction {
716 submission_id: 10,
717 transaction: Arc::new(valid_new_tx.clone()),
718 priority: Priority::Value(1000),
719 };
720 tx_sender.send(pending_tx.clone()).unwrap();
721
722 best.add_new_transactions();
724
725 assert_eq!(best.all.len(), 6);
727 assert!(best.all.contains_key(valid_new_tx.id()));
728
729 assert_eq!(best.independent.len(), 2);
731 assert!(best.independent.contains(&pending_tx));
732 }
733
734 #[test]
735 fn test_best_add_transaction_with_ancestor() {
736 let mut pool = PendingPool::new(MockOrdering::default());
738 let mut f = MockTransactionFactory::default();
739
740 let num_tx = 5;
742 let tx = MockTransaction::eip1559();
743 for nonce in 0..num_tx {
744 let tx = tx.clone().rng_hash().with_nonce(nonce);
745 let valid_tx = f.validated(tx);
746 pool.add_transaction(Arc::new(valid_tx), 0);
747 }
748
749 let mut best = pool.best();
751
752 let (tx_sender, tx_receiver) =
754 tokio::sync::broadcast::channel::<PendingTransaction<MockOrdering>>(1000);
755 best.new_transaction_receiver = Some(tx_receiver);
756
757 let base_tx1 = MockTransaction::eip1559().rng_hash().with_nonce(5);
759 let valid_new_tx1 = f.validated(base_tx1.clone());
760
761 let pending_tx1 = PendingTransaction {
763 submission_id: 10,
764 transaction: Arc::new(valid_new_tx1.clone()),
765 priority: Priority::Value(1000),
766 };
767 tx_sender.send(pending_tx1.clone()).unwrap();
768
769 best.add_new_transactions();
771
772 assert_eq!(best.all.len(), 6);
774 assert!(best.all.contains_key(valid_new_tx1.id()));
775
776 assert_eq!(best.independent.len(), 2);
778 assert!(best.independent.contains(&pending_tx1));
779
780 let base_tx2 = base_tx1.with_nonce(6);
782 let valid_new_tx2 = f.validated(base_tx2);
783
784 let pending_tx2 = PendingTransaction {
786 submission_id: 11, transaction: Arc::new(valid_new_tx2.clone()),
788 priority: Priority::Value(1000),
789 };
790 tx_sender.send(pending_tx2.clone()).unwrap();
791
792 best.add_new_transactions();
794
795 assert_eq!(best.all.len(), 7);
797 assert!(best.all.contains_key(valid_new_tx2.id()));
798
799 assert_eq!(best.independent.len(), 2);
801 assert!(!best.independent.contains(&pending_tx2));
802 }
803
804 #[test]
805 fn test_best_transactions_filter_trait_object() {
806 let mut pool = PendingPool::new(MockOrdering::default());
808 let mut f = MockTransactionFactory::default();
809
810 let num_tx = 5;
812 let tx = MockTransaction::eip1559();
813 for nonce in 0..num_tx {
814 let tx = tx.clone().rng_hash().with_nonce(nonce);
815 let valid_tx = f.validated(tx);
816 pool.add_transaction(Arc::new(valid_tx), 0);
817 }
818
819 let best: Box<dyn crate::traits::BestTransactions<Item = _>> = Box::new(pool.best());
821
822 let filter =
824 BestTransactionFilter::new(best, |tx: &Arc<ValidPoolTransaction<MockTransaction>>| {
825 tx.nonce().is_multiple_of(2)
826 });
827
828 for tx in filter {
830 assert_eq!(tx.nonce() % 2, 0);
831 }
832 }
833
834 #[test]
835 fn test_best_transactions_prioritized_senders() {
836 let mut pool = PendingPool::new(MockOrdering::default());
837 let mut f = MockTransactionFactory::default();
838
839 for gas_price in 0..5 {
841 let tx = MockTransaction::eip1559().with_gas_price((gas_price + 1) * 10);
842 let valid_tx = f.validated(tx);
843 pool.add_transaction(Arc::new(valid_tx), 0);
844 }
845
846 let prioritized_tx = MockTransaction::eip1559().with_gas_price(5).with_gas_limit(200);
848 let valid_prioritized_tx = f.validated(prioritized_tx.clone());
849 pool.add_transaction(Arc::new(valid_prioritized_tx), 0);
850
851 let prioritized_tx2 = MockTransaction::eip1559().with_gas_price(3);
854 let valid_prioritized_tx2 = f.validated(prioritized_tx2.clone());
855 pool.add_transaction(Arc::new(valid_prioritized_tx2), 0);
856
857 let prioritized_senders =
858 AddressSet::from_iter([prioritized_tx.sender(), prioritized_tx2.sender()]);
859 let best =
860 BestTransactionsWithPrioritizedSenders::new(prioritized_senders, 200, pool.best());
861
862 let mut iter = best.into_iter();
865 let top_of_block_tx = iter.next().unwrap();
866 assert_eq!(top_of_block_tx.max_fee_per_gas(), 5);
867 assert_eq!(top_of_block_tx.sender(), prioritized_tx.sender());
868 for gas_price in (0..5).rev() {
869 assert_eq!(iter.next().unwrap().max_fee_per_gas(), (gas_price + 1) * 10);
870 }
871
872 let top_of_block_tx2 = iter.next().unwrap();
875 assert_eq!(top_of_block_tx2.max_fee_per_gas(), 3);
876 assert_eq!(top_of_block_tx2.sender(), prioritized_tx2.sender());
877 }
878
879 #[test]
880 fn test_best_with_fees_iter_no_blob_fee_required() {
881 let mut pool = PendingPool::new(MockOrdering::default());
883 let mut f = MockTransactionFactory::default();
884
885 let base_fee: u64 = 10;
886 let base_fee_per_blob_gas: u64 = 0; for nonce in 0..5 {
890 let tx = MockTransaction::eip1559()
891 .rng_hash()
892 .with_nonce(nonce)
893 .with_max_fee(base_fee as u128 + 5);
894 let valid_tx = f.validated(tx);
895 pool.add_transaction(Arc::new(valid_tx), 0);
896 }
897
898 let mut best = pool.best_with_basefee_and_blobfee(base_fee, base_fee_per_blob_gas);
899
900 for nonce in 0..5 {
902 let tx = best.next().expect("Transaction should be returned");
903 assert_eq!(tx.nonce(), nonce);
904 }
905
906 assert!(best.next().is_none());
908 }
909
910 #[test]
911 fn test_best_with_fees_iter_mix_of_blob_and_non_blob_transactions() {
912 let mut pool = PendingPool::new(MockOrdering::default());
914 let mut f = MockTransactionFactory::default();
915
916 let base_fee: u64 = 10;
917 let base_fee_per_blob_gas: u64 = 15;
918
919 let tx_non_blob =
921 MockTransaction::eip1559().rng_hash().with_nonce(0).with_max_fee(base_fee as u128 + 5);
922 pool.add_transaction(Arc::new(f.validated(tx_non_blob.clone())), 0);
923
924 let tx_blob = MockTransaction::eip4844()
926 .rng_hash()
927 .with_nonce(1)
928 .with_max_fee(base_fee as u128 + 5)
929 .with_blob_fee(base_fee_per_blob_gas as u128 + 5);
930 pool.add_transaction(Arc::new(f.validated(tx_blob.clone())), 0);
931
932 let mut best = pool.best_with_basefee_and_blobfee(base_fee, base_fee_per_blob_gas);
933
934 let tx = best.next().expect("Transaction should be returned");
936 assert_eq!(tx.transaction, tx_non_blob);
937
938 let tx = best.next().expect("Transaction should be returned");
939 assert_eq!(tx.transaction, tx_blob);
940
941 assert!(best.next().is_none());
943 }
944
945 #[test]
946 fn test_best_transactions_with_skipping_blobs() {
947 let mut pool = PendingPool::new(MockOrdering::default());
949 let mut f = MockTransactionFactory::default();
950
951 let tx_blob = MockTransaction::eip4844().rng_hash().with_nonce(0).with_blob_fee(100);
953 let valid_blob_tx = f.validated(tx_blob);
954 pool.add_transaction(Arc::new(valid_blob_tx), 0);
955
956 let tx_non_blob = MockTransaction::eip1559().rng_hash().with_nonce(1).with_max_fee(200);
958 let valid_non_blob_tx = f.validated(tx_non_blob.clone());
959 pool.add_transaction(Arc::new(valid_non_blob_tx), 0);
960
961 let mut best = pool.best();
962 best.skip_blobs();
963
964 let tx = best.next().expect("Transaction should be returned");
966 assert_eq!(tx.transaction, tx_non_blob);
967
968 assert!(best.next().is_none());
970 }
971
972 #[test]
973 fn test_best_transactions_no_updates() {
974 let mut pool = PendingPool::new(MockOrdering::default());
977 let mut f = MockTransactionFactory::default();
978
979 let tx = MockTransaction::eip1559().rng_hash().with_nonce(0).with_max_fee(100);
981 let valid_tx = f.validated(tx);
982 pool.add_transaction(Arc::new(valid_tx), 0);
983
984 let mut best = pool.best();
985
986 let (_tx_sender, tx_receiver) =
988 tokio::sync::broadcast::channel::<PendingTransaction<MockOrdering>>(1000);
989 best.new_transaction_receiver = Some(tx_receiver);
990
991 assert!(best.new_transaction_receiver.is_some());
993
994 best.no_updates();
996
997 assert!(best.new_transaction_receiver.is_none());
999 }
1000
1001 #[test]
1002 fn test_best_update_transaction_priority() {
1003 let mut pool = PendingPool::new(MockOrdering::default());
1004 let mut f = MockTransactionFactory::default();
1005
1006 let num_tx = 5;
1008 let tx = MockTransaction::eip1559();
1009 for nonce in 0..num_tx {
1010 let tx = tx.clone().rng_hash().with_nonce(nonce);
1011 let valid_tx = f.validated(tx);
1012 pool.add_transaction(Arc::new(valid_tx), 0);
1013 }
1014
1015 let mut best = pool.best();
1017
1018 let (tx_sender, tx_receiver) =
1020 tokio::sync::broadcast::channel::<PendingTransaction<MockOrdering>>(1000);
1021 best.new_transaction_receiver = Some(tx_receiver);
1022
1023 let first = best.next().unwrap();
1025
1026 let new_higher_fee_tx = MockTransaction::eip1559().with_nonce(0);
1028 let valid_new_higher_fee_tx = f.validated(new_higher_fee_tx);
1029
1030 let pending_tx = PendingTransaction {
1032 submission_id: 10,
1033 transaction: Arc::new(valid_new_higher_fee_tx.clone()),
1034 priority: Priority::Value(u128::MAX),
1035 };
1036 tx_sender.send(pending_tx).unwrap();
1037
1038 for tx in best {
1040 assert_eq!(tx.sender_id(), first.sender_id());
1041 assert_ne!(tx.sender_id(), valid_new_higher_fee_tx.sender_id());
1042 }
1043 }
1044
1045 #[test]
1050 fn test_blob_transaction_ordering_multiple_clients_shape() {
1051 let mut pool = PendingPool::new(MockOrdering::default());
1052 let mut f = MockTransactionFactory::default();
1053
1054 let base_fee: u64 = 10;
1055 let base_fee_per_blob_gas: u64 = 1;
1056 let max_blob_count: u64 = 6;
1057
1058 let sender_a = MockTransaction::eip4844()
1059 .with_blob_hashes(5)
1060 .with_max_fee(base_fee as u128 + 20)
1061 .with_priority_fee(base_fee as u128 + 20)
1062 .with_blob_fee(120);
1063 for nonce in 0..5u64 {
1064 let tx = sender_a.clone().rng_hash().with_nonce(nonce);
1065 pool.add_transaction(Arc::new(f.validated(tx)), 0);
1066 }
1067
1068 let sender_b = MockTransaction::eip4844()
1069 .with_blob_hashes(1)
1070 .with_max_fee(base_fee as u128 + 20)
1071 .with_priority_fee(base_fee as u128 + 20)
1072 .with_blob_fee(100);
1073 for nonce in 0..5u64 {
1074 let tx = sender_b.clone().rng_hash().with_nonce(nonce);
1075 pool.add_transaction(Arc::new(f.validated(tx)), 0);
1076 }
1077
1078 let mut best = pool.best_with_basefee_and_blobfee(base_fee, base_fee_per_blob_gas);
1079 let mut block_blob_count = 0u64;
1080 let mut included_txs = 0u64;
1081
1082 while let Some(tx) = best.next() {
1083 if let Some(blob_hashes) = tx.transaction.blob_versioned_hashes() {
1084 let tx_blob_count = blob_hashes.len() as u64;
1085
1086 if block_blob_count + tx_blob_count > max_blob_count {
1087 crate::traits::BestTransactions::mark_invalid(
1088 &mut best,
1089 &tx,
1090 InvalidPoolTransactionError::Eip4844(
1091 Eip4844PoolTransactionError::TooManyEip4844Blobs {
1092 have: block_blob_count + tx_blob_count,
1093 permitted: max_blob_count,
1094 },
1095 ),
1096 );
1097 continue;
1098 }
1099
1100 block_blob_count += tx_blob_count;
1101 included_txs += 1;
1102
1103 if block_blob_count == max_blob_count {
1104 best.skip_blobs();
1105 break;
1106 }
1107 }
1108 }
1109
1110 assert_eq!(
1111 block_blob_count, max_blob_count,
1112 "expected a full blob block (5+1 blobs across senders)"
1113 );
1114 assert_eq!(included_txs, 2, "expected one 5-blob tx and one 1-blob tx in the block");
1115 }
1116}