1use crate::{
2 blobstore::BlobStoreError,
3 error::{InvalidPoolTransactionError, PoolResult},
4 pool::{state::SubPool, BestTransactionFilter, TransactionEvents},
5 validate::ValidPoolTransaction,
6 AllTransactionsEvents,
7};
8use alloy_consensus::{transaction::PooledTransaction, BlockHeader, Signed, Typed2718};
9use alloy_eips::{
10 eip2718::Encodable2718,
11 eip2930::AccessList,
12 eip4844::{
13 env_settings::KzgSettings, BlobAndProofV1, BlobTransactionSidecar,
14 BlobTransactionValidationError,
15 },
16 eip7702::SignedAuthorization,
17};
18use alloy_primitives::{Address, Bytes, TxHash, TxKind, B256, U256};
19use futures_util::{ready, Stream};
20use reth_eth_wire_types::HandleMempoolData;
21use reth_ethereum_primitives::{Transaction, TransactionSigned};
22use reth_execution_types::ChangedAccount;
23use reth_primitives_traits::{
24 transaction::error::TransactionConversionError, Block, InMemorySize, Recovered, SealedBlock,
25 SignedTransaction,
26};
27#[cfg(feature = "serde")]
28use serde::{Deserialize, Serialize};
29use std::{
30 collections::{HashMap, HashSet},
31 fmt,
32 future::Future,
33 pin::Pin,
34 sync::Arc,
35 task::{Context, Poll},
36};
37use tokio::sync::mpsc::Receiver;
38
/// Identifier of a network peer; an alias for [`alloy_primitives::B512`].
pub type PeerId = alloy_primitives::B512;
41
/// Shorthand for the [`TransactionPool::Transaction`] type of the pool `P`.
pub type PoolTx<P> = <P as TransactionPool>::Transaction;
/// Shorthand for the [`PoolTransaction::Consensus`] type of the pool `P`'s transaction.
pub type PoolConsensusTx<P> = <<P as TransactionPool>::Transaction as PoolTransaction>::Consensus;
46
/// Shorthand for the [`PoolTransaction::Pooled`] type of the pool `P`'s transaction.
pub type PoolPooledTx<P> = <<P as TransactionPool>::Transaction as PoolTransaction>::Pooled;
49
/// Abstraction over a transaction pool.
///
/// The trait groups the operations for inserting transactions, subscribing to pool events,
/// querying and removing pooled transactions, and fetching blob sidecars.
///
/// A number of methods are provided as defaults that simply delegate to a required method with
/// a fixed [`TransactionOrigin`] or [`TransactionListenerKind`].
#[auto_impl::auto_impl(&, Arc)]
pub trait TransactionPool: Send + Sync + Clone {
    /// The transaction type stored in this pool.
    type Transaction: EthPoolTransaction;

    /// Returns the [`PoolSize`] statistics of this pool.
    fn pool_size(&self) -> PoolSize;

    /// Returns the [`BlockInfo`] the pool is currently tracking.
    fn block_info(&self) -> BlockInfo;

    /// Adds a single transaction with [`TransactionOrigin::External`].
    ///
    /// Shorthand for [`Self::add_transaction`] with a fixed origin.
    fn add_external_transaction(
        &self,
        transaction: Self::Transaction,
    ) -> impl Future<Output = PoolResult<TxHash>> + Send {
        self.add_transaction(TransactionOrigin::External, transaction)
    }

    /// Adds a batch of transactions with [`TransactionOrigin::External`].
    ///
    /// Shorthand for [`Self::add_transactions`] with a fixed origin.
    fn add_external_transactions(
        &self,
        transactions: Vec<Self::Transaction>,
    ) -> impl Future<Output = Vec<PoolResult<TxHash>>> + Send {
        self.add_transactions(TransactionOrigin::External, transactions)
    }

    /// Adds `transaction` with the given `origin`; on success the future yields the
    /// [`TransactionEvents`] associated with it.
    fn add_transaction_and_subscribe(
        &self,
        origin: TransactionOrigin,
        transaction: Self::Transaction,
    ) -> impl Future<Output = PoolResult<TransactionEvents>> + Send;

    /// Adds `transaction` with the given `origin`; on success the future yields its hash.
    fn add_transaction(
        &self,
        origin: TransactionOrigin,
        transaction: Self::Transaction,
    ) -> impl Future<Output = PoolResult<TxHash>> + Send;

    /// Adds all `transactions` with the given `origin`.
    ///
    /// The future yields one [`PoolResult`] per transaction.
    // NOTE(review): presumably the results are in the same order as the input — confirm.
    fn add_transactions(
        &self,
        origin: TransactionOrigin,
        transactions: Vec<Self::Transaction>,
    ) -> impl Future<Output = Vec<PoolResult<TxHash>>> + Send;

    /// Returns the [`TransactionEvents`] for the transaction with the given hash, if available.
    fn transaction_event_listener(&self, tx_hash: TxHash) -> Option<TransactionEvents>;

    /// Returns an [`AllTransactionsEvents`] listener for this pool's transaction type.
    fn all_transactions_event_listener(&self) -> AllTransactionsEvents<Self::Transaction>;

    /// Returns a receiver of pending transaction hashes.
    ///
    /// Shorthand for [`Self::pending_transactions_listener_for`] with
    /// [`TransactionListenerKind::PropagateOnly`].
    fn pending_transactions_listener(&self) -> Receiver<TxHash> {
        self.pending_transactions_listener_for(TransactionListenerKind::PropagateOnly)
    }

    /// Returns a receiver of pending transaction hashes for the given listener `kind`.
    fn pending_transactions_listener_for(&self, kind: TransactionListenerKind) -> Receiver<TxHash>;

    /// Returns a receiver of [`NewTransactionEvent`]s.
    ///
    /// Shorthand for [`Self::new_transactions_listener_for`] with
    /// [`TransactionListenerKind::PropagateOnly`].
    fn new_transactions_listener(&self) -> Receiver<NewTransactionEvent<Self::Transaction>> {
        self.new_transactions_listener_for(TransactionListenerKind::PropagateOnly)
    }

    /// Returns a receiver of [`NewBlobSidecar`] notifications.
    fn blob_transaction_sidecars_listener(&self) -> Receiver<NewBlobSidecar>;

    /// Returns a receiver of [`NewTransactionEvent`]s for the given listener `kind`.
    fn new_transactions_listener_for(
        &self,
        kind: TransactionListenerKind,
    ) -> Receiver<NewTransactionEvent<Self::Transaction>>;

    /// Returns a stream that yields only the new-transaction events whose subpool is
    /// [`SubPool::Pending`].
    ///
    /// The underlying listener is created with [`TransactionListenerKind::PropagateOnly`].
    fn new_pending_pool_transactions_listener(
        &self,
    ) -> NewSubpoolTransactionStream<Self::Transaction> {
        NewSubpoolTransactionStream::new(
            self.new_transactions_listener_for(TransactionListenerKind::PropagateOnly),
            SubPool::Pending,
        )
    }

