1#![doc(
272 html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
273 html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
274 issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/"
275)]
276#![cfg_attr(docsrs, feature(doc_cfg))]
277#![cfg_attr(not(test), warn(unused_crate_dependencies))]
278
279pub use imbl::OrdMap;
280
281pub use crate::{
282 batcher::{BatchTxProcessor, BatchTxRequest},
283 blobstore::{BlobStore, BlobStoreError},
284 config::{
285 LocalTransactionConfig, PoolConfig, PriceBumpConfig, SubPoolLimit,
286 DEFAULT_MAX_INFLIGHT_DELEGATED_SLOTS, DEFAULT_PRICE_BUMP,
287 DEFAULT_TXPOOL_ADDITIONAL_VALIDATION_TASKS, MAX_NEW_PENDING_TXS_NOTIFICATIONS,
288 REPLACE_BLOB_PRICE_BUMP, TXPOOL_MAX_ACCOUNT_SLOTS_PER_SENDER,
289 TXPOOL_SUBPOOL_MAX_SIZE_MB_DEFAULT, TXPOOL_SUBPOOL_MAX_TXS_DEFAULT,
290 },
291 error::PoolResult,
292 ordering::{CoinbaseTipOrdering, Priority, TransactionOrdering},
293 pool::{
294 blob_tx_priority, fee_delta, state::SubPool, AddedTransactionOutcome,
295 AllTransactionsEvents, FullTransactionEvent, NewTransactionEvent, TransactionEvent,
296 TransactionEvents, TransactionListenerKind,
297 },
298 traits::*,
299 validate::{
300 EthTransactionValidator, TransactionValidationOutcome, TransactionValidationTaskExecutor,
301 TransactionValidator, ValidPoolTransaction,
302 },
303};
304use crate::{identifier::TransactionId, pool::PoolInner};
305use alloy_eips::{
306 eip4844::{BlobAndProofV1, BlobAndProofV2, BlobCellsAndProofsV1},
307 eip7594::BlobTransactionSidecarVariant,
308};
309use alloy_primitives::{map::AddressSet, Address, TxHash, B128, B256, U256};
310use aquamarine as _;
311use reth_chainspec::{ChainSpecProvider, EthereumHardforks};
312use reth_eth_wire_types::HandleMempoolData;
313use reth_evm::ConfigureEvm;
314use reth_evm_ethereum::EthEvmConfig;
315use reth_execution_types::ChangedAccount;
316use reth_primitives_traits::{HeaderTy, Recovered};
317use reth_storage_api::{BlockReaderIdExt, StateProviderFactory};
318use std::sync::Arc;
319use tokio::sync::mpsc::Receiver;
320use tracing::{instrument, trace};
321
322pub mod error;
323pub mod maintain;
324pub mod metrics;
325pub mod noop;
326pub mod pool;
327pub mod validate;
328
329pub mod batcher;
330pub mod blobstore;
331mod config;
332pub mod identifier;
333mod ordering;
334mod traits;
335
336#[cfg(any(test, feature = "test-utils"))]
337pub mod test_utils;
339
340pub type EthTransactionPool<Client, S, Evm = EthEvmConfig, T = EthPooledTransaction> = Pool<
342 TransactionValidationTaskExecutor<EthTransactionValidator<Client, T, Evm>>,
343 CoinbaseTipOrdering<T>,
344 S,
345>;
346
347#[derive(Debug)]
349pub struct Pool<V, T: TransactionOrdering, S> {
350 pool: Arc<PoolInner<V, T, S>>,
352}
353
354impl<V, T, S> Pool<V, T, S>
357where
358 V: TransactionValidator,
359 T: TransactionOrdering<Transaction = <V as TransactionValidator>::Transaction>,
360 S: BlobStore,
361{
362 pub fn new(validator: V, ordering: T, blob_store: S, config: PoolConfig) -> Self {
364 Self { pool: Arc::new(PoolInner::new(validator, ordering, blob_store, config)) }
365 }
366
367 pub fn inner(&self) -> &PoolInner<V, T, S> {
369 &self.pool
370 }
371
372 pub fn config(&self) -> &PoolConfig {
374 self.inner().config()
375 }
376
377 pub fn validator(&self) -> &V {
379 self.inner().validator()
380 }
381
382 async fn validate(
384 &self,
385 origin: TransactionOrigin,
386 transaction: V::Transaction,
387 ) -> TransactionValidationOutcome<V::Transaction> {
388 self.pool.validator().validate_transaction(origin, transaction).await
389 }
390
391 pub fn len(&self) -> usize {
393 self.pool.len()
394 }
395
396 pub fn is_empty(&self) -> bool {
398 self.pool.is_empty()
399 }
400
401 pub fn is_exceeded(&self) -> bool {
403 self.pool.is_exceeded()
404 }
405
406 pub fn blob_store(&self) -> &S {
408 self.pool.blob_store()
409 }
410}
411
412impl<Client, S, Evm> EthTransactionPool<Client, S, Evm>
413where
414 Client: ChainSpecProvider<ChainSpec: EthereumHardforks>
415 + StateProviderFactory
416 + Clone
417 + BlockReaderIdExt<Header = HeaderTy<Evm::Primitives>>
418 + 'static,
419 S: BlobStore,
420 Evm: ConfigureEvm + 'static,
421{
422 pub fn eth_pool(
457 validator: TransactionValidationTaskExecutor<
458 EthTransactionValidator<Client, EthPooledTransaction, Evm>,
459 >,
460 blob_store: S,
461 config: PoolConfig,
462 ) -> Self {
463 Self::new(validator, CoinbaseTipOrdering::default(), blob_store, config)
464 }
465}
466
467impl<V, T, S> TransactionPool for Pool<V, T, S>
469where
470 V: TransactionValidator,
471 <V as TransactionValidator>::Transaction: EthPoolTransaction,
472 T: TransactionOrdering<Transaction = <V as TransactionValidator>::Transaction>,
473 S: BlobStore,
474{
475 type Transaction = T::Transaction;
476
477 fn pool_size(&self) -> PoolSize {
478 self.pool.size()
479 }
480
481 fn block_info(&self) -> BlockInfo {
482 self.pool.block_info()
483 }
484
485 async fn add_transaction_and_subscribe(
486 &self,
487 origin: TransactionOrigin,
488 transaction: Self::Transaction,
489 ) -> PoolResult<TransactionEvents> {
490 let tx = self.validate(origin, transaction).await;
491 self.pool.add_transaction_and_subscribe(origin, tx)
492 }
493
494 async fn add_transaction(
495 &self,
496 origin: TransactionOrigin,
497 transaction: Self::Transaction,
498 ) -> PoolResult<AddedTransactionOutcome> {
499 let tx = self.validate(origin, transaction).await;
500 let mut results = self.pool.add_transactions(origin, std::iter::once(tx));
501 results.pop().expect("result length is the same as the input")
502 }
503
504 async fn add_transactions(
505 &self,
506 origin: TransactionOrigin,
507 transactions: Vec<Self::Transaction>,
508 ) -> Vec<PoolResult<AddedTransactionOutcome>> {
509 if transactions.is_empty() {
510 return Vec::new()
511 }
512 let validated = self
513 .pool
514 .validator()
515 .validate_transactions(transactions.into_iter().map(|tx| (origin, tx)))
516 .await;
517 self.pool.add_transactions(origin, validated)
518 }
519
520 async fn add_transactions_with_origins(
521 &self,
522 transactions: Vec<(TransactionOrigin, Self::Transaction)>,
523 ) -> Vec<PoolResult<AddedTransactionOutcome>> {
524 if transactions.is_empty() {
525 return Vec::new()
526 }
527 let origins: Vec<_> = transactions.iter().map(|(origin, _)| *origin).collect();
528 let validated = self.pool.validator().validate_transactions(transactions).await;
529 self.pool.add_transactions_with_origins(origins.into_iter().zip(validated))
530 }
531
532 fn transaction_event_listener(&self, tx_hash: TxHash) -> Option<TransactionEvents> {
533 self.pool.add_transaction_event_listener(tx_hash)
534 }
535
536 fn all_transactions_event_listener(&self) -> AllTransactionsEvents<Self::Transaction> {
537 self.pool.add_all_transactions_event_listener()
538 }
539
540 fn pending_transactions_listener_for(&self, kind: TransactionListenerKind) -> Receiver<TxHash> {
541 self.pool.add_pending_listener(kind)
542 }
543
544 fn blob_transaction_sidecars_listener(&self) -> Receiver<NewBlobSidecar> {
545 self.pool.add_blob_sidecar_listener()
546 }
547
548 fn new_transactions_listener_for(
549 &self,
550 kind: TransactionListenerKind,
551 ) -> Receiver<NewTransactionEvent<Self::Transaction>> {
552 self.pool.add_new_transaction_listener(kind)
553 }
554
555 fn pooled_transaction_hashes(&self) -> Vec<TxHash> {
556 self.pool.pooled_transactions_hashes()
557 }
558
559 fn pooled_transaction_hashes_max(&self, max: usize) -> Vec<TxHash> {
560 self.pool.pooled_transactions_hashes_max(max)
561 }
562
563 fn pooled_transactions(&self) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
564 self.pool.pooled_transactions()
565 }
566
567 fn pooled_transactions_max(
568 &self,
569 max: usize,
570 ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
571 self.pool.pooled_transactions_max(max)
572 }
573
574 fn get_pooled_transaction_elements(
575 &self,
576 tx_hashes: Vec<TxHash>,
577 limit: GetPooledTransactionLimit,
578 ) -> Vec<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled> {
579 self.pool.get_pooled_transaction_elements(tx_hashes, limit)
580 }
581
582 fn append_pooled_transaction_elements(
583 &self,
584 tx_hashes: &[TxHash],
585 limit: GetPooledTransactionLimit,
586 out: &mut Vec<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>,
587 ) {
588 self.pool.append_pooled_transaction_elements(tx_hashes, limit, out)
589 }
590
591 fn get_pooled_transaction_element(
592 &self,
593 tx_hash: TxHash,
594 ) -> Option<Recovered<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>>
595 {
596 self.pool.get_pooled_transaction_element(tx_hash)
597 }
598
599 fn best_transactions(
600 &self,
601 ) -> Box<dyn BestTransactions<Item = Arc<ValidPoolTransaction<Self::Transaction>>>> {
602 Box::new(self.pool.best_transactions())
603 }
604
605 fn best_transactions_with_attributes(
606 &self,
607 best_transactions_attributes: BestTransactionsAttributes,
608 ) -> Box<dyn BestTransactions<Item = Arc<ValidPoolTransaction<Self::Transaction>>>> {
609 self.pool.best_transactions_with_attributes(best_transactions_attributes)
610 }
611
612 fn pending_transactions(&self) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
613 self.pool.pending_transactions()
614 }
615
616 fn get_pending_transaction_by_sender_and_nonce(
617 &self,
618 sender: Address,
619 nonce: u64,
620 ) -> Option<Arc<ValidPoolTransaction<Self::Transaction>>> {
621 self.pool.get_pending_transaction_by_sender_and_nonce(sender, nonce)
622 }
623
624 fn pending_transactions_max(
625 &self,
626 max: usize,
627 ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
628 self.pool.pending_transactions_max(max)
629 }
630
631 fn queued_transactions(&self) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
632 self.pool.queued_transactions()
633 }
634
635 fn pending_and_queued_txn_count(&self) -> (usize, usize) {
636 let data = self.pool.get_pool_data();
637 let pending = data.pending_transactions_count();
638 let queued = data.queued_transactions_count();
639 (pending, queued)
640 }
641
642 fn all_transactions(&self) -> AllPoolTransactions<Self::Transaction> {
643 self.pool.all_transactions()
644 }
645
646 fn all_transaction_hashes(&self) -> Vec<TxHash> {
647 self.pool.all_transaction_hashes()
648 }
649
650 fn remove_transactions(
651 &self,
652 hashes: Vec<TxHash>,
653 ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
654 self.pool.remove_transactions(hashes)
655 }
656
657 fn remove_transactions_and_descendants(
658 &self,
659 hashes: Vec<TxHash>,
660 ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
661 self.pool.remove_transactions_and_descendants(hashes)
662 }
663
664 fn remove_transactions_by_sender(
665 &self,
666 sender: Address,
667 ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
668 self.pool.remove_transactions_by_sender(sender)
669 }
670
671 fn prune_transactions(
672 &self,
673 hashes: Vec<TxHash>,
674 ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
675 self.pool.prune_transactions(hashes)
676 }
677
678 fn retain_unknown<A>(&self, announcement: &mut A)
679 where
680 A: HandleMempoolData,
681 {
682 self.pool.retain_unknown(announcement)
683 }
684
685 fn get(&self, tx_hash: &TxHash) -> Option<Arc<ValidPoolTransaction<Self::Transaction>>> {
686 self.inner().get(tx_hash)
687 }
688
689 fn get_all(&self, txs: Vec<TxHash>) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
690 self.inner().get_all(txs)
691 }
692
693 fn on_propagated(&self, txs: PropagatedTransactions) {
694 self.inner().on_propagated(txs)
695 }
696
697 fn get_transactions_by_sender(
698 &self,
699 sender: Address,
700 ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
701 self.pool.get_transactions_by_sender(sender)
702 }
703
704 fn get_pending_transactions_with_predicate(
705 &self,
706 predicate: impl FnMut(&ValidPoolTransaction<Self::Transaction>) -> bool,
707 ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
708 self.pool.pending_transactions_with_predicate(predicate)
709 }
710
711 fn get_pending_transactions_by_sender(
712 &self,
713 sender: Address,
714 ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
715 self.pool.get_pending_transactions_by_sender(sender)
716 }
717
718 fn get_queued_transactions_by_sender(
719 &self,
720 sender: Address,
721 ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
722 self.pool.get_queued_transactions_by_sender(sender)
723 }
724
725 fn get_highest_transaction_by_sender(
726 &self,
727 sender: Address,
728 ) -> Option<Arc<ValidPoolTransaction<Self::Transaction>>> {
729 self.pool.get_highest_transaction_by_sender(sender)
730 }
731
732 fn get_highest_consecutive_transaction_by_sender(
733 &self,
734 sender: Address,
735 on_chain_nonce: u64,
736 ) -> Option<Arc<ValidPoolTransaction<Self::Transaction>>> {
737 self.pool.get_highest_consecutive_transaction_by_sender(sender, on_chain_nonce)
738 }
739
740 fn get_transaction_by_sender_and_nonce(
741 &self,
742 sender: Address,
743 nonce: u64,
744 ) -> Option<Arc<ValidPoolTransaction<Self::Transaction>>> {
745 let sender_id = self.pool.sender_id(&sender)?;
746 let transaction_id = TransactionId::new(sender_id, nonce);
747
748 self.inner().get_pool_data().all().get(&transaction_id).map(|tx| tx.transaction.clone())
749 }
750
751 fn get_transactions_by_origin(
752 &self,
753 origin: TransactionOrigin,
754 ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
755 self.pool.get_transactions_by_origin(origin)
756 }
757
758 fn get_pending_transactions_by_origin(
760 &self,
761 origin: TransactionOrigin,
762 ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
763 self.pool.get_pending_transactions_by_origin(origin)
764 }
765
766 fn unique_senders(&self) -> AddressSet {
767 self.pool.unique_senders()
768 }
769
770 fn get_blob(
771 &self,
772 tx_hash: TxHash,
773 ) -> Result<Option<Arc<BlobTransactionSidecarVariant>>, BlobStoreError> {
774 self.pool.blob_store().get(tx_hash)
775 }
776
777 fn get_all_blobs(
778 &self,
779 tx_hashes: Vec<TxHash>,
780 ) -> Result<Vec<(TxHash, Arc<BlobTransactionSidecarVariant>)>, BlobStoreError> {
781 self.pool.blob_store().get_all(tx_hashes)
782 }
783
784 fn get_all_blobs_exact(
785 &self,
786 tx_hashes: Vec<TxHash>,
787 ) -> Result<Vec<Arc<BlobTransactionSidecarVariant>>, BlobStoreError> {
788 self.pool.blob_store().get_exact(tx_hashes)
789 }
790
791 fn get_blobs_for_versioned_hashes_v1(
792 &self,
793 versioned_hashes: &[B256],
794 ) -> Result<Vec<Option<BlobAndProofV1>>, BlobStoreError> {
795 self.pool.blob_store().get_by_versioned_hashes_v1(versioned_hashes)
796 }
797
798 fn get_blobs_for_versioned_hashes_v2(
799 &self,
800 versioned_hashes: &[B256],
801 ) -> Result<Option<Vec<BlobAndProofV2>>, BlobStoreError> {
802 self.pool.blob_store().get_by_versioned_hashes_v2(versioned_hashes)
803 }
804
805 fn get_blobs_for_versioned_hashes_v3(
806 &self,
807 versioned_hashes: &[B256],
808 ) -> Result<Vec<Option<BlobAndProofV2>>, BlobStoreError> {
809 self.pool.blob_store().get_by_versioned_hashes_v3(versioned_hashes)
810 }
811
812 fn get_blobs_for_versioned_hashes_v4(
813 &self,
814 versioned_hashes: &[B256],
815 indices_bitarray: B128,
816 ) -> Result<Vec<Option<BlobCellsAndProofsV1>>, BlobStoreError> {
817 self.pool.blob_store().get_by_versioned_hashes_v4(versioned_hashes, indices_bitarray)
818 }
819}
820
821impl<V, T, S> TransactionPoolExt for Pool<V, T, S>
822where
823 V: TransactionValidator,
824 <V as TransactionValidator>::Transaction: EthPoolTransaction,
825 T: TransactionOrdering<Transaction = <V as TransactionValidator>::Transaction>,
826 S: BlobStore,
827{
828 type Block = V::Block;
829
830 #[instrument(skip(self), target = "txpool")]
831 fn set_block_info(&self, info: BlockInfo) {
832 trace!(target: "txpool", "updating pool block info");
833 self.pool.set_block_info(info)
834 }
835
836 fn on_canonical_state_change(&self, update: CanonicalStateUpdate<'_, Self::Block>) {
837 self.pool.on_canonical_state_change(update);
838 }
839
840 fn update_accounts(&self, accounts: Vec<ChangedAccount>) {
841 self.pool.update_accounts(accounts);
842 }
843
844 fn delete_blob(&self, tx: TxHash) {
845 self.pool.delete_blob(tx)
846 }
847
848 fn delete_blobs(&self, txs: Vec<TxHash>) {
849 self.pool.delete_blobs(txs)
850 }
851
852 fn cleanup_blobs(&self) {
853 self.pool.cleanup_blobs()
854 }
855}
856
857impl<V, T: TransactionOrdering, S> Clone for Pool<V, T, S> {
858 fn clone(&self) -> Self {
859 Self { pool: Arc::clone(&self.pool) }
860 }
861}