1#![doc(
272 html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
273 html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
274 issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/"
275)]
276#![cfg_attr(docsrs, feature(doc_cfg))]
277#![cfg_attr(not(test), warn(unused_crate_dependencies))]
278
279pub use imbl::OrdMap;
280
281pub use crate::{
282 batcher::{BatchTxProcessor, BatchTxRequest},
283 blobstore::{BlobStore, BlobStoreError},
284 config::{
285 LocalTransactionConfig, PoolConfig, PriceBumpConfig, SubPoolLimit,
286 DEFAULT_MAX_INFLIGHT_DELEGATED_SLOTS, DEFAULT_PRICE_BUMP,
287 DEFAULT_TXPOOL_ADDITIONAL_VALIDATION_TASKS, MAX_NEW_PENDING_TXS_NOTIFICATIONS,
288 REPLACE_BLOB_PRICE_BUMP, TXPOOL_MAX_ACCOUNT_SLOTS_PER_SENDER,
289 TXPOOL_SUBPOOL_MAX_SIZE_MB_DEFAULT, TXPOOL_SUBPOOL_MAX_TXS_DEFAULT,
290 },
291 error::{PoolResult, RawPoolTransactionError},
292 ordering::{CoinbaseTipOrdering, Priority, TransactionOrdering},
293 pool::{
294 blob_tx_priority, fee_delta, state::SubPool, AddedTransactionOutcome,
295 AllTransactionsEvents, FullTransactionEvent, NewTransactionEvent, TransactionEvent,
296 TransactionEvents, TransactionListenerKind,
297 },
298 traits::*,
299 validate::{
300 EthTransactionValidator, StatefulValidationFn, StatelessValidationFn,
301 TransactionValidationOutcome, TransactionValidationTaskExecutor, TransactionValidator,
302 ValidPoolTransaction,
303 },
304};
305use crate::{identifier::TransactionId, pool::PoolInner};
306use alloy_eips::{
307 eip4844::{BlobAndProofV1, BlobAndProofV2, BlobCellsAndProofsV1},
308 eip7594::BlobTransactionSidecarVariant,
309};
310use alloy_primitives::{map::AddressSet, Address, TxHash, B128, B256, U256};
311use aquamarine as _;
312use reth_chainspec::{ChainSpecProvider, EthereumHardforks};
313use reth_eth_wire_types::HandleMempoolData;
314use reth_evm::ConfigureEvm;
315use reth_evm_ethereum::EthEvmConfig;
316use reth_execution_types::ChangedAccount;
317use reth_primitives_traits::{HeaderTy, Recovered};
318use reth_storage_api::{BlockReaderIdExt, StateProviderFactory};
319use std::sync::Arc;
320use tokio::sync::mpsc::Receiver;
321use tracing::{instrument, trace};
322
323pub mod error;
324pub mod maintain;
325pub mod metrics;
326pub mod noop;
327pub mod pool;
328pub mod validate;
329
330pub mod batcher;
331pub mod blobstore;
332mod config;
333pub mod identifier;
334mod ordering;
335mod traits;
336
337#[cfg(any(test, feature = "test-utils"))]
338pub mod test_utils;
340
/// Ethereum flavour of [`Pool`].
///
/// The validator slot is a [`TransactionValidationTaskExecutor`] wrapping an
/// [`EthTransactionValidator`] over `Client`, `T` and `Evm`; transactions are ordered with
/// [`CoinbaseTipOrdering`] and blobs are kept in the store type `S`.
///
/// `Evm` defaults to [`EthEvmConfig`] and the pooled transaction type `T` defaults to
/// [`EthPooledTransaction`].
pub type EthTransactionPool<Client, S, Evm = EthEvmConfig, T = EthPooledTransaction> = Pool<
    TransactionValidationTaskExecutor<EthTransactionValidator<Client, T, Evm>>,
    CoinbaseTipOrdering<T>,
    S,