    /// Returns a stream that yields only the new-transaction events whose subpool is
    /// [`SubPool::BaseFee`], built on top of [`Self::new_transactions_listener`].
    fn new_basefee_pool_transactions_listener(
        &self,
    ) -> NewSubpoolTransactionStream<Self::Transaction> {
        NewSubpoolTransactionStream::new(self.new_transactions_listener(), SubPool::BaseFee)
    }

    /// Returns a stream that yields only the new-transaction events whose subpool is
    /// [`SubPool::Queued`], built on top of [`Self::new_transactions_listener`].
    fn new_queued_transactions_listener(&self) -> NewSubpoolTransactionStream<Self::Transaction> {
        NewSubpoolTransactionStream::new(self.new_transactions_listener(), SubPool::Queued)
    }

    /// Returns hashes of pooled transactions.
    // NOTE(review): which transactions count as "pooled" is decided by the implementor —
    // confirm whether non-propagatable transactions are excluded.
    fn pooled_transaction_hashes(&self) -> Vec<TxHash>;

    /// Like [`Self::pooled_transaction_hashes`], taking an upper bound `max`.
    // NOTE(review): presumably at most `max` hashes are returned — confirm.
    fn pooled_transaction_hashes_max(&self, max: usize) -> Vec<TxHash>;

    /// Returns the pooled transactions.
    fn pooled_transactions(&self) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>>;

