1use crate::{
2 error::{Eip4844PoolTransactionError, InvalidPoolTransactionError},
3 identifier::{SenderId, TransactionId},
4 pool::pending::PendingTransaction,
5 PoolTransaction, Priority, TransactionOrdering, ValidPoolTransaction,
6};
7use alloy_consensus::Transaction;
8use alloy_primitives::map::AddressSet;
9use core::fmt;
10use imbl::OrdMap;
11use reth_primitives_traits::transaction::error::InvalidTransactionError;
12use rustc_hash::FxHashSet;
13use std::{
14 collections::{BTreeSet, VecDeque},
15 sync::Arc,
16};
17use tokio::sync::broadcast::{error::TryRecvError, Receiver};
18use tracing::debug;
19
20const MAX_NEW_TRANSACTIONS_PER_BATCH: usize = 16;
21
22pub(crate) struct BestTransactionsWithFees<T: TransactionOrdering> {
30 pub(crate) best: BestTransactions<T>,
31 pub(crate) base_fee: u64,
32 pub(crate) base_fee_per_blob_gas: u64,
33}
34
35impl<T: TransactionOrdering> crate::traits::BestTransactions for BestTransactionsWithFees<T> {
36 fn mark_invalid(&mut self, tx: &Self::Item, kind: InvalidPoolTransactionError) {
37 BestTransactions::mark_invalid(&mut self.best, tx, kind)
38 }
39
40 fn no_updates(&mut self) {
41 self.best.no_updates()
42 }
43
44 fn skip_blobs(&mut self) {
45 self.set_skip_blobs(true)
46 }
47
48 fn set_skip_blobs(&mut self, skip_blobs: bool) {
49 self.best.set_skip_blobs(skip_blobs)
50 }
51}
52
53impl<T: TransactionOrdering> Iterator for BestTransactionsWithFees<T> {
54 type Item = Arc<ValidPoolTransaction<T::Transaction>>;
55
56 fn next(&mut self) -> Option<Self::Item> {
57 loop {
59 let best = Iterator::next(&mut self.best)?;
60 if best.transaction.max_fee_per_gas() >= self.base_fee as u128 &&
63 best.transaction
64 .max_fee_per_blob_gas()
65 .is_none_or(|fee| fee >= self.base_fee_per_blob_gas as u128)
66 {
67 return Some(best);
68 }
69 crate::traits::BestTransactions::mark_invalid(
70 self,
71 &best,
72 InvalidPoolTransactionError::Underpriced,
73 );
74 }
75 }
76
77 fn size_hint(&self) -> (usize, Option<usize>) {
78 let (_, upper) = self.best.size_hint();
79 (0, upper)
80 }
81}
82
83#[derive(Debug)]
91pub struct BestTransactions<T: TransactionOrdering> {
92 pub(crate) all: OrdMap<TransactionId, PendingTransaction<T>>,
95 pub(crate) independent: BTreeSet<PendingTransaction<T>>,
100 pub(crate) invalid: FxHashSet<SenderId>,
102 pub(crate) new_transaction_receiver: Option<Receiver<PendingTransaction<T>>>,
108 pub(crate) last_priority: Option<Priority<T::PriorityValue>>,
112 pub(crate) skip_blobs: bool,
114}
115
116impl<T: TransactionOrdering> BestTransactions<T> {
117 pub(crate) fn mark_invalid(
119 &mut self,
120 tx: &Arc<ValidPoolTransaction<T::Transaction>>,
121 _kind: InvalidPoolTransactionError,
122 ) {
123 self.invalid.insert(tx.sender_id());
124 }
125
126 pub(crate) fn ancestor(&self, id: &TransactionId) -> Option<&PendingTransaction<T>> {
131 self.all.get(&id.unchecked_ancestor()?)
132 }
133
134 fn try_recv(&mut self) -> Option<IncomingTransaction<T>> {
136 loop {
137 match self.new_transaction_receiver.as_mut()?.try_recv() {
138 Ok(tx) => {
139 if let Some(last_priority) = &self.last_priority &&
140 &tx.priority > last_priority
141 {
142 return Some(IncomingTransaction::Stash(tx))
145 }
146 return Some(IncomingTransaction::Process(tx))
147 }
148 Err(TryRecvError::Lagged(_)) => {
154 }
157
158 Err(_) => return None,
161 }
162 }
163 }
164
165 fn pop_best(&mut self) -> Option<PendingTransaction<T>> {
168 self.independent.pop_last().inspect(|best| {
169 self.all.remove(best.transaction.id());
170 })
171 }
172
173 fn add_new_transactions(&mut self) {
176 for _ in 0..MAX_NEW_TRANSACTIONS_PER_BATCH {
177 if let Some(pending_tx) = self.try_recv() {
178 match pending_tx {
181 IncomingTransaction::Process(tx) => {
182 let tx_id = *tx.transaction.id();
183 if self.ancestor(&tx_id).is_none() {
184 self.independent.insert(tx.clone());
185 }
186 self.all.insert(tx_id, tx);
187 }
188 IncomingTransaction::Stash(tx) => {
189 let tx_id = *tx.transaction.id();
190 self.all.insert(tx_id, tx);
191 }
192 }
193 } else {
194 break;
195 }
196 }
197 }
198
199 #[expect(clippy::type_complexity)]
201 pub fn next_tx_and_priority(
202 &mut self,
203 ) -> Option<(Arc<ValidPoolTransaction<T::Transaction>>, Priority<T::PriorityValue>)> {
204 loop {
205 self.add_new_transactions();
206 let best = self.pop_best()?;
208 let sender_id = best.transaction.sender_id();
209
210 if self.invalid.contains(&sender_id) {
212 debug!(
213 target: "txpool",
214 "[{:?}] skipping invalid transaction",
215 best.transaction.hash()
216 );
217 continue
218 }
219
220 if let Some(unlocked) = self.all.get(&best.unlocks()) {
222 self.independent.insert(unlocked.clone());
223 }
224
225 if self.skip_blobs && best.transaction.is_eip4844() {
226 self.mark_invalid(
229 &best.transaction,
230 InvalidPoolTransactionError::Eip4844(
231 Eip4844PoolTransactionError::NoEip4844Blobs,
232 ),
233 )
234 } else {
235 if self.new_transaction_receiver.is_some() {
236 self.last_priority = Some(best.priority.clone())
237 }
238 return Some((best.transaction, best.priority))
239 }
240 }
241 }
242}
243
244enum IncomingTransaction<T: TransactionOrdering> {
249 Process(PendingTransaction<T>),
256
257 Stash(PendingTransaction<T>),
269}
270
271impl<T: TransactionOrdering> crate::traits::BestTransactions for BestTransactions<T> {
272 fn mark_invalid(&mut self, tx: &Self::Item, kind: InvalidPoolTransactionError) {
273 Self::mark_invalid(self, tx, kind)
274 }
275
276 fn no_updates(&mut self) {
277 self.new_transaction_receiver.take();
278 self.last_priority.take();
279 }
280
281 fn skip_blobs(&mut self) {
282 self.set_skip_blobs(true);
283 }
284
285 fn set_skip_blobs(&mut self, skip_blobs: bool) {
286 self.skip_blobs = skip_blobs;
287 }
288}
289
290impl<T: TransactionOrdering> Iterator for BestTransactions<T> {
291 type Item = Arc<ValidPoolTransaction<T::Transaction>>;
292
293 fn next(&mut self) -> Option<Self::Item> {
294 self.next_tx_and_priority().map(|(tx, _)| tx)
295 }
296
297 fn size_hint(&self) -> (usize, Option<usize>) {
298 (0, self.new_transaction_receiver.is_none().then_some(self.all.len()))
299 }
300}
301
302pub struct BestTransactionFilter<I, P> {
308 pub(crate) best: I,
309 pub(crate) predicate: P,
310}
311
312impl<I, P> BestTransactionFilter<I, P> {
313 pub const fn new(best: I, predicate: P) -> Self {
315 Self { best, predicate }
316 }
317}
318
319impl<I, P> Iterator for BestTransactionFilter<I, P>
320where
321 I: crate::traits::BestTransactions,
322 P: FnMut(&<I as Iterator>::Item) -> bool,
323{
324 type Item = <I as Iterator>::Item;
325
326 fn next(&mut self) -> Option<Self::Item> {
327 loop {
328 let best = self.best.next()?;
329 if (self.predicate)(&best) {
330 return Some(best)
331 }
332 self.best.mark_invalid(
333 &best,
334 InvalidPoolTransactionError::Consensus(InvalidTransactionError::TxTypeNotSupported),
335 );
336 }
337 }
338
339 fn size_hint(&self) -> (usize, Option<usize>) {
340 let (_, upper) = self.best.size_hint();
341 (0, upper)
342 }
343}
344
345impl<I, P> crate::traits::BestTransactions for BestTransactionFilter<I, P>
346where
347 I: crate::traits::BestTransactions,
348 P: FnMut(&<I as Iterator>::Item) -> bool + Send,
349{
350 fn mark_invalid(&mut self, tx: &Self::Item, kind: InvalidPoolTransactionError) {
351 crate::traits::BestTransactions::mark_invalid(&mut self.best, tx, kind)
352 }
353
354 fn no_updates(&mut self) {
355 self.best.no_updates()
356 }
357
358 fn skip_blobs(&mut self) {
359 self.set_skip_blobs(true)
360 }
361
362 fn set_skip_blobs(&mut self, skip_blobs: bool) {
363 self.best.set_skip_blobs(skip_blobs)
364 }
365}
366
367impl<I: fmt::Debug, P> fmt::Debug for BestTransactionFilter<I, P> {
368 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
369 f.debug_struct("BestTransactionFilter").field("best", &self.best).finish()
370 }
371}
372
373#[derive(Debug)]
376pub struct BestTransactionsWithPrioritizedSenders<I: Iterator> {
377 inner: I,
379 prioritized_senders: AddressSet,
381 max_prioritized_gas: u64,
383 buffer: VecDeque<I::Item>,
386 prioritized_gas: u64,
389}
390
391impl<I: Iterator> BestTransactionsWithPrioritizedSenders<I> {
392 pub fn new(prioritized_senders: AddressSet, max_prioritized_gas: u64, inner: I) -> Self {
394 Self {
395 inner,
396 prioritized_senders,
397 max_prioritized_gas,
398 buffer: Default::default(),
399 prioritized_gas: Default::default(),
400 }
401 }
402}
403
404impl<I, T> Iterator for BestTransactionsWithPrioritizedSenders<I>
405where
406 I: crate::traits::BestTransactions<Item = Arc<ValidPoolTransaction<T>>>,
407 T: PoolTransaction,
408{
409 type Item = <I as Iterator>::Item;
410
411 fn next(&mut self) -> Option<Self::Item> {
412 if self.prioritized_gas < self.max_prioritized_gas {
414 for item in &mut self.inner {
415 if self.prioritized_senders.contains(&item.transaction.sender()) &&
416 self.prioritized_gas + item.transaction.gas_limit() <=
417 self.max_prioritized_gas
418 {
419 self.prioritized_gas += item.transaction.gas_limit();
420 return Some(item)
421 }
422 self.buffer.push_back(item);
423 }
424 }
425
426 if let Some(item) = self.buffer.pop_front() {
427 Some(item)
428 } else {
429 self.inner.next()
430 }
431 }
432
433 fn size_hint(&self) -> (usize, Option<usize>) {
434 let buffered = self.buffer.len();
435 let (inner_lower, inner_upper) = self.inner.size_hint();
436
437 (
438 buffered.saturating_add(inner_lower),
439 inner_upper.and_then(|upper| upper.checked_add(buffered)),
440 )
441 }
442}
443
444impl<I, T> crate::traits::BestTransactions for BestTransactionsWithPrioritizedSenders<I>
445where
446 I: crate::traits::BestTransactions<Item = Arc<ValidPoolTransaction<T>>>,
447 T: PoolTransaction,
448{
449 fn mark_invalid(&mut self, tx: &Self::Item, kind: InvalidPoolTransactionError) {
450 self.inner.mark_invalid(tx, kind)
451 }
452
453 fn no_updates(&mut self) {
454 self.inner.no_updates()
455 }
456
457 fn set_skip_blobs(&mut self, skip_blobs: bool) {
458 if skip_blobs {
459 self.buffer.retain(|tx| !tx.transaction.is_eip4844())
460 }
461 self.inner.set_skip_blobs(skip_blobs)
462 }
463}
464
465#[cfg(test)]
466mod tests {
467 use super::*;
468 use crate::{
469 pool::pending::PendingPool,
470 test_utils::{MockOrdering, MockTransaction, MockTransactionFactory},
471 BestTransactions, Priority,
472 };
473
474 #[test]
475 fn test_best_iter() {
476 let mut pool = PendingPool::new(MockOrdering::default());
477 let mut f = MockTransactionFactory::default();
478
479 let num_tx = 10;
480 let tx = MockTransaction::eip1559();
482 for nonce in 0..num_tx {
483 let tx = tx.clone().rng_hash().with_nonce(nonce);
484 let valid_tx = f.validated(tx);
485 pool.add_transaction(Arc::new(valid_tx), 0);
486 }
487
488 let mut best = pool.best();
489 assert_eq!(best.all.len(), num_tx as usize);
490 assert_eq!(best.independent.len(), 1);
491
492 for nonce in 0..num_tx {
494 assert_eq!(best.independent.len(), 1);
495 let tx = best.next().unwrap();
496 assert_eq!(tx.nonce(), nonce);
497 }
498 }
499
500 #[test]
501 fn test_best_transactions_size_hint() {
502 let mut pool = PendingPool::new(MockOrdering::default());
503 let mut f = MockTransactionFactory::default();
504
505 for nonce in 0..3 {
506 let tx = MockTransaction::eip1559().rng_hash().with_nonce(nonce);
507 pool.add_transaction(Arc::new(f.validated(tx)), 0);
508 }
509
510 let mut best = pool.best();
511 assert_eq!(best.size_hint(), (0, None));
512
513 best.no_updates();
514 assert_eq!(best.size_hint(), (0, Some(3)));
515
516 assert_eq!(best.next().unwrap().nonce(), 0);
517 assert_eq!(best.size_hint(), (0, Some(2)));
518 }
519
520 #[test]
521 fn test_best_transactions_with_fees_size_hint() {
522 let mut pool = PendingPool::new(MockOrdering::default());
523 let mut f = MockTransactionFactory::default();
524
525 for nonce in 0..3 {
526 let tx = MockTransaction::eip1559().rng_hash().with_nonce(nonce).with_max_fee(100);
527 pool.add_transaction(Arc::new(f.validated(tx)), 0);
528 }
529
530 let mut best = pool.best_with_basefee_and_blobfee(10, 0);
531 best.no_updates();
532
533 assert_eq!(best.size_hint(), (0, Some(3)));
534 assert_eq!(best.next().unwrap().nonce(), 0);
535 assert_eq!(best.size_hint(), (0, Some(2)));
536 }
537
538 #[test]
539 fn test_best_transaction_filter_size_hint() {
540 let mut pool = PendingPool::new(MockOrdering::default());
541 let mut f = MockTransactionFactory::default();
542
543 for nonce in 0..3 {
544 let tx = MockTransaction::eip1559().rng_hash().with_nonce(nonce);
545 pool.add_transaction(Arc::new(f.validated(tx)), 0);
546 }
547
548 let best = pool.best().without_updates();
549 let mut filter =
550 BestTransactionFilter::new(best, |_: &Arc<ValidPoolTransaction<MockTransaction>>| {
551 false
552 });
553
554 assert_eq!(filter.size_hint(), (0, Some(3)));
555 assert!(filter.next().is_none());
556 assert_eq!(filter.size_hint(), (0, Some(0)));
557 }
558
559 #[test]
560 fn test_best_transactions_with_prioritized_senders_size_hint() {
561 let mut pool = PendingPool::new(MockOrdering::default());
562 let mut f = MockTransactionFactory::default();
563
564 for gas_price in 0..5 {
565 let tx = MockTransaction::eip1559().with_gas_price((gas_price + 1) * 10);
566 pool.add_transaction(Arc::new(f.validated(tx)), 0);
567 }
568
569 let prioritized_tx = MockTransaction::eip1559().with_gas_price(5).with_gas_limit(200);
570 let prioritized_sender = prioritized_tx.sender();
571 pool.add_transaction(Arc::new(f.validated(prioritized_tx)), 0);
572
573 let mut best = BestTransactionsWithPrioritizedSenders::new(
574 AddressSet::from_iter([prioritized_sender]),
575 200,
576 pool.best().without_updates(),
577 );
578
579 assert_eq!(best.size_hint(), (0, Some(6)));
580 assert_eq!(best.next().unwrap().sender(), prioritized_sender);
581 assert_eq!(best.size_hint(), (5, Some(5)));
582 }
583
584 #[test]
585 fn test_best_iter_invalid() {
586 let mut pool = PendingPool::new(MockOrdering::default());
587 let mut f = MockTransactionFactory::default();
588
589 let num_tx = 10;
590 let tx = MockTransaction::eip1559();
592 for nonce in 0..num_tx {
593 let tx = tx.clone().rng_hash().with_nonce(nonce);
594 let valid_tx = f.validated(tx);
595 pool.add_transaction(Arc::new(valid_tx), 0);
596 }
597
598 let mut best = pool.best();
599
600 let invalid = best.independent.iter().next().unwrap();
602 best.mark_invalid(
603 &invalid.transaction.clone(),
604 InvalidPoolTransactionError::Consensus(InvalidTransactionError::TxTypeNotSupported),
605 );
606
607 assert!(best.next().is_none());
609 }
610
611 #[test]
612 fn test_best_transactions_iter_invalid() {
613 let mut pool = PendingPool::new(MockOrdering::default());
614 let mut f = MockTransactionFactory::default();
615
616 let num_tx = 10;
617 let tx = MockTransaction::eip1559();
619 for nonce in 0..num_tx {
620 let tx = tx.clone().rng_hash().with_nonce(nonce);
621 let valid_tx = f.validated(tx);
622 pool.add_transaction(Arc::new(valid_tx), 0);
623 }
624
625 let mut best: Box<
626 dyn crate::traits::BestTransactions<Item = Arc<ValidPoolTransaction<MockTransaction>>>,
627 > = Box::new(pool.best());
628
629 let tx = Iterator::next(&mut best).unwrap();
630 crate::traits::BestTransactions::mark_invalid(
631 &mut *best,
632 &tx,
633 InvalidPoolTransactionError::Consensus(InvalidTransactionError::TxTypeNotSupported),
634 );
635 assert!(Iterator::next(&mut best).is_none());
636 }
637
638 #[test]
639 fn test_best_with_fees_iter_base_fee_satisfied() {
640 let mut pool = PendingPool::new(MockOrdering::default());
641 let mut f = MockTransactionFactory::default();
642
643 let num_tx = 5;
644 let base_fee: u64 = 10;
645 let base_fee_per_blob_gas: u64 = 15;
646
647 for nonce in 0..num_tx {
650 let tx = MockTransaction::eip1559()
651 .rng_hash()
652 .with_nonce(nonce)
653 .with_max_fee(base_fee as u128 + 5);
654 let valid_tx = f.validated(tx);
655 pool.add_transaction(Arc::new(valid_tx), 0);
656 }
657
658 let mut best = pool.best_with_basefee_and_blobfee(base_fee, base_fee_per_blob_gas);
659
660 for nonce in 0..num_tx {
661 let tx = best.next().expect("Transaction should be returned");
662 assert_eq!(tx.nonce(), nonce);
663 assert!(tx.transaction.max_fee_per_gas() >= base_fee as u128);
664 }
665 }
666
667 #[test]
668 fn test_best_with_fees_iter_base_fee_violated() {
669 let mut pool = PendingPool::new(MockOrdering::default());
670 let mut f = MockTransactionFactory::default();
671
672 let num_tx = 5;
673 let base_fee: u64 = 20;
674 let base_fee_per_blob_gas: u64 = 15;
675
676 for nonce in 0..num_tx {
678 let tx = MockTransaction::eip1559()
679 .rng_hash()
680 .with_nonce(nonce)
681 .with_max_fee(base_fee as u128 - 5);
682 let valid_tx = f.validated(tx);
683 pool.add_transaction(Arc::new(valid_tx), 0);
684 }
685
686 let mut best = pool.best_with_basefee_and_blobfee(base_fee, base_fee_per_blob_gas);
687
688 assert!(best.next().is_none());
690 }
691
692 #[test]
693 fn test_best_with_fees_iter_blob_fee_satisfied() {
694 let mut pool = PendingPool::new(MockOrdering::default());
695 let mut f = MockTransactionFactory::default();
696
697 let num_tx = 5;
698 let base_fee: u64 = 10;
699 let base_fee_per_blob_gas: u64 = 20;
700
701 for nonce in 0..num_tx {
704 let tx = MockTransaction::eip4844()
705 .rng_hash()
706 .with_nonce(nonce)
707 .with_max_fee(base_fee as u128 + 5)
708 .with_blob_fee(base_fee_per_blob_gas as u128 + 5);
709 let valid_tx = f.validated(tx);
710 pool.add_transaction(Arc::new(valid_tx), 0);
711 }
712
713 let mut best = pool.best_with_basefee_and_blobfee(base_fee, base_fee_per_blob_gas);
714
715 for nonce in 0..num_tx {
718 let tx = best.next().expect("Transaction should be returned");
719 assert_eq!(tx.nonce(), nonce);
720 assert!(tx.transaction.max_fee_per_gas() >= base_fee as u128);
721 assert!(
722 tx.transaction.max_fee_per_blob_gas().unwrap() >= base_fee_per_blob_gas as u128
723 );
724 }
725
726 assert!(best.next().is_none());
728 }
729
730 #[test]
731 fn test_best_with_fees_iter_blob_fee_violated() {
732 let mut pool = PendingPool::new(MockOrdering::default());
733 let mut f = MockTransactionFactory::default();
734
735 let num_tx = 5;
736 let base_fee: u64 = 10;
737 let base_fee_per_blob_gas: u64 = 20;
738
739 for nonce in 0..num_tx {
741 let tx = MockTransaction::eip4844()
742 .rng_hash()
743 .with_nonce(nonce)
744 .with_max_fee(base_fee as u128 + 5)
745 .with_blob_fee(base_fee_per_blob_gas as u128 - 5);
746 let valid_tx = f.validated(tx);
747 pool.add_transaction(Arc::new(valid_tx), 0);
748 }
749
750 let mut best = pool.best_with_basefee_and_blobfee(base_fee, base_fee_per_blob_gas);
751
752 assert!(best.next().is_none());
754 }
755
756 #[test]
757 fn test_best_with_fees_iter_mixed_fees() {
758 let mut pool = PendingPool::new(MockOrdering::default());
759 let mut f = MockTransactionFactory::default();
760
761 let base_fee: u64 = 10;
762 let base_fee_per_blob_gas: u64 = 20;
763
764 let tx1 =
766 MockTransaction::eip1559().rng_hash().with_nonce(0).with_max_fee(base_fee as u128 + 5);
767 let tx2 = MockTransaction::eip4844()
768 .rng_hash()
769 .with_nonce(1)
770 .with_max_fee(base_fee as u128 + 5)
771 .with_blob_fee(base_fee_per_blob_gas as u128 + 5);
772 let tx3 = MockTransaction::eip4844()
773 .rng_hash()
774 .with_nonce(2)
775 .with_max_fee(base_fee as u128 + 5)
776 .with_blob_fee(base_fee_per_blob_gas as u128 - 5);
777 let tx4 =
778 MockTransaction::eip1559().rng_hash().with_nonce(3).with_max_fee(base_fee as u128 - 5);
779
780 pool.add_transaction(Arc::new(f.validated(tx1.clone())), 0);
781 pool.add_transaction(Arc::new(f.validated(tx2.clone())), 0);
782 pool.add_transaction(Arc::new(f.validated(tx3)), 0);
783 pool.add_transaction(Arc::new(f.validated(tx4)), 0);
784
785 let mut best = pool.best_with_basefee_and_blobfee(base_fee, base_fee_per_blob_gas);
786
787 let expected_order = vec![tx1, tx2];
788 for expected_tx in expected_order {
789 let tx = best.next().expect("Transaction should be returned");
790 assert_eq!(tx.transaction, expected_tx);
791 }
792
793 assert!(best.next().is_none());
795 }
796
797 #[test]
798 fn test_best_add_transaction_with_next_nonce() {
799 let mut pool = PendingPool::new(MockOrdering::default());
800 let mut f = MockTransactionFactory::default();
801
802 let num_tx = 5;
804 let tx = MockTransaction::eip1559();
805 for nonce in 0..num_tx {
806 let tx = tx.clone().rng_hash().with_nonce(nonce);
807 let valid_tx = f.validated(tx);
808 pool.add_transaction(Arc::new(valid_tx), 0);
809 }
810
811 let mut best = pool.best();
813
814 let (tx_sender, tx_receiver) =
816 tokio::sync::broadcast::channel::<PendingTransaction<MockOrdering>>(1000);
817 best.new_transaction_receiver = Some(tx_receiver);
818
819 let new_tx = MockTransaction::eip1559().rng_hash().with_nonce(5);
821 let valid_new_tx = f.validated(new_tx);
822
823 let pending_tx = PendingTransaction {
825 submission_id: 10,
826 transaction: Arc::new(valid_new_tx.clone()),
827 priority: Priority::Value(1000),
828 };
829 tx_sender.send(pending_tx.clone()).unwrap();
830
831 best.add_new_transactions();
833
834 assert_eq!(best.all.len(), 6);
836 assert!(best.all.contains_key(valid_new_tx.id()));
837
838 assert_eq!(best.independent.len(), 2);
840 assert!(best.independent.contains(&pending_tx));
841 }
842
843 #[test]
844 fn test_best_add_transaction_with_ancestor() {
845 let mut pool = PendingPool::new(MockOrdering::default());
847 let mut f = MockTransactionFactory::default();
848
849 let num_tx = 5;
851 let tx = MockTransaction::eip1559();
852 for nonce in 0..num_tx {
853 let tx = tx.clone().rng_hash().with_nonce(nonce);
854 let valid_tx = f.validated(tx);
855 pool.add_transaction(Arc::new(valid_tx), 0);
856 }
857
858 let mut best = pool.best();
860
861 let (tx_sender, tx_receiver) =
863 tokio::sync::broadcast::channel::<PendingTransaction<MockOrdering>>(1000);
864 best.new_transaction_receiver = Some(tx_receiver);
865
866 let base_tx1 = MockTransaction::eip1559().rng_hash().with_nonce(5);
868 let valid_new_tx1 = f.validated(base_tx1.clone());
869
870 let pending_tx1 = PendingTransaction {
872 submission_id: 10,
873 transaction: Arc::new(valid_new_tx1.clone()),
874 priority: Priority::Value(1000),
875 };
876 tx_sender.send(pending_tx1.clone()).unwrap();
877
878 best.add_new_transactions();
880
881 assert_eq!(best.all.len(), 6);
883 assert!(best.all.contains_key(valid_new_tx1.id()));
884
885 assert_eq!(best.independent.len(), 2);
887 assert!(best.independent.contains(&pending_tx1));
888
889 let base_tx2 = base_tx1.with_nonce(6);
891 let valid_new_tx2 = f.validated(base_tx2);
892
893 let pending_tx2 = PendingTransaction {
895 submission_id: 11, transaction: Arc::new(valid_new_tx2.clone()),
897 priority: Priority::Value(1000),
898 };
899 tx_sender.send(pending_tx2.clone()).unwrap();
900
901 best.add_new_transactions();
903
904 assert_eq!(best.all.len(), 7);
906 assert!(best.all.contains_key(valid_new_tx2.id()));
907
908 assert_eq!(best.independent.len(), 2);
910 assert!(!best.independent.contains(&pending_tx2));
911 }
912
913 #[test]
914 fn test_best_transactions_filter_trait_object() {
915 let mut pool = PendingPool::new(MockOrdering::default());
917 let mut f = MockTransactionFactory::default();
918
919 let num_tx = 5;
921 let tx = MockTransaction::eip1559();
922 for nonce in 0..num_tx {
923 let tx = tx.clone().rng_hash().with_nonce(nonce);
924 let valid_tx = f.validated(tx);
925 pool.add_transaction(Arc::new(valid_tx), 0);
926 }
927
928 let best: Box<dyn crate::traits::BestTransactions<Item = _>> = Box::new(pool.best());
930
931 let filter =
933 BestTransactionFilter::new(best, |tx: &Arc<ValidPoolTransaction<MockTransaction>>| {
934 tx.nonce().is_multiple_of(2)
935 });
936
937 for tx in filter {
939 assert_eq!(tx.nonce() % 2, 0);
940 }
941 }
942
943 #[test]
944 fn test_best_transactions_prioritized_senders() {
945 let mut pool = PendingPool::new(MockOrdering::default());
946 let mut f = MockTransactionFactory::default();
947
948 for gas_price in 0..5 {
950 let tx = MockTransaction::eip1559().with_gas_price((gas_price + 1) * 10);
951 let valid_tx = f.validated(tx);
952 pool.add_transaction(Arc::new(valid_tx), 0);
953 }
954
955 let prioritized_tx = MockTransaction::eip1559().with_gas_price(5).with_gas_limit(200);
957 let valid_prioritized_tx = f.validated(prioritized_tx.clone());
958 pool.add_transaction(Arc::new(valid_prioritized_tx), 0);
959
960 let prioritized_tx2 = MockTransaction::eip1559().with_gas_price(3);
963 let valid_prioritized_tx2 = f.validated(prioritized_tx2.clone());
964 pool.add_transaction(Arc::new(valid_prioritized_tx2), 0);
965
966 let prioritized_senders =
967 AddressSet::from_iter([prioritized_tx.sender(), prioritized_tx2.sender()]);
968 let best =
969 BestTransactionsWithPrioritizedSenders::new(prioritized_senders, 200, pool.best());
970
971 let mut iter = best.into_iter();
974 let top_of_block_tx = iter.next().unwrap();
975 assert_eq!(top_of_block_tx.max_fee_per_gas(), 5);
976 assert_eq!(top_of_block_tx.sender(), prioritized_tx.sender());
977 for gas_price in (0..5).rev() {
978 assert_eq!(iter.next().unwrap().max_fee_per_gas(), (gas_price + 1) * 10);
979 }
980
981 let top_of_block_tx2 = iter.next().unwrap();
984 assert_eq!(top_of_block_tx2.max_fee_per_gas(), 3);
985 assert_eq!(top_of_block_tx2.sender(), prioritized_tx2.sender());
986 }
987
988 #[test]
989 fn test_best_with_fees_iter_no_blob_fee_required() {
990 let mut pool = PendingPool::new(MockOrdering::default());
992 let mut f = MockTransactionFactory::default();
993
994 let base_fee: u64 = 10;
995 let base_fee_per_blob_gas: u64 = 0; for nonce in 0..5 {
999 let tx = MockTransaction::eip1559()
1000 .rng_hash()
1001 .with_nonce(nonce)
1002 .with_max_fee(base_fee as u128 + 5);
1003 let valid_tx = f.validated(tx);
1004 pool.add_transaction(Arc::new(valid_tx), 0);
1005 }
1006
1007 let mut best = pool.best_with_basefee_and_blobfee(base_fee, base_fee_per_blob_gas);
1008
1009 for nonce in 0..5 {
1011 let tx = best.next().expect("Transaction should be returned");
1012 assert_eq!(tx.nonce(), nonce);
1013 }
1014
1015 assert!(best.next().is_none());
1017 }
1018
1019 #[test]
1020 fn test_best_with_fees_iter_mix_of_blob_and_non_blob_transactions() {
1021 let mut pool = PendingPool::new(MockOrdering::default());
1023 let mut f = MockTransactionFactory::default();
1024
1025 let base_fee: u64 = 10;
1026 let base_fee_per_blob_gas: u64 = 15;
1027
1028 let tx_non_blob =
1030 MockTransaction::eip1559().rng_hash().with_nonce(0).with_max_fee(base_fee as u128 + 5);
1031 pool.add_transaction(Arc::new(f.validated(tx_non_blob.clone())), 0);
1032
1033 let tx_blob = MockTransaction::eip4844()
1035 .rng_hash()
1036 .with_nonce(1)
1037 .with_max_fee(base_fee as u128 + 5)
1038 .with_blob_fee(base_fee_per_blob_gas as u128 + 5);
1039 pool.add_transaction(Arc::new(f.validated(tx_blob.clone())), 0);
1040
1041 let mut best = pool.best_with_basefee_and_blobfee(base_fee, base_fee_per_blob_gas);
1042
1043 let tx = best.next().expect("Transaction should be returned");
1045 assert_eq!(tx.transaction, tx_non_blob);
1046
1047 let tx = best.next().expect("Transaction should be returned");
1048 assert_eq!(tx.transaction, tx_blob);
1049
1050 assert!(best.next().is_none());
1052 }
1053
1054 #[test]
1055 fn test_best_transactions_with_skipping_blobs() {
1056 let mut pool = PendingPool::new(MockOrdering::default());
1058 let mut f = MockTransactionFactory::default();
1059
1060 let tx_blob = MockTransaction::eip4844().rng_hash().with_nonce(0).with_blob_fee(100);
1062 let valid_blob_tx = f.validated(tx_blob);
1063 pool.add_transaction(Arc::new(valid_blob_tx), 0);
1064
1065 let tx_non_blob = MockTransaction::eip1559().rng_hash().with_nonce(1).with_max_fee(200);
1067 let valid_non_blob_tx = f.validated(tx_non_blob.clone());
1068 pool.add_transaction(Arc::new(valid_non_blob_tx), 0);
1069
1070 let mut best = pool.best();
1071 best.skip_blobs();
1072
1073 let tx = best.next().expect("Transaction should be returned");
1075 assert_eq!(tx.transaction, tx_non_blob);
1076
1077 assert!(best.next().is_none());
1079 }
1080
1081 #[test]
1082 fn test_best_transactions_no_updates() {
1083 let mut pool = PendingPool::new(MockOrdering::default());
1086 let mut f = MockTransactionFactory::default();
1087
1088 let tx = MockTransaction::eip1559().rng_hash().with_nonce(0).with_max_fee(100);
1090 let valid_tx = f.validated(tx);
1091 pool.add_transaction(Arc::new(valid_tx), 0);
1092
1093 let mut best = pool.best();
1094
1095 let (_tx_sender, tx_receiver) =
1097 tokio::sync::broadcast::channel::<PendingTransaction<MockOrdering>>(1000);
1098 best.new_transaction_receiver = Some(tx_receiver);
1099
1100 assert!(best.new_transaction_receiver.is_some());
1102
1103 best.no_updates();
1105
1106 assert!(best.new_transaction_receiver.is_none());
1108 }
1109
1110 #[test]
1111 fn test_best_update_transaction_priority() {
1112 let mut pool = PendingPool::new(MockOrdering::default());
1113 let mut f = MockTransactionFactory::default();
1114
1115 let num_tx = 5;
1117 let tx = MockTransaction::eip1559();
1118 for nonce in 0..num_tx {
1119 let tx = tx.clone().rng_hash().with_nonce(nonce);
1120 let valid_tx = f.validated(tx);
1121 pool.add_transaction(Arc::new(valid_tx), 0);
1122 }
1123
1124 let mut best = pool.best();
1126
1127 let (tx_sender, tx_receiver) =
1129 tokio::sync::broadcast::channel::<PendingTransaction<MockOrdering>>(1000);
1130 best.new_transaction_receiver = Some(tx_receiver);
1131
1132 let first = best.next().unwrap();
1134
1135 let new_higher_fee_tx = MockTransaction::eip1559().with_nonce(0);
1137 let valid_new_higher_fee_tx = f.validated(new_higher_fee_tx);
1138
1139 let pending_tx = PendingTransaction {
1141 submission_id: 10,
1142 transaction: Arc::new(valid_new_higher_fee_tx.clone()),
1143 priority: Priority::Value(u128::MAX),
1144 };
1145 tx_sender.send(pending_tx).unwrap();
1146
1147 for tx in best {
1149 assert_eq!(tx.sender_id(), first.sender_id());
1150 assert_ne!(tx.sender_id(), valid_new_higher_fee_tx.sender_id());
1151 }
1152 }
1153
1154 #[test]
1159 fn test_blob_transaction_ordering_multiple_clients_shape() {
1160 let mut pool = PendingPool::new(MockOrdering::default());
1161 let mut f = MockTransactionFactory::default();
1162
1163 let base_fee: u64 = 10;
1164 let base_fee_per_blob_gas: u64 = 1;
1165 let max_blob_count: u64 = 6;
1166
1167 let sender_a = MockTransaction::eip4844()
1168 .with_blob_hashes(5)
1169 .with_max_fee(base_fee as u128 + 20)
1170 .with_priority_fee(base_fee as u128 + 20)
1171 .with_blob_fee(120);
1172 for nonce in 0..5u64 {
1173 let tx = sender_a.clone().rng_hash().with_nonce(nonce);
1174 pool.add_transaction(Arc::new(f.validated(tx)), 0);
1175 }
1176
1177 let sender_b = MockTransaction::eip4844()
1178 .with_blob_hashes(1)
1179 .with_max_fee(base_fee as u128 + 20)
1180 .with_priority_fee(base_fee as u128 + 20)
1181 .with_blob_fee(100);
1182 for nonce in 0..5u64 {
1183 let tx = sender_b.clone().rng_hash().with_nonce(nonce);
1184 pool.add_transaction(Arc::new(f.validated(tx)), 0);
1185 }
1186
1187 let mut best = pool.best_with_basefee_and_blobfee(base_fee, base_fee_per_blob_gas);
1188 let mut block_blob_count = 0u64;
1189 let mut included_txs = 0u64;
1190
1191 while let Some(tx) = best.next() {
1192 if let Some(blob_hashes) = tx.transaction.blob_versioned_hashes() {
1193 let tx_blob_count = blob_hashes.len() as u64;
1194
1195 if block_blob_count + tx_blob_count > max_blob_count {
1196 crate::traits::BestTransactions::mark_invalid(
1197 &mut best,
1198 &tx,
1199 InvalidPoolTransactionError::Eip4844(
1200 Eip4844PoolTransactionError::TooManyEip4844Blobs {
1201 have: block_blob_count + tx_blob_count,
1202 permitted: max_blob_count,
1203 },
1204 ),
1205 );
1206 continue;
1207 }
1208
1209 block_blob_count += tx_blob_count;
1210 included_txs += 1;
1211
1212 if block_blob_count == max_blob_count {
1213 best.skip_blobs();
1214 break;
1215 }
1216 }
1217 }
1218
1219 assert_eq!(
1220 block_blob_count, max_blob_count,
1221 "expected a full blob block (5+1 blobs across senders)"
1222 );
1223 assert_eq!(included_txs, 2, "expected one 5-blob tx and one 1-blob tx in the block");
1224 }
1225}