>;
347
348#[derive(Debug)]
350pub struct Pool<V, T: TransactionOrdering, S> {
351 pool: Arc<PoolInner<V, T, S>>,
353}
354
355impl<V, T, S> Pool<V, T, S>
358where
359 V: TransactionValidator,
360 T: TransactionOrdering<Transaction = <V as TransactionValidator>::Transaction>,
361 S: BlobStore,
362{
363 pub fn new(validator: V, ordering: T, blob_store: S, config: PoolConfig) -> Self {
365 Self { pool: Arc::new(PoolInner::new(validator, ordering, blob_store, config)) }
366 }
367
368 pub fn inner(&self) -> &PoolInner<V, T, S> {
370 &self.pool
371 }
372
373 pub fn config(&self) -> &PoolConfig {
375 self.inner().config()
376 }
377
378 pub fn validator(&self) -> &V {
380 self.inner().validator()
381 }
382
383 async fn validate(
385 &self,
386 origin: TransactionOrigin,
387 transaction: V::Transaction,
388 ) -> TransactionValidationOutcome<V::Transaction> {
389 self.pool.validator().validate_transaction(origin, transaction).await
390 }
391
392 pub fn len(&self) -> usize {
394 self.pool.len()
395 }
396
397 pub fn is_empty(&self) -> bool {
399 self.pool.is_empty()
400 }
401
402 pub fn is_exceeded(&self) -> bool {
404 self.pool.is_exceeded()
405 }
406
407 pub fn blob_store(&self) -> &S {
409 self.pool.blob_store()
410 }
411}
412
413impl<Client, S, Evm> EthTransactionPool<Client, S, Evm>
414where
415 Client: ChainSpecProvider<ChainSpec: EthereumHardforks>
416 + StateProviderFactory
417 + Clone
418 + BlockReaderIdExt<Header = HeaderTy<Evm::Primitives>>
419 + 'static,
420 S: BlobStore,
421 Evm: ConfigureEvm + 'static,
422{
423 pub fn eth_pool(
458 validator: TransactionValidationTaskExecutor<
459 EthTransactionValidator<Client, EthPooledTransaction, Evm>,
460 >,
461 blob_store: S,
462 config: PoolConfig,
463 ) -> Self {
464 Self::new(validator, CoinbaseTipOrdering::default(), blob_store, config)
465 }
466}
467
468impl<V, T, S> TransactionPool for Pool<V, T, S>
470where
471 V: TransactionValidator,
472 <V as TransactionValidator>::Transaction: EthPoolTransaction,
473 T: TransactionOrdering<Transaction = <V as TransactionValidator>::Transaction>,
474 S: BlobStore + Clone,
475{
476 type Transaction = T::Transaction;
477
478 fn pool_size(&self) -> PoolSize {
479 self.pool.size()
480 }
481
482 fn block_info(&self) -> BlockInfo {
483 self.pool.block_info()
484 }
485
486 async fn add_transaction_and_subscribe(
487 &self,
488 origin: TransactionOrigin,
489 transaction: Self::Transaction,
490 ) -> PoolResult<TransactionEvents> {
491 let tx = self.validate(origin, transaction).await;
492 self.pool.add_transaction_and_subscribe(origin, tx)
493 }
494
495 async fn add_transaction(
496 &self,
497 origin: TransactionOrigin,
498 transaction: Self::Transaction,
499 ) -> PoolResult<AddedTransactionOutcome> {
500 let tx = self.validate(origin, transaction).await;
501 let mut results = self.pool.add_transactions(origin, std::iter::once(tx));
502 results.pop().expect("result length is the same as the input")
503 }
504
505 async fn add_transactions(
506 &self,
507 origin: TransactionOrigin,
508 transactions: Vec<Self::Transaction>,
509 ) -> Vec<PoolResult<AddedTransactionOutcome>> {
510 if transactions.is_empty() {
511 return Vec::new()
512 }
513 let validated = self
514 .pool
515 .validator()
516 .validate_transactions(transactions.into_iter().map(|tx| (origin, tx)))
517 .await;
518 self.pool.add_transactions(origin, validated)
519 }
520
521 async fn add_transactions_with_origins(
522 &self,
523 transactions: Vec<(TransactionOrigin, Self::Transaction)>,
524 ) -> Vec<PoolResult<AddedTransactionOutcome>> {
525 if transactions.is_empty() {
526 return Vec::new()
527 }
528 let origins: Vec<_> = transactions.iter().map(|(origin, _)| *origin).collect();
529 let validated = self.pool.validator().validate_transactions(transactions).await;
530 self.pool.add_transactions_with_origins(origins.into_iter().zip(validated))
531 }
532
533 fn transaction_event_listener(&self, tx_hash: TxHash) -> Option<TransactionEvents> {
534 self.pool.add_transaction_event_listener(tx_hash)
535 }
536
537 fn all_transactions_event_listener(&self) -> AllTransactionsEvents<Self::Transaction> {
538 self.pool.add_all_transactions_event_listener()
539 }
540
541 fn pending_transactions_listener_for(&self, kind: TransactionListenerKind) -> Receiver<TxHash> {
542 self.pool.add_pending_listener(kind)
543 }
544
545 fn blob_transaction_sidecars_listener(&self) -> Receiver<NewBlobSidecar> {
546 self.pool.add_blob_sidecar_listener()
547 }
548
549 fn new_transactions_listener_for(
550 &self,
551 kind: TransactionListenerKind,
552 ) -> Receiver<NewTransactionEvent<Self::Transaction>> {
553 self.pool.add_new_transaction_listener(kind)
554 }
555
556 fn pooled_transaction_hashes(&self) -> Vec<TxHash> {
557 self.pool.pooled_transactions_hashes()
558 }
559
560 fn pooled_transaction_hashes_max(&self, max: usize) -> Vec<TxHash> {
561 self.pool.pooled_transactions_hashes_max(max)
562 }
563
564 fn pooled_transactions(&self) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
565 self.pool.pooled_transactions()
566 }
567
568 fn pooled_transactions_max(
569 &self,
570 max: usize,
571 ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
572 self.pool.pooled_transactions_max(max)
573 }
574
575 fn get_pooled_transaction_elements(
576 &self,
577 tx_hashes: Vec<TxHash>,
578 limit: GetPooledTransactionLimit,
579 ) -> Vec<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled> {
580 self.pool.get_pooled_transaction_elements(tx_hashes, limit)
581 }
582
583 fn append_pooled_transaction_elements(
584 &self,
585 tx_hashes: &[TxHash],
586 limit: GetPooledTransactionLimit,
587 out: &mut Vec<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>,
588 ) {
589 self.pool.append_pooled_transaction_elements(tx_hashes, limit, out)
590 }
591
592 fn get_pooled_transaction_element(
593 &self,
594 tx_hash: TxHash,
595 ) -> Option<Recovered<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>>
596 {
597 self.pool.get_pooled_transaction_element(tx_hash)
598 }
599
600 fn best_transactions(
601 &self,
602 ) -> Box<dyn BestTransactions<Item = Arc<ValidPoolTransaction<Self::Transaction>>>> {
603 Box::new(self.pool.best_transactions())
604 }
605
606 fn best_transactions_with_attributes(
607 &self,
608 best_transactions_attributes: BestTransactionsAttributes,
609 ) -> Box<dyn BestTransactions<Item = Arc<ValidPoolTransaction<Self::Transaction>>>> {
610 self.pool.best_transactions_with_attributes(best_transactions_attributes)
611 }
612
613 fn pending_transactions(&self) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
614 self.pool.pending_transactions()
615 }
616
617 fn get_pending_transaction_by_sender_and_nonce(
618 &self,
619 sender: Address,
620 nonce: u64,
621 ) -> Option<Arc<ValidPoolTransaction<Self::Transaction>>> {
622 self.pool.get_pending_transaction_by_sender_and_nonce(sender, nonce)
623 }
624
625 fn pending_transactions_max(
626 &self,
627 max: usize,
628 ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
629 self.pool.pending_transactions_max(max)
630 }
631
632 fn queued_transactions(&self) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
633 self.pool.queued_transactions()
634 }
635
636 fn pending_and_queued_txn_count(&self) -> (usize, usize) {
637 let data = self.pool.get_pool_data();
638 let pending = data.pending_transactions_count();
639 let queued = data.queued_transactions_count();
640 (pending, queued)
641 }
642
643 fn all_transactions(&self) -> AllPoolTransactions<Self::Transaction> {
644 self.pool.all_transactions()
645 }
646
647 fn all_transaction_hashes(&self) -> Vec<TxHash> {
648 self.pool.all_transaction_hashes()
649 }
650
651 fn remove_transactions(
652 &self,
653 hashes: Vec<TxHash>,
654 ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
655 self.pool.remove_transactions(hashes)
656 }
657
658 fn remove_transactions_and_descendants(
659 &self,
660 hashes: Vec<TxHash>,
661 ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
662 self.pool.remove_transactions_and_descendants(hashes)
663 }
664
665 fn remove_transactions_by_sender(
666 &self,
667 sender: Address,
668 ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
669 self.pool.remove_transactions_by_sender(sender)
670 }
671
672 fn prune_transactions(
673 &self,
674 hashes: Vec<TxHash>,
675 ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
676 self.pool.prune_transactions(hashes)
677 }
678
679 fn retain_unknown<A>(&self, announcement: &mut A)
680 where
681 A: HandleMempoolData,
682 {
683 self.pool.retain_unknown(announcement)
684 }
685
686 fn retain_contains<A>(&self, announcement: &mut A)
687 where
688 A: HandleMempoolData,
689 {
690 self.pool.retain_contains(announcement)
691 }
692
693 fn get(&self, tx_hash: &TxHash) -> Option<Arc<ValidPoolTransaction<Self::Transaction>>> {
694 self.inner().get(tx_hash)
695 }
696
697 fn get_all(&self, txs: Vec<TxHash>) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
698 self.inner().get_all(txs)
699 }
700
701 fn on_propagated(&self, txs: PropagatedTransactions) {
702 self.inner().on_propagated(txs)
703 }
704
705 fn get_transactions_by_sender(
706 &self,
707 sender: Address,
708 ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
709 self.pool.get_transactions_by_sender(sender)
710 }
711
712 fn get_pending_transactions_with_predicate(
713 &self,
714 predicate: impl FnMut(&ValidPoolTransaction<Self::Transaction>) -> bool,
715 ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
716 self.pool.pending_transactions_with_predicate(predicate)
717 }
718
719 fn get_pending_transactions_by_sender(
720 &self,
721 sender: Address,
722 ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
723 self.pool.get_pending_transactions_by_sender(sender)
724 }
725
726 fn get_queued_transactions_by_sender(
727 &self,
728 sender: Address,
729 ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
730 self.pool.get_queued_transactions_by_sender(sender)
731 }
732
733 fn get_highest_transaction_by_sender(
734 &self,
735 sender: Address,
736 ) -> Option<Arc<ValidPoolTransaction<Self::Transaction>>> {
737 self.pool.get_highest_transaction_by_sender(sender)
738 }
739
740 fn get_highest_consecutive_transaction_by_sender(
741 &self,
742 sender: Address,
743 on_chain_nonce: u64,
744 ) -> Option<Arc<ValidPoolTransaction<Self::Transaction>>> {
745 self.pool.get_highest_consecutive_transaction_by_sender(sender, on_chain_nonce)
746 }
747
748 fn get_transaction_by_sender_and_nonce(
749 &self,
750 sender: Address,
751 nonce: u64,
752 ) -> Option<Arc<ValidPoolTransaction<Self::Transaction>>> {
753 let sender_id = self.pool.sender_id(&sender)?;
754 let transaction_id = TransactionId::new(sender_id, nonce);
755
756 self.inner().get_pool_data().all().get(&transaction_id).map(|tx| tx.transaction.clone())
757 }
758
759 fn get_transactions_by_origin(
760 &self,
761 origin: TransactionOrigin,
762 ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
763 self.pool.get_transactions_by_origin(origin)
764 }
765
766 fn get_pending_transactions_by_origin(
768 &self,
769 origin: TransactionOrigin,
770 ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
771 self.pool.get_pending_transactions_by_origin(origin)
772 }
773
774 fn unique_senders(&self) -> AddressSet {
775 self.pool.unique_senders()
776 }
777
778 fn get_blob(
779 &self,
780 tx_hash: TxHash,
781 ) -> Result<Option<Arc<BlobTransactionSidecarVariant>>, BlobStoreError> {
782 self.pool.blob_store().get(tx_hash)
783 }
784
785 fn get_all_blobs(
786 &self,
787 tx_hashes: Vec<TxHash>,
788 ) -> Result<Vec<(TxHash, Arc<BlobTransactionSidecarVariant>)>, BlobStoreError> {
789 self.pool.blob_store().get_all(tx_hashes)
790 }
791
792 fn get_all_blobs_exact(
793 &self,
794 tx_hashes: Vec<TxHash>,
795 ) -> Result<Vec<Arc<BlobTransactionSidecarVariant>>, BlobStoreError> {
796 self.pool.blob_store().get_exact(tx_hashes)
797 }
798
799 fn get_blobs_for_versioned_hashes_v1(
800 &self,
801 versioned_hashes: &[B256],
802 ) -> Result<Vec<Option<BlobAndProofV1>>, BlobStoreError> {
803 self.pool.blob_store().get_by_versioned_hashes_v1(versioned_hashes)
804 }
805
806 fn get_blobs_for_versioned_hashes_v2(
807 &self,
808 versioned_hashes: &[B256],
809 ) -> Result<Option<Vec<BlobAndProofV2>>, BlobStoreError> {
810 self.pool.blob_store().get_by_versioned_hashes_v2(versioned_hashes)
811 }
812
813 fn get_blobs_for_versioned_hashes_v3(
814 &self,
815 versioned_hashes: &[B256],
816 ) -> Result<Vec<Option<BlobAndProofV2>>, BlobStoreError> {
817 self.pool.blob_store().get_by_versioned_hashes_v3(versioned_hashes)
818 }
819
820 fn get_blobs_for_versioned_hashes_v4(
821 &self,
822 versioned_hashes: &[B256],
823 indices_bitarray: B128,
824 ) -> Result<Vec<Option<BlobCellsAndProofsV1>>, BlobStoreError> {
825 self.pool.blob_store().get_by_versioned_hashes_v4(versioned_hashes, indices_bitarray)
826 }
827
828 fn blob_store(&self) -> Box<dyn BlobStore> {
829 Box::new(self.pool.blob_store().clone())
830 }
831}
832
833impl<V, T, S> TransactionPoolExt for Pool<V, T, S>
834where
835 V: TransactionValidator,
836 <V as TransactionValidator>::Transaction: EthPoolTransaction,
837 T: TransactionOrdering<Transaction = <V as TransactionValidator>::Transaction>,
838 S: BlobStore + Clone,
839{
840 type Block = V::Block;
841
842 #[instrument(skip(self), target = "txpool")]
843 fn set_block_info(&self, info: BlockInfo) {
844 trace!(target: "txpool", "updating pool block info");
845 self.pool.set_block_info(info)
846 }
847
848 fn on_canonical_state_change(&self, update: CanonicalStateUpdate<'_, Self::Block>) {
849 self.pool.on_canonical_state_change(update);
850 }
851
852 fn update_accounts(&self, accounts: Vec<ChangedAccount>) {
853 self.pool.update_accounts(accounts);
854 }
855
856 fn delete_blob(&self, tx: TxHash) {
857 self.pool.delete_blob(tx)
858 }
859
860 fn delete_blobs(&self, txs: Vec<TxHash>) {
861 self.pool.delete_blobs(txs)
862 }
863
864 fn cleanup_blobs(&self) {
865 self.pool.cleanup_blobs()
866 }
867}
868
869impl<V, T, S> ValidatingPool for Pool<V, T, S>
870where
871 V: TransactionValidator,
872 <V as TransactionValidator>::Transaction: EthPoolTransaction,
873 T: TransactionOrdering<Transaction = <V as TransactionValidator>::Transaction>,
874 S: BlobStore + Clone,
875{
876 type Validator = V;
877
878 fn validator(&self) -> &Self::Validator {
879 self.inner().validator()
880 }
881}
882
883impl<V, T: TransactionOrdering, S> Clone for Pool<V, T, S> {
884 fn clone(&self) -> Self {
885 Self { pool: Arc::clone(&self.pool) }
886 }
887}