    /// Like [`Self::pooled_transactions`], taking an upper bound `max`.
    // NOTE(review): presumably at most `max` transactions are returned — confirm.
    fn pooled_transactions_max(
        &self,
        max: usize,
    ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>>;

    /// Returns the transactions for `tx_hashes` converted to their
    /// [`PoolTransaction::Pooled`] form, subject to `limit`.
    fn get_pooled_transaction_elements(
        &self,
        tx_hashes: Vec<TxHash>,
        limit: GetPooledTransactionLimit,
    ) -> Vec<<Self::Transaction as PoolTransaction>::Pooled>;

    /// Returns the transaction for `tx_hash` in its recovered [`PoolTransaction::Pooled`] form,
    /// if available.
    fn get_pooled_transaction_element(
        &self,
        tx_hash: TxHash,
    ) -> Option<Recovered<<Self::Transaction as PoolTransaction>::Pooled>>;

    /// Returns a boxed [`BestTransactions`] iterator over the pool's transactions.
    fn best_transactions(
        &self,
    ) -> Box<dyn BestTransactions<Item = Arc<ValidPoolTransaction<Self::Transaction>>>>;

    /// Returns a boxed [`BestTransactions`] iterator parameterised by the given
    /// [`BestTransactionsAttributes`].
    fn best_transactions_with_attributes(
        &self,
        best_transactions_attributes: BestTransactionsAttributes,
    ) -> Box<dyn BestTransactions<Item = Arc<ValidPoolTransaction<Self::Transaction>>>>;

    /// Returns the pending transactions.
    fn pending_transactions(&self) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>>;

    /// Like [`Self::pending_transactions`], taking an upper bound `max`.
    // NOTE(review): presumably at most `max` transactions are returned — confirm.
    fn pending_transactions_max(
        &self,
        max: usize,
    ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>>;

    /// Returns the queued transactions.
    fn queued_transactions(&self) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>>;

    /// Returns the pool's transactions grouped as [`AllPoolTransactions`].
    fn all_transactions(&self) -> AllPoolTransactions<Self::Transaction>;

    /// Removes the transactions with the given hashes and returns the removed transactions.
    fn remove_transactions(
        &self,
        hashes: Vec<TxHash>,
    ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>>;

    /// Removes the transactions with the given hashes together with their descendants and
    /// returns the removed transactions.
    // NOTE(review): "descendants" presumably means higher-nonce transactions of the same
    // sender — confirm against the implementation.
    fn remove_transactions_and_descendants(
        &self,
        hashes: Vec<TxHash>,
    ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>>;

    /// Removes the transactions sent by `sender` and returns the removed transactions.
    fn remove_transactions_by_sender(
        &self,
        sender: Address,
    ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>>;

    /// Filters `announcement` in place.
    // NOTE(review): presumably drops entries the pool already knows so only unknown ones
    // remain — confirm against `HandleMempoolData`.
    fn retain_unknown<A>(&self, announcement: &mut A)
    where
        A: HandleMempoolData;

    /// Returns `true` if [`Self::get`] finds a transaction for `tx_hash`.
    fn contains(&self, tx_hash: &TxHash) -> bool {
        self.get(tx_hash).is_some()
    }

    /// Returns the transaction with the given hash, if available.
    fn get(&self, tx_hash: &TxHash) -> Option<Arc<ValidPoolTransaction<Self::Transaction>>>;

    /// Returns the transactions found for the given hashes.
    fn get_all(&self, txs: Vec<TxHash>) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>>;

    /// Informs the pool about [`PropagatedTransactions`].
    fn on_propagated(&self, txs: PropagatedTransactions);

    /// Returns the transactions sent by `sender`.
    fn get_transactions_by_sender(
        &self,
        sender: Address,
    ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>>;

    /// Returns the pending transactions selected by `predicate`.
    fn get_pending_transactions_with_predicate(
        &self,
        predicate: impl FnMut(&ValidPoolTransaction<Self::Transaction>) -> bool,
    ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>>;

    /// Returns the pending transactions sent by `sender`.
    fn get_pending_transactions_by_sender(
        &self,
        sender: Address,
    ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>>;

    /// Returns the queued transactions sent by `sender`.
    fn get_queued_transactions_by_sender(
        &self,
        sender: Address,
    ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>>;

    /// Returns the highest transaction of `sender`, if any.
    // NOTE(review): "highest" presumably refers to the nonce — confirm.
    fn get_highest_transaction_by_sender(
        &self,
        sender: Address,
    ) -> Option<Arc<ValidPoolTransaction<Self::Transaction>>>;

    /// Returns the highest consecutive transaction of `sender` relative to `on_chain_nonce`,
    /// if any.
    // NOTE(review): presumably the last transaction of the gapless nonce sequence starting at
    // `on_chain_nonce` — confirm.
    fn get_highest_consecutive_transaction_by_sender(
        &self,
        sender: Address,
        on_chain_nonce: u64,
    ) -> Option<Arc<ValidPoolTransaction<Self::Transaction>>>;

    /// Returns the transaction of `sender` with the given `nonce`, if any.
    fn get_transaction_by_sender_and_nonce(
        &self,
        sender: Address,
        nonce: u64,
    ) -> Option<Arc<ValidPoolTransaction<Self::Transaction>>>;

    /// Returns the transactions that were added with the given `origin`.
    fn get_transactions_by_origin(
        &self,
        origin: TransactionOrigin,
    ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>>;

    /// Returns the pending transactions that were added with the given `origin`.
    fn get_pending_transactions_by_origin(
        &self,
        origin: TransactionOrigin,
    ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>>;

    /// Shorthand for [`Self::get_transactions_by_origin`] with [`TransactionOrigin::Local`].
    fn get_local_transactions(&self) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
        self.get_transactions_by_origin(TransactionOrigin::Local)
    }

    /// Shorthand for [`Self::get_transactions_by_origin`] with [`TransactionOrigin::Private`].
    fn get_private_transactions(&self) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
        self.get_transactions_by_origin(TransactionOrigin::Private)
    }

    /// Shorthand for [`Self::get_transactions_by_origin`] with [`TransactionOrigin::External`].
    fn get_external_transactions(&self) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
        self.get_transactions_by_origin(TransactionOrigin::External)
    }

    /// Shorthand for [`Self::get_pending_transactions_by_origin`] with
    /// [`TransactionOrigin::Local`].
    fn get_local_pending_transactions(&self) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
        self.get_pending_transactions_by_origin(TransactionOrigin::Local)
    }

    /// Shorthand for [`Self::get_pending_transactions_by_origin`] with
    /// [`TransactionOrigin::Private`].
    fn get_private_pending_transactions(
        &self,
    ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
        self.get_pending_transactions_by_origin(TransactionOrigin::Private)
    }

    /// Shorthand for [`Self::get_pending_transactions_by_origin`] with
    /// [`TransactionOrigin::External`].
    fn get_external_pending_transactions(
        &self,
    ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
        self.get_pending_transactions_by_origin(TransactionOrigin::External)
    }

    /// Returns the set of distinct sender addresses.
    fn unique_senders(&self) -> HashSet<Address>;

    /// Returns the blob sidecar stored for `tx_hash`, if any.
    ///
    /// # Errors
    ///
    /// Returns a [`BlobStoreError`] on failure.
    fn get_blob(
        &self,
        tx_hash: TxHash,
    ) -> Result<Option<Arc<BlobTransactionSidecar>>, BlobStoreError>;

    /// Returns the blob sidecars found for `tx_hashes`, each paired with its transaction hash.
    ///
    /// # Errors
    ///
    /// Returns a [`BlobStoreError`] on failure.
    fn get_all_blobs(
        &self,
        tx_hashes: Vec<TxHash>,
    ) -> Result<Vec<(TxHash, Arc<BlobTransactionSidecar>)>, BlobStoreError>;

    /// Returns the blob sidecars for `tx_hashes`.
    ///
    /// # Errors
    ///
    /// Returns a [`BlobStoreError`] on failure.
    // NOTE(review): "exact" presumably means a missing sidecar is an error rather than being
    // skipped — confirm.
    fn get_all_blobs_exact(
        &self,
        tx_hashes: Vec<TxHash>,
    ) -> Result<Vec<Arc<BlobTransactionSidecar>>, BlobStoreError>;

    /// Returns one optional [`BlobAndProofV1`] per requested versioned hash.
    ///
    /// # Errors
    ///
    /// Returns a [`BlobStoreError`] on failure.
    fn get_blobs_for_versioned_hashes(
        &self,
        versioned_hashes: &[B256],
    ) -> Result<Vec<Option<BlobAndProofV1>>, BlobStoreError>;
}
499
/// Extension of [`TransactionPool`] with operations that update the pool's view of the chain
/// and manage stored blobs.
#[auto_impl::auto_impl(&, Arc)]
pub trait TransactionPoolExt: TransactionPool {
    /// Sets the [`BlockInfo`] tracked by the pool.
    fn set_block_info(&self, info: BlockInfo);

    /// Applies a [`CanonicalStateUpdate`] to the pool.
    fn on_canonical_state_change<B>(&self, update: CanonicalStateUpdate<'_, B>)
    where
        B: Block;

    /// Updates the pool with the given changed accounts.
    fn update_accounts(&self, accounts: Vec<ChangedAccount>);

    /// Deletes the blob associated with the transaction hash `tx`.
    fn delete_blob(&self, tx: B256);

    /// Deletes the blobs associated with the transaction hashes `txs`.
    fn delete_blobs(&self, txs: Vec<B256>);

    /// Performs blob cleanup.
    // NOTE(review): what exactly is cleaned up is implementation-defined — confirm.
    fn cleanup_blobs(&self);
}
537
/// Selects which transactions a pool listener is interested in.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum TransactionListenerKind {
    /// Listener without the propagate-only restriction.
    All,
    /// Listener restricted to propagate-only transactions.
    PropagateOnly,
}

impl TransactionListenerKind {
    /// Returns `true` for [`Self::PropagateOnly`] and `false` for [`Self::All`].
    #[inline]
    pub const fn is_propagate_only(&self) -> bool {
        match self {
            Self::PropagateOnly => true,
            Self::All => false,
        }
    }
}
558
/// Pool transactions split into pending and queued ones.
#[derive(Debug, Clone)]
pub struct AllPoolTransactions<T: PoolTransaction> {
    /// The pending transactions.
    pub pending: Vec<Arc<ValidPoolTransaction<T>>>,
    /// The queued transactions.
    pub queued: Vec<Arc<ValidPoolTransaction<T>>>,
}
569
570impl<T: PoolTransaction> AllPoolTransactions<T> {
573 pub fn pending_recovered(&self) -> impl Iterator<Item = Recovered<T::Consensus>> + '_ {
575 self.pending.iter().map(|tx| tx.transaction.clone().into_consensus())
576 }
577
578 pub fn queued_recovered(&self) -> impl Iterator<Item = Recovered<T::Consensus>> + '_ {
580 self.queued.iter().map(|tx| tx.transaction.clone().into_consensus())
581 }
582
583 pub fn all(&self) -> impl Iterator<Item = Recovered<T::Consensus>> + '_ {
585 self.pending
586 .iter()
587 .chain(self.queued.iter())
588 .map(|tx| tx.transaction.clone().into_consensus())
589 }
590}
591
592impl<T: PoolTransaction> Default for AllPoolTransactions<T> {
593 fn default() -> Self {
594 Self { pending: Default::default(), queued: Default::default() }
595 }
596}
597
/// Maps a transaction hash to the list of [`PropagateKind`]s recorded for it.
#[derive(Debug, Clone, Eq, PartialEq, Default)]
pub struct PropagatedTransactions(pub HashMap<TxHash, Vec<PropagateKind>>);
601
/// Describes how a transaction was propagated to a peer.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum PropagateKind {
    /// Propagation of kind "full" to the contained peer.
    // NOTE(review): presumably the full transaction object was sent — confirm.
    Full(PeerId),
    /// Propagation of kind "hash" to the contained peer.
    // NOTE(review): presumably only the transaction hash was announced — confirm.
    Hash(PeerId),
}
613
614impl PropagateKind {
617 pub const fn peer(&self) -> &PeerId {
619 match self {
620 Self::Full(peer) | Self::Hash(peer) => peer,
621 }
622 }
623
624 pub const fn is_full(&self) -> bool {
626 matches!(self, Self::Full(_))
627 }
628
629 pub const fn is_hash(&self) -> bool {
631 matches!(self, Self::Hash(_))
632 }
633}
634
635impl From<PropagateKind> for PeerId {
636 fn from(value: PropagateKind) -> Self {
637 match value {
638 PropagateKind::Full(peer) | PropagateKind::Hash(peer) => peer,
639 }
640 }
641}
642
/// Event describing a new transaction together with the subpool it is associated with.
#[derive(Debug)]
pub struct NewTransactionEvent<T: PoolTransaction> {
    /// The subpool associated with the transaction.
    pub subpool: SubPool,
    /// The transaction this event is about.
    pub transaction: Arc<ValidPoolTransaction<T>>,
}
651
652impl<T: PoolTransaction> Clone for NewTransactionEvent<T> {
653 fn clone(&self) -> Self {
654 Self { subpool: self.subpool, transaction: self.transaction.clone() }
655 }
656}
657
/// Notification carrying a blob sidecar and the hash of the transaction it belongs to.
#[derive(Debug, Clone)]
pub struct NewBlobSidecar {
    /// Hash of the transaction the sidecar belongs to.
    pub tx_hash: TxHash,
    /// The shared blob sidecar.
    pub sidecar: Arc<BlobTransactionSidecar>,
}
668
/// Where a transaction came from.
///
/// The default origin is [`TransactionOrigin::Local`].
#[derive(Debug, Copy, Clone, PartialEq, Eq, Default)]
pub enum TransactionOrigin {
    /// Transaction with local origin.
    #[default]
    Local,
    /// Transaction with external origin.
    External,
    /// Transaction with private origin.
    Private,
}

impl TransactionOrigin {
    /// Returns `true` if the origin is [`Self::Local`].
    pub const fn is_local(&self) -> bool {
        match self {
            Self::Local => true,
            Self::External | Self::Private => false,
        }
    }

    /// Returns `true` if the origin is [`Self::External`].
    pub const fn is_external(&self) -> bool {
        match self {
            Self::External => true,
            Self::Local | Self::Private => false,
        }
    }

    /// Returns `true` if the origin is [`Self::Private`].
    pub const fn is_private(&self) -> bool {
        match self {
            Self::Private => true,
            Self::Local | Self::External => false,
        }
    }
}
707
/// Kind of chain event that triggered a pool update.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PoolUpdateKind {
    /// The update was caused by a commit.
    Commit,
    /// The update was caused by a reorg.
    Reorg,
}
716
/// Describes a change of the canonical chain state that the pool has to process.
#[derive(Clone, Debug)]
pub struct CanonicalStateUpdate<'a, B: Block> {
    /// The new tip block.
    pub new_tip: &'a SealedBlock<B>,
    /// Base fee of the pending block.
    pub pending_block_base_fee: u64,
    /// Blob fee of the pending block, if any.
    pub pending_block_blob_fee: Option<u128>,
    /// Accounts that changed with this update.
    pub changed_accounts: Vec<ChangedAccount>,
    /// Hashes of the transactions that were mined.
    pub mined_transactions: Vec<B256>,
    /// Whether the update stems from a commit or a reorg.
    pub update_kind: PoolUpdateKind,
}
744
745impl<B> CanonicalStateUpdate<'_, B>
746where
747 B: Block,
748{
749 pub fn number(&self) -> u64 {
751 self.new_tip.number()
752 }
753
754 pub fn hash(&self) -> B256 {
756 self.new_tip.hash()
757 }
758
759 pub fn timestamp(&self) -> u64 {
761 self.new_tip.timestamp()
762 }
763
764 pub fn block_info(&self) -> BlockInfo {
766 BlockInfo {
767 block_gas_limit: self.new_tip.gas_limit(),
768 last_seen_block_hash: self.hash(),
769 last_seen_block_number: self.number(),
770 pending_basefee: self.pending_block_base_fee,
771 pending_blob_fee: self.pending_block_blob_fee,
772 }
773 }
774}
775
776impl<B> fmt::Display for CanonicalStateUpdate<'_, B>
777where
778 B: Block,
779{
780 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
781 f.debug_struct("CanonicalStateUpdate")
782 .field("hash", &self.hash())
783 .field("number", &self.number())
784 .field("pending_block_base_fee", &self.pending_block_base_fee)
785 .field("pending_block_blob_fee", &self.pending_block_blob_fee)
786 .field("changed_accounts", &self.changed_accounts.len())
787 .field("mined_transactions", &self.mined_transactions.len())
788 .finish()
789 }
790}
791
/// Boxed [`BestTransactions`] iterator yielding the valid pool transactions of `Pool`.
pub type BestTransactionsFor<Pool> = Box<
    dyn BestTransactions<Item = Arc<ValidPoolTransaction<<Pool as TransactionPool>::Transaction>>>,
>;
796
/// An [`Iterator`] over pool transactions with additional controls for marking transactions
/// invalid, disabling updates and skipping blob transactions.
pub trait BestTransactions: Iterator + Send {
    /// Marks `transaction` as invalid for the reason given by `kind`.
    // NOTE(review): presumably the iterator then also skips transactions that depend on the
    // invalid one — confirm against implementors.
    fn mark_invalid(&mut self, transaction: &Self::Item, kind: InvalidPoolTransactionError);

    /// Tells the iterator to stop taking updates into account.
    // NOTE(review): presumably "updates" are transactions added to the pool after the iterator
    // was created — confirm.
    fn no_updates(&mut self);

    /// Builder-style variant of [`Self::no_updates`]: calls it and returns `self`.
    fn without_updates(mut self) -> Self
    where
        Self: Sized,
    {
        self.no_updates();
        self
    }

    /// Shorthand for [`Self::set_skip_blobs`] with `true`.
    fn skip_blobs(&mut self) {
        self.set_skip_blobs(true);
    }

    /// Controls whether the iterator skips blob transactions.
    fn set_skip_blobs(&mut self, skip_blobs: bool);

    /// Builder-style variant of [`Self::skip_blobs`]: calls it and returns `self`.
    fn without_blobs(mut self) -> Self
    where
        Self: Sized,
    {
        self.skip_blobs();
        self
    }

    /// Wraps this iterator and `predicate` in a [`BestTransactionFilter`].
    fn filter_transactions<P>(self, predicate: P) -> BestTransactionFilter<Self, P>
    where
        P: FnMut(&Self::Item) -> bool,
        Self: Sized,
    {
        BestTransactionFilter::new(self, predicate)
    }
}
868
869impl<T> BestTransactions for Box<T>
870where
871 T: BestTransactions + ?Sized,
872{
873 fn mark_invalid(&mut self, transaction: &Self::Item, kind: InvalidPoolTransactionError) {
874 (**self).mark_invalid(transaction, kind)
875 }
876
877 fn no_updates(&mut self) {
878 (**self).no_updates();
879 }
880
881 fn skip_blobs(&mut self) {
882 (**self).skip_blobs();
883 }
884
885 fn set_skip_blobs(&mut self, skip_blobs: bool) {
886 (**self).set_skip_blobs(skip_blobs);
887 }
888}
889
/// No-op [`BestTransactions`] implementation for the empty iterator: every method does
/// nothing.
impl<T> BestTransactions for std::iter::Empty<T> {
    fn mark_invalid(&mut self, _tx: &T, _kind: InvalidPoolTransactionError) {}

    fn no_updates(&mut self) {}

    fn skip_blobs(&mut self) {}

    fn set_skip_blobs(&mut self, _skip_blobs: bool) {}
}
900
/// A predicate over transactions.
pub trait TransactionFilter {
    /// The transaction type this filter inspects.
    type Transaction;

    /// Returns `true` if `transaction` passes the filter.
    fn is_valid(&self, transaction: &Self::Transaction) -> bool;
}

/// A [`TransactionFilter`] that accepts every transaction.
#[derive(Debug, Clone)]
pub struct NoopTransactionFilter<T>(std::marker::PhantomData<T>);

// Implemented by hand so that no `T: Default` bound is required.
impl<T> Default for NoopTransactionFilter<T> {
    /// Creates the no-op filter.
    fn default() -> Self {
        let marker = std::marker::PhantomData::<T>;
        Self(marker)
    }
}

impl<T> TransactionFilter for NoopTransactionFilter<T> {
    type Transaction = T;

    /// Always returns `true`, regardless of the transaction.
    fn is_valid(&self, _: &Self::Transaction) -> bool {
        true
    }
}
930
/// Fee parameters used when requesting the best transactions from the pool.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct BestTransactionsAttributes {
    /// The base fee.
    pub basefee: u64,
    /// The blob fee, if any.
    pub blob_fee: Option<u64>,
}

impl BestTransactionsAttributes {
    /// Creates attributes from a base fee and an optional blob fee.
    pub const fn new(basefee: u64, blob_fee: Option<u64>) -> Self {
        Self { blob_fee, basefee }
    }

    /// Creates attributes with the given base fee and no blob fee.
    pub const fn base_fee(basefee: u64) -> Self {
        Self { basefee, blob_fee: None }
    }

    /// Returns the attributes with the blob fee set to `Some(blob_fee)`; the base fee is
    /// kept as is.
    pub const fn with_blob_fee(self, blob_fee: u64) -> Self {
        Self::new(self.basefee, Some(blob_fee))
    }
}
959
/// A transaction as stored in the pool.
///
/// The trait connects three representations: the pool transaction itself (`Self`), its
/// consensus form ([`Self::Consensus`]) and its pooled form ([`Self::Pooled`]).
pub trait PoolTransaction:
    alloy_consensus::Transaction + InMemorySize + fmt::Debug + Send + Sync + Clone
{
    /// Error returned when a consensus transaction cannot be converted into the pooled form.
    type TryFromConsensusError: fmt::Display;

    /// The consensus representation; can be built from [`Self::Pooled`].
    type Consensus: SignedTransaction + From<Self::Pooled>;

    /// The pooled representation; fallibly built from [`Self::Consensus`].
    type Pooled: TryFrom<Self::Consensus, Error = Self::TryFromConsensusError> + SignedTransaction;

    /// Builds the pool transaction from a recovered consensus transaction.
    ///
    /// The consensus transaction is first converted into [`Self::Pooled`], keeping the already
    /// recovered signer, and then passed to [`Self::from_pooled`].
    ///
    /// # Errors
    ///
    /// Returns [`Self::TryFromConsensusError`] if the conversion into the pooled form fails.
    fn try_from_consensus(
        tx: Recovered<Self::Consensus>,
    ) -> Result<Self, Self::TryFromConsensusError> {
        let (tx, signer) = tx.into_parts();
        Ok(Self::from_pooled(Recovered::new_unchecked(tx.try_into()?, signer)))
    }

    /// Returns the recovered consensus transaction without consuming `self`.
    ///
    /// The default clones `self` and calls [`Self::into_consensus`]; implementors may override
    /// this with a cheaper clone.
    fn clone_into_consensus(&self) -> Recovered<Self::Consensus> {
        self.clone().into_consensus()
    }

    /// Converts the pool transaction into its recovered consensus form.
    fn into_consensus(self) -> Recovered<Self::Consensus>;

    /// Builds the pool transaction from a recovered pooled transaction.
    fn from_pooled(pooled: Recovered<Self::Pooled>) -> Self;

    /// Converts the pool transaction into its recovered pooled form.
    ///
    /// Goes through [`Self::into_consensus`] and then converts the consensus transaction into
    /// [`Self::Pooled`], keeping the signer.
    ///
    /// # Errors
    ///
    /// Returns [`Self::TryFromConsensusError`] if the conversion into the pooled form fails.
    fn try_into_pooled(self) -> Result<Recovered<Self::Pooled>, Self::TryFromConsensusError> {
        let consensus = self.into_consensus();
        let (tx, signer) = consensus.into_parts();
        Ok(Recovered::new_unchecked(tx.try_into()?, signer))
    }

    /// Converts a pooled transaction into the consensus form via `From`.
    fn pooled_into_consensus(tx: Self::Pooled) -> Self::Consensus {
        tx.into()
    }

    /// Returns the transaction hash.
    fn hash(&self) -> &TxHash;

    /// Returns the sender address.
    fn sender(&self) -> Address;

    /// Returns a reference to the sender address.
    fn sender_ref(&self) -> &Address;

    /// Returns the cost of the transaction.
    // NOTE(review): for `EthPooledTransaction` this is max gas cost + value (+ max blob cost);
    // other implementors may define it differently — confirm.
    fn cost(&self) -> &U256;

    /// Returns the encoded length of the transaction.
    fn encoded_length(&self) -> usize;

    /// Checks the init code size limit for contract creations.
    ///
    /// # Errors
    ///
    /// Returns [`InvalidPoolTransactionError::ExceedsMaxInitCodeSize`] if this is a create
    /// transaction whose input is longer than `max_init_code_size`. Transactions that are not
    /// creates always pass.
    fn ensure_max_init_code_size(
        &self,
        max_init_code_size: usize,
    ) -> Result<(), InvalidPoolTransactionError> {
        let input_len = self.input().len();
        if self.is_create() && input_len > max_init_code_size {
            Err(InvalidPoolTransactionError::ExceedsMaxInitCodeSize(input_len, max_init_code_size))
        } else {
            Ok(())
        }
    }
}
1061
/// Extension of [`PoolTransaction`] with blob (EIP-4844) sidecar handling.
pub trait EthPoolTransaction: PoolTransaction {
    /// Takes the blob sidecar out of the transaction and returns it.
    fn take_blob(&mut self) -> EthBlobTransactionSidecar;

    /// Combines the transaction with `sidecar` into the recovered pooled form.
    ///
    /// Returns `None` if the conversion is not possible.
    fn try_into_pooled_eip4844(
        self,
        sidecar: Arc<BlobTransactionSidecar>,
    ) -> Option<Recovered<Self::Pooled>>;

    /// Builds the pool transaction from a recovered consensus transaction and its sidecar.
    ///
    /// Returns `None` if the conversion is not possible.
    fn try_from_eip4844(
        tx: Recovered<Self::Consensus>,
        sidecar: BlobTransactionSidecar,
    ) -> Option<Self>;

    /// Validates `blob` against this transaction using the given KZG `settings`.
    ///
    /// # Errors
    ///
    /// Returns a [`BlobTransactionValidationError`] if validation fails.
    fn validate_blob(
        &self,
        blob: &BlobTransactionSidecar,
        settings: &KzgSettings,
    ) -> Result<(), BlobTransactionValidationError>;
}
1096
/// Default pool transaction type: a recovered transaction plus values cached at construction.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EthPooledTransaction<T = TransactionSigned> {
    /// The recovered (signer-attached) transaction.
    pub transaction: Recovered<T>,

    /// Cached cost of the transaction, computed in [`EthPooledTransaction::new`].
    pub cost: U256,

    /// Encoded length of the transaction, as supplied to [`EthPooledTransaction::new`].
    pub encoded_length: usize,

    /// State of the blob sidecar associated with the transaction.
    pub blob_sidecar: EthBlobTransactionSidecar,
}
1119
1120impl<T: SignedTransaction> EthPooledTransaction<T> {
1121 pub fn new(transaction: Recovered<T>, encoded_length: usize) -> Self {
1126 let mut blob_sidecar = EthBlobTransactionSidecar::None;
1127
1128 let gas_cost = U256::from(transaction.max_fee_per_gas())
1129 .saturating_mul(U256::from(transaction.gas_limit()));
1130
1131 let mut cost = gas_cost.saturating_add(transaction.value());
1132
1133 if let (Some(blob_gas_used), Some(max_fee_per_blob_gas)) =
1134 (transaction.blob_gas_used(), transaction.max_fee_per_blob_gas())
1135 {
1136 cost = cost.saturating_add(U256::from(
1138 max_fee_per_blob_gas.saturating_mul(blob_gas_used as u128),
1139 ));
1140
1141 blob_sidecar = EthBlobTransactionSidecar::Missing;
1144 }
1145
1146 Self { transaction, cost, encoded_length, blob_sidecar }
1147 }
1148
1149 pub const fn transaction(&self) -> &Recovered<T> {
1151 &self.transaction
1152 }
1153}
1154
1155impl PoolTransaction for EthPooledTransaction {
1156 type TryFromConsensusError = TransactionConversionError;
1157
1158 type Consensus = TransactionSigned;
1159
1160 type Pooled = PooledTransaction;
1161
1162 fn clone_into_consensus(&self) -> Recovered<Self::Consensus> {
1163 self.transaction().clone()
1164 }
1165
1166 fn into_consensus(self) -> Recovered<Self::Consensus> {
1167 self.transaction
1168 }
1169
1170 fn from_pooled(tx: Recovered<Self::Pooled>) -> Self {
1171 let encoded_length = tx.encode_2718_len();
1172 let (tx, signer) = tx.into_parts();
1173 match tx {
1174 PooledTransaction::Eip4844(tx) => {
1175 let (tx, sig, hash) = tx.into_parts();
1177 let (tx, blob) = tx.into_parts();
1178 let tx = Signed::new_unchecked(tx, sig, hash);
1179 let tx = TransactionSigned::from(tx);
1180 let tx = Recovered::new_unchecked(tx, signer);
1181 let mut pooled = Self::new(tx, encoded_length);
1182 pooled.blob_sidecar = EthBlobTransactionSidecar::Present(blob);
1183 pooled
1184 }
1185 tx => {
1186 let tx = Recovered::new_unchecked(tx.into(), signer);
1188 Self::new(tx, encoded_length)
1189 }
1190 }
1191 }
1192
1193 fn hash(&self) -> &TxHash {
1195 self.transaction.tx_hash()
1196 }
1197
1198 fn sender(&self) -> Address {
1200 self.transaction.signer()
1201 }
1202
1203 fn sender_ref(&self) -> &Address {
1205 self.transaction.signer_ref()
1206 }
1207
1208 fn cost(&self) -> &U256 {
1215 &self.cost
1216 }
1217
1218 fn encoded_length(&self) -> usize {
1220 self.encoded_length
1221 }
1222}
1223
/// Forwards the transaction type to the wrapped transaction.
impl<T: Typed2718> Typed2718 for EthPooledTransaction<T> {
    fn ty(&self) -> u8 {
        self.transaction.ty()
    }
}
1229
/// Reports the in-memory size of the wrapped transaction.
impl<T: InMemorySize> InMemorySize for EthPooledTransaction<T> {
    // Only the wrapped transaction is counted; the cached fields and the blob sidecar are not.
    fn size(&self) -> usize {
        self.transaction.size()
    }
}
1235
/// Forwards every [`alloy_consensus::Transaction`] accessor to the wrapped transaction.
impl<T: alloy_consensus::Transaction> alloy_consensus::Transaction for EthPooledTransaction<T> {
    fn chain_id(&self) -> Option<alloy_primitives::ChainId> {
        self.transaction.chain_id()
    }

    fn nonce(&self) -> u64 {
        self.transaction.nonce()
    }

    fn gas_limit(&self) -> u64 {
        self.transaction.gas_limit()
    }

    fn gas_price(&self) -> Option<u128> {
        self.transaction.gas_price()
    }

    fn max_fee_per_gas(&self) -> u128 {
        self.transaction.max_fee_per_gas()
    }

    fn max_priority_fee_per_gas(&self) -> Option<u128> {
        self.transaction.max_priority_fee_per_gas()
    }

    fn max_fee_per_blob_gas(&self) -> Option<u128> {
        self.transaction.max_fee_per_blob_gas()
    }

    fn priority_fee_or_price(&self) -> u128 {
        self.transaction.priority_fee_or_price()
    }

    fn effective_gas_price(&self, base_fee: Option<u64>) -> u128 {
        self.transaction.effective_gas_price(base_fee)
    }

    fn is_dynamic_fee(&self) -> bool {
        self.transaction.is_dynamic_fee()
    }

    fn kind(&self) -> TxKind {
        self.transaction.kind()
    }

    fn is_create(&self) -> bool {
        self.transaction.is_create()
    }

    fn value(&self) -> U256 {
        self.transaction.value()
    }

    fn input(&self) -> &Bytes {
        self.transaction.input()
    }

    fn access_list(&self) -> Option<&AccessList> {
        self.transaction.access_list()
    }

    fn blob_versioned_hashes(&self) -> Option<&[B256]> {
        self.transaction.blob_versioned_hashes()
    }

    fn authorization_list(&self) -> Option<&[SignedAuthorization]> {
        self.transaction.authorization_list()
    }
}
1305
1306impl EthPoolTransaction for EthPooledTransaction {
1307 fn take_blob(&mut self) -> EthBlobTransactionSidecar {
1308 if self.is_eip4844() {
1309 std::mem::replace(&mut self.blob_sidecar, EthBlobTransactionSidecar::Missing)
1310 } else {
1311 EthBlobTransactionSidecar::None
1312 }
1313 }
1314
1315 fn try_into_pooled_eip4844(
1316 self,
1317 sidecar: Arc<BlobTransactionSidecar>,
1318 ) -> Option<Recovered<Self::Pooled>> {
1319 let (signed_transaction, signer) = self.into_consensus().into_parts();
1320 let pooled_transaction =
1321 signed_transaction.try_into_pooled_eip4844(Arc::unwrap_or_clone(sidecar)).ok()?;
1322
1323 Some(Recovered::new_unchecked(pooled_transaction, signer))
1324 }
1325
1326 fn try_from_eip4844(
1327 tx: Recovered<Self::Consensus>,
1328 sidecar: BlobTransactionSidecar,
1329 ) -> Option<Self> {
1330 let (tx, signer) = tx.into_parts();
1331 tx.try_into_pooled_eip4844(sidecar)
1332 .ok()
1333 .map(|tx| tx.with_signer(signer))
1334 .map(Self::from_pooled)
1335 }
1336
1337 fn validate_blob(
1338 &self,
1339 sidecar: &BlobTransactionSidecar,
1340 settings: &KzgSettings,
1341 ) -> Result<(), BlobTransactionValidationError> {
1342 match self.transaction.transaction() {
1343 Transaction::Eip4844(tx) => tx.validate_blob(sidecar, settings),
1344 _ => Err(BlobTransactionValidationError::NotBlobTransaction(self.ty())),
1345 }
1346 }
1347}
1348
/// State of the blob sidecar attached to an [`EthPooledTransaction`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum EthBlobTransactionSidecar {
    /// The transaction has no sidecar (it is not a blob transaction).
    None,
    /// The transaction is a blob transaction but the sidecar is not held here.
    Missing,
    /// The sidecar is held alongside the transaction.
    Present(BlobTransactionSidecar),
}
1362
1363impl EthBlobTransactionSidecar {
1364 pub const fn maybe_sidecar(&self) -> Option<&BlobTransactionSidecar> {
1366 match self {
1367 Self::Present(sidecar) => Some(sidecar),
1368 _ => None,
1369 }
1370 }
1371}
1372
/// Size statistics of the pool, broken down by subpool.
// NOTE(review): the `*_size` fields are presumably byte sizes — confirm with the pool
// implementation.
#[derive(Debug, Clone, Copy, Default)]
pub struct PoolSize {
    /// Number of transactions in the pending subpool.
    pub pending: usize,
    /// Reported size of the pending subpool.
    pub pending_size: usize,
    /// Number of transactions in the blob subpool.
    pub blob: usize,
    /// Reported size of the blob subpool.
    pub blob_size: usize,
    /// Number of transactions in the basefee subpool.
    pub basefee: usize,
    /// Reported size of the basefee subpool.
    pub basefee_size: usize,
    /// Number of transactions in the queued subpool.
    pub queued: usize,
    /// Reported size of the queued subpool.
    pub queued_size: usize,
    /// Total number of transactions; expected to equal the sum of the four subpool counts.
    pub total: usize,
}

impl PoolSize {
    /// Asserts that `total` equals `pending + basefee + queued + blob`.
    ///
    /// # Panics
    ///
    /// Panics if the invariant does not hold.
    #[cfg(test)]
    pub(crate) fn assert_invariants(&self) {
        let Self { pending, basefee, queued, blob, total, .. } = *self;
        assert_eq!(total, pending + basefee + queued + blob);
    }
}
1407
/// Block-related information tracked by the pool.
#[derive(Default, Debug, Clone, Copy, Eq, PartialEq)]
pub struct BlockInfo {
    /// Hash of the last block the pool has seen.
    pub last_seen_block_hash: B256,
    /// Number of the last block the pool has seen.
    pub last_seen_block_number: u64,
    /// Gas limit of the block.
    pub block_gas_limit: u64,
    /// Base fee of the pending block.
    pub pending_basefee: u64,
    /// Blob fee of the pending block, if any.
    pub pending_blob_fee: Option<u128>,
}
1428
/// Limit applied when collecting pooled transactions for a response.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum GetPooledTransactionLimit {
    /// No limit.
    None,
    /// Soft limit on the accumulated response size.
    ResponseSizeSoftLimit(usize),
}

impl GetPooledTransactionLimit {
    /// Returns `true` if `size` is strictly greater than the configured limit.
    ///
    /// [`Self::None`] is never exceeded, and a `size` equal to the limit does not exceed it.
    #[inline]
    pub const fn exceeds(&self, size: usize) -> bool {
        matches!(self, Self::ResponseSizeSoftLimit(limit) if size > *limit)
    }
}
1448
/// Stream of [`NewTransactionEvent`]s restricted to a single [`SubPool`].
///
/// Events received for any other subpool are dropped.
#[must_use = "streams do nothing unless polled"]
#[derive(Debug)]
pub struct NewSubpoolTransactionStream<Tx: PoolTransaction> {
    /// The underlying receiver of new-transaction events.
    st: Receiver<NewTransactionEvent<Tx>>,
    /// The subpool whose events are yielded.
    subpool: SubPool,
}
1456
1457impl<Tx: PoolTransaction> NewSubpoolTransactionStream<Tx> {
1460 pub const fn new(st: Receiver<NewTransactionEvent<Tx>>, subpool: SubPool) -> Self {
1462 Self { st, subpool }
1463 }
1464
1465 pub fn try_recv(
1467 &mut self,
1468 ) -> Result<NewTransactionEvent<Tx>, tokio::sync::mpsc::error::TryRecvError> {
1469 loop {
1470 match self.st.try_recv() {
1471 Ok(event) => {
1472 if event.subpool == self.subpool {
1473 return Ok(event)
1474 }
1475 }
1476 Err(e) => return Err(e),
1477 }
1478 }
1479 }
1480}
1481
1482impl<Tx: PoolTransaction> Stream for NewSubpoolTransactionStream<Tx> {
1483 type Item = NewTransactionEvent<Tx>;
1484
1485 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1486 loop {
1487 match ready!(self.st.poll_recv(cx)) {
1488 Some(event) => {
1489 if event.subpool == self.subpool {
1490 return Poll::Ready(Some(event))
1491 }
1492 }
1493 None => return Poll::Ready(None),
1494 }
1495 }
1496 }
1497}
1498
#[cfg(test)]
mod tests {
    use super::*;
    use alloy_consensus::{TxEip1559, TxEip2930, TxEip4844, TxEip7702, TxLegacy};
    use alloy_eips::eip4844::DATA_GAS_PER_BLOB;
    use alloy_primitives::PrimitiveSignature as Signature;
    use reth_ethereum_primitives::{Transaction, TransactionSigned};

    /// Signs `tx` with the test signature and attaches the default address as signer.
    fn recovered(tx: Transaction) -> Recovered<TransactionSigned> {
        let signed = TransactionSigned::new_unhashed(tx, Signature::test_signature());
        Recovered::new_unchecked(signed, Address::default())
    }

    #[test]
    fn test_pool_size_invariants() {
        // `total` matches the sum of the four subpool counts.
        let pool_size = PoolSize {
            pending: 10,
            pending_size: 1000,
            blob: 5,
            blob_size: 500,
            basefee: 8,
            basefee_size: 800,
            queued: 7,
            queued_size: 700,
            total: 10 + 5 + 8 + 7,
        };

        pool_size.assert_invariants();
    }

    #[test]
    #[should_panic]
    fn test_pool_size_invariants_fail() {
        // `total` leaves out the queued count, so the invariant check must panic.
        let pool_size = PoolSize {
            pending: 10,
            pending_size: 1000,
            blob: 5,
            blob_size: 500,
            basefee: 8,
            basefee_size: 800,
            queued: 7,
            queued_size: 700,
            total: 10 + 5 + 8,
        };

        pool_size.assert_invariants();
    }

    #[test]
    fn test_eth_pooled_transaction_new_legacy() {
        let transaction = recovered(Transaction::Legacy(TxLegacy {
            gas_price: 10,
            gas_limit: 1000,
            value: U256::from(100),
            ..Default::default()
        }));
        let pooled_tx = EthPooledTransaction::new(transaction.clone(), 200);

        assert_eq!(pooled_tx.transaction, transaction);
        assert_eq!(pooled_tx.encoded_length, 200);
        assert_eq!(pooled_tx.blob_sidecar, EthBlobTransactionSidecar::None);
        // cost = value + gas_price * gas_limit
        assert_eq!(pooled_tx.cost, U256::from(100) + U256::from(10 * 1000));
    }

    #[test]
    fn test_eth_pooled_transaction_new_eip2930() {
        let transaction = recovered(Transaction::Eip2930(TxEip2930 {
            gas_price: 10,
            gas_limit: 1000,
            value: U256::from(100),
            ..Default::default()
        }));
        let pooled_tx = EthPooledTransaction::new(transaction.clone(), 200);

        assert_eq!(pooled_tx.transaction, transaction);
        assert_eq!(pooled_tx.encoded_length, 200);
        assert_eq!(pooled_tx.blob_sidecar, EthBlobTransactionSidecar::None);
        // cost = value + gas_price * gas_limit
        assert_eq!(pooled_tx.cost, U256::from(100) + U256::from(10 * 1000));
    }

    #[test]
    fn test_eth_pooled_transaction_new_eip1559() {
        let transaction = recovered(Transaction::Eip1559(TxEip1559 {
            max_fee_per_gas: 10,
            gas_limit: 1000,
            value: U256::from(100),
            ..Default::default()
        }));
        let pooled_tx = EthPooledTransaction::new(transaction.clone(), 200);

        assert_eq!(pooled_tx.transaction, transaction);
        assert_eq!(pooled_tx.encoded_length, 200);
        assert_eq!(pooled_tx.blob_sidecar, EthBlobTransactionSidecar::None);
        // cost = value + max_fee_per_gas * gas_limit
        assert_eq!(pooled_tx.cost, U256::from(100) + U256::from(10 * 1000));
    }

    #[test]
    fn test_eth_pooled_transaction_new_eip4844() {
        let transaction = recovered(Transaction::Eip4844(TxEip4844 {
            max_fee_per_gas: 10,
            gas_limit: 1000,
            value: U256::from(100),
            max_fee_per_blob_gas: 5,
            blob_versioned_hashes: vec![B256::default()],
            ..Default::default()
        }));
        let pooled_tx = EthPooledTransaction::new(transaction.clone(), 300);

        assert_eq!(pooled_tx.transaction, transaction);
        assert_eq!(pooled_tx.encoded_length, 300);
        // A blob transaction constructed via `new` carries no sidecar yet.
        assert_eq!(pooled_tx.blob_sidecar, EthBlobTransactionSidecar::Missing);
        // One blob: cost additionally includes max_fee_per_blob_gas * DATA_GAS_PER_BLOB.
        let expected_cost =
            U256::from(100) + U256::from(10 * 1000) + U256::from(5 * DATA_GAS_PER_BLOB);
        assert_eq!(pooled_tx.cost, expected_cost);
    }

    #[test]
    fn test_eth_pooled_transaction_new_eip7702() {
        let transaction = recovered(Transaction::Eip7702(TxEip7702 {
            max_fee_per_gas: 10,
            gas_limit: 1000,
            value: U256::from(100),
            ..Default::default()
        }));
        let pooled_tx = EthPooledTransaction::new(transaction.clone(), 200);

        assert_eq!(pooled_tx.transaction, transaction);
        assert_eq!(pooled_tx.encoded_length, 200);
        assert_eq!(pooled_tx.blob_sidecar, EthBlobTransactionSidecar::None);
        // cost = value + max_fee_per_gas * gas_limit
        assert_eq!(pooled_tx.cost, U256::from(100) + U256::from(10 * 1000));
    }

    #[test]
    fn test_pooled_transaction_limit() {
        // Without a limit nothing is ever exceeded.
        let limit_none = GetPooledTransactionLimit::None;
        assert!(!limit_none.exceeds(1000));

        let two_mib = 2 * 1024 * 1024;
        let size_limit_2mb = GetPooledTransactionLimit::ResponseSizeSoftLimit(two_mib);

        // Below the limit.
        assert!(!size_limit_2mb.exceeds(1024 * 1024));

        // Exactly at the limit: not exceeded.
        assert!(!size_limit_2mb.exceeds(2 * 1024 * 1024));

        // Above the limit.
        assert!(size_limit_2mb.exceeds(3 * 1024 * 1024));
    }
}