reth_transaction_pool/
traits.rs

1use crate::{
2    blobstore::BlobStoreError,
3    error::{InvalidPoolTransactionError, PoolResult},
4    pool::{state::SubPool, BestTransactionFilter, TransactionEvents},
5    validate::ValidPoolTransaction,
6    AllTransactionsEvents,
7};
8use alloy_consensus::{transaction::PooledTransaction, BlockHeader, Signed, Typed2718};
9use alloy_eips::{
10    eip2718::Encodable2718,
11    eip2930::AccessList,
12    eip4844::{
13        env_settings::KzgSettings, BlobAndProofV1, BlobTransactionSidecar,
14        BlobTransactionValidationError,
15    },
16    eip7702::SignedAuthorization,
17};
18use alloy_primitives::{Address, Bytes, TxHash, TxKind, B256, U256};
19use futures_util::{ready, Stream};
20use reth_eth_wire_types::HandleMempoolData;
21use reth_ethereum_primitives::{Transaction, TransactionSigned};
22use reth_execution_types::ChangedAccount;
23use reth_primitives_traits::{
24    transaction::error::TransactionConversionError, Block, InMemorySize, Recovered, SealedBlock,
25    SignedTransaction,
26};
27#[cfg(feature = "serde")]
28use serde::{Deserialize, Serialize};
29use std::{
30    collections::{HashMap, HashSet},
31    fmt,
32    future::Future,
33    pin::Pin,
34    sync::Arc,
35    task::{Context, Poll},
36};
37use tokio::sync::mpsc::Receiver;
38
39/// The `PeerId` type.
40pub type PeerId = alloy_primitives::B512;
41
42/// Helper type alias to access [`PoolTransaction`] for a given [`TransactionPool`].
43pub type PoolTx<P> = <P as TransactionPool>::Transaction;
44/// Helper type alias to access [`PoolTransaction::Consensus`] for a given [`TransactionPool`].
45pub type PoolConsensusTx<P> = <<P as TransactionPool>::Transaction as PoolTransaction>::Consensus;
46
47/// Helper type alias to access [`PoolTransaction::Pooled`] for a given [`TransactionPool`].
48pub type PoolPooledTx<P> = <<P as TransactionPool>::Transaction as PoolTransaction>::Pooled;
49
50/// General purpose abstraction of a transaction-pool.
51///
52/// This is intended to be used by API-consumers such as RPC that need inject new incoming,
53/// unverified transactions. And by block production that needs to get transactions to execute in a
54/// new block.
55///
56/// Note: This requires `Clone` for convenience, since it is assumed that this will be implemented
57/// for a wrapped `Arc` type, see also [`Pool`](crate::Pool).
58#[auto_impl::auto_impl(&, Arc)]
59pub trait TransactionPool: Send + Sync + Clone {
60    /// The transaction type of the pool
61    type Transaction: EthPoolTransaction;
62
63    /// Returns stats about the pool and all sub-pools.
64    fn pool_size(&self) -> PoolSize;
65
66    /// Returns the block the pool is currently tracking.
67    ///
68    /// This tracks the block that the pool has last seen.
69    fn block_info(&self) -> BlockInfo;
70
71    /// Imports an _external_ transaction.
72    ///
73    /// This is intended to be used by the network to insert incoming transactions received over the
74    /// p2p network.
75    ///
76    /// Consumer: P2P
77    fn add_external_transaction(
78        &self,
79        transaction: Self::Transaction,
80    ) -> impl Future<Output = PoolResult<TxHash>> + Send {
81        self.add_transaction(TransactionOrigin::External, transaction)
82    }
83
84    /// Imports all _external_ transactions
85    ///
86    /// Consumer: Utility
87    fn add_external_transactions(
88        &self,
89        transactions: Vec<Self::Transaction>,
90    ) -> impl Future<Output = Vec<PoolResult<TxHash>>> + Send {
91        self.add_transactions(TransactionOrigin::External, transactions)
92    }
93
94    /// Adds an _unvalidated_ transaction into the pool and subscribe to state changes.
95    ///
96    /// This is the same as [`TransactionPool::add_transaction`] but returns an event stream for the
97    /// given transaction.
98    ///
99    /// Consumer: Custom
100    fn add_transaction_and_subscribe(
101        &self,
102        origin: TransactionOrigin,
103        transaction: Self::Transaction,
104    ) -> impl Future<Output = PoolResult<TransactionEvents>> + Send;
105
106    /// Adds an _unvalidated_ transaction into the pool.
107    ///
108    /// Consumer: RPC
109    fn add_transaction(
110        &self,
111        origin: TransactionOrigin,
112        transaction: Self::Transaction,
113    ) -> impl Future<Output = PoolResult<TxHash>> + Send;
114
115    /// Adds the given _unvalidated_ transaction into the pool.
116    ///
117    /// Returns a list of results.
118    ///
119    /// Consumer: RPC
120    fn add_transactions(
121        &self,
122        origin: TransactionOrigin,
123        transactions: Vec<Self::Transaction>,
124    ) -> impl Future<Output = Vec<PoolResult<TxHash>>> + Send;
125
126    /// Returns a new transaction change event stream for the given transaction.
127    ///
128    /// Returns `None` if the transaction is not in the pool.
129    fn transaction_event_listener(&self, tx_hash: TxHash) -> Option<TransactionEvents>;
130
131    /// Returns a new transaction change event stream for _all_ transactions in the pool.
132    fn all_transactions_event_listener(&self) -> AllTransactionsEvents<Self::Transaction>;
133
134    /// Returns a new Stream that yields transactions hashes for new __pending__ transactions
135    /// inserted into the pool that are allowed to be propagated.
136    ///
137    /// Note: This is intended for networking and will __only__ yield transactions that are allowed
138    /// to be propagated over the network, see also [TransactionListenerKind].
139    ///
140    /// Consumer: RPC/P2P
141    fn pending_transactions_listener(&self) -> Receiver<TxHash> {
142        self.pending_transactions_listener_for(TransactionListenerKind::PropagateOnly)
143    }
144
145    /// Returns a new [Receiver] that yields transactions hashes for new __pending__ transactions
146    /// inserted into the pending pool depending on the given [TransactionListenerKind] argument.
147    fn pending_transactions_listener_for(&self, kind: TransactionListenerKind) -> Receiver<TxHash>;
148
149    /// Returns a new stream that yields new valid transactions added to the pool.
150    fn new_transactions_listener(&self) -> Receiver<NewTransactionEvent<Self::Transaction>> {
151        self.new_transactions_listener_for(TransactionListenerKind::PropagateOnly)
152    }
153
154    /// Returns a new [Receiver] that yields blob "sidecars" (blobs w/ assoc. kzg
155    /// commitments/proofs) for eip-4844 transactions inserted into the pool
156    fn blob_transaction_sidecars_listener(&self) -> Receiver<NewBlobSidecar>;
157
158    /// Returns a new stream that yields new valid transactions added to the pool
159    /// depending on the given [TransactionListenerKind] argument.
160    fn new_transactions_listener_for(
161        &self,
162        kind: TransactionListenerKind,
163    ) -> Receiver<NewTransactionEvent<Self::Transaction>>;
164
165    /// Returns a new Stream that yields new transactions added to the pending sub-pool.
166    ///
167    /// This is a convenience wrapper around [Self::new_transactions_listener] that filters for
168    /// [SubPool::Pending](crate::SubPool).
169    fn new_pending_pool_transactions_listener(
170        &self,
171    ) -> NewSubpoolTransactionStream<Self::Transaction> {
172        NewSubpoolTransactionStream::new(
173            self.new_transactions_listener_for(TransactionListenerKind::PropagateOnly),
174            SubPool::Pending,
175        )
176    }
177
178    /// Returns a new Stream that yields new transactions added to the basefee sub-pool.
179    ///
180    /// This is a convenience wrapper around [Self::new_transactions_listener] that filters for
181    /// [SubPool::BaseFee](crate::SubPool).
182    fn new_basefee_pool_transactions_listener(
183        &self,
184    ) -> NewSubpoolTransactionStream<Self::Transaction> {
185        NewSubpoolTransactionStream::new(self.new_transactions_listener(), SubPool::BaseFee)
186    }
187
188    /// Returns a new Stream that yields new transactions added to the queued-pool.
189    ///
190    /// This is a convenience wrapper around [Self::new_transactions_listener] that filters for
191    /// [SubPool::Queued](crate::SubPool).
192    fn new_queued_transactions_listener(&self) -> NewSubpoolTransactionStream<Self::Transaction> {
193        NewSubpoolTransactionStream::new(self.new_transactions_listener(), SubPool::Queued)
194    }
195
196    /// Returns the _hashes_ of all transactions in the pool.
197    ///
198    /// Note: This returns a `Vec` but should guarantee that all hashes are unique.
199    ///
200    /// Consumer: P2P
201    fn pooled_transaction_hashes(&self) -> Vec<TxHash>;
202
203    /// Returns only the first `max` hashes of transactions in the pool.
204    ///
205    /// Consumer: P2P
206    fn pooled_transaction_hashes_max(&self, max: usize) -> Vec<TxHash>;
207
208    /// Returns the _full_ transaction objects all transactions in the pool.
209    ///
210    /// This is intended to be used by the network for the initial exchange of pooled transaction
211    /// _hashes_
212    ///
213    /// Note: This returns a `Vec` but should guarantee that all transactions are unique.
214    ///
215    /// Caution: In case of blob transactions, this does not include the sidecar.
216    ///
217    /// Consumer: P2P
218    fn pooled_transactions(&self) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>>;
219
220    /// Returns only the first `max` transactions in the pool.
221    ///
222    /// Consumer: P2P
223    fn pooled_transactions_max(
224        &self,
225        max: usize,
226    ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>>;
227
228    /// Returns converted [PooledTransaction] for the given transaction hashes.
229    ///
230    /// This adheres to the expected behavior of
231    /// [`GetPooledTransactions`](https://github.com/ethereum/devp2p/blob/master/caps/eth.md#getpooledtransactions-0x09):
232    ///
233    /// The transactions must be in same order as in the request, but it is OK to skip transactions
234    /// which are not available.
235    ///
236    /// If the transaction is a blob transaction, the sidecar will be included.
237    ///
238    /// Consumer: P2P
239    fn get_pooled_transaction_elements(
240        &self,
241        tx_hashes: Vec<TxHash>,
242        limit: GetPooledTransactionLimit,
243    ) -> Vec<<Self::Transaction as PoolTransaction>::Pooled>;
244
245    /// Returns the pooled transaction variant for the given transaction hash.
246    ///
247    /// This adheres to the expected behavior of
248    /// [`GetPooledTransactions`](https://github.com/ethereum/devp2p/blob/master/caps/eth.md#getpooledtransactions-0x09):
249    ///
250    /// If the transaction is a blob transaction, the sidecar will be included.
251    ///
252    /// It is expected that this variant represents the valid p2p format for full transactions.
253    /// E.g. for EIP-4844 transactions this is the consensus transaction format with the blob
254    /// sidecar.
255    ///
256    /// Consumer: P2P
257    fn get_pooled_transaction_element(
258        &self,
259        tx_hash: TxHash,
260    ) -> Option<Recovered<<Self::Transaction as PoolTransaction>::Pooled>>;
261
262    /// Returns an iterator that yields transactions that are ready for block production.
263    ///
264    /// Consumer: Block production
265    fn best_transactions(
266        &self,
267    ) -> Box<dyn BestTransactions<Item = Arc<ValidPoolTransaction<Self::Transaction>>>>;
268
269    /// Returns an iterator that yields transactions that are ready for block production with the
270    /// given base fee and optional blob fee attributes.
271    ///
272    /// Consumer: Block production
273    fn best_transactions_with_attributes(
274        &self,
275        best_transactions_attributes: BestTransactionsAttributes,
276    ) -> Box<dyn BestTransactions<Item = Arc<ValidPoolTransaction<Self::Transaction>>>>;
277
278    /// Returns all transactions that can be included in the next block.
279    ///
280    /// This is primarily used for the `txpool_` RPC namespace:
281    /// <https://geth.ethereum.org/docs/interacting-with-geth/rpc/ns-txpool> which distinguishes
282    /// between `pending` and `queued` transactions, where `pending` are transactions ready for
283    /// inclusion in the next block and `queued` are transactions that are ready for inclusion in
284    /// future blocks.
285    ///
286    /// Consumer: RPC
287    fn pending_transactions(&self) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>>;
288
289    /// Returns first `max` transactions that can be included in the next block.
290    /// See <https://github.com/paradigmxyz/reth/issues/12767#issuecomment-2493223579>
291    ///
292    /// Consumer: Block production
293    fn pending_transactions_max(
294        &self,
295        max: usize,
296    ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>>;
297
298    /// Returns all transactions that can be included in _future_ blocks.
299    ///
300    /// This and [Self::pending_transactions] are mutually exclusive.
301    ///
302    /// Consumer: RPC
303    fn queued_transactions(&self) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>>;
304
305    /// Returns all transactions that are currently in the pool grouped by whether they are ready
306    /// for inclusion in the next block or not.
307    ///
308    /// This is primarily used for the `txpool_` namespace: <https://geth.ethereum.org/docs/interacting-with-geth/rpc/ns-txpool>
309    ///
310    /// Consumer: RPC
311    fn all_transactions(&self) -> AllPoolTransactions<Self::Transaction>;
312
313    /// Removes all transactions corresponding to the given hashes.
314    ///
315    /// Consumer: Utility
316    fn remove_transactions(
317        &self,
318        hashes: Vec<TxHash>,
319    ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>>;
320
321    /// Removes all transactions corresponding to the given hashes.
322    ///
323    /// Also removes all _dependent_ transactions.
324    ///
325    /// Consumer: Utility
326    fn remove_transactions_and_descendants(
327        &self,
328        hashes: Vec<TxHash>,
329    ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>>;
330
331    /// Removes all transactions from the given sender
332    ///
333    /// Consumer: Utility
334    fn remove_transactions_by_sender(
335        &self,
336        sender: Address,
337    ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>>;
338
339    /// Retains only those hashes that are unknown to the pool.
340    /// In other words, removes all transactions from the given set that are currently present in
341    /// the pool. Returns hashes already known to the pool.
342    ///
343    /// Consumer: P2P
344    fn retain_unknown<A>(&self, announcement: &mut A)
345    where
346        A: HandleMempoolData;
347
348    /// Returns if the transaction for the given hash is already included in this pool.
349    fn contains(&self, tx_hash: &TxHash) -> bool {
350        self.get(tx_hash).is_some()
351    }
352
353    /// Returns the transaction for the given hash.
354    fn get(&self, tx_hash: &TxHash) -> Option<Arc<ValidPoolTransaction<Self::Transaction>>>;
355
356    /// Returns all transactions objects for the given hashes.
357    ///
358    /// Caution: This in case of blob transactions, this does not include the sidecar.
359    fn get_all(&self, txs: Vec<TxHash>) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>>;
360
361    /// Notify the pool about transactions that are propagated to peers.
362    ///
363    /// Consumer: P2P
364    fn on_propagated(&self, txs: PropagatedTransactions);
365
366    /// Returns all transactions sent by a given user
367    fn get_transactions_by_sender(
368        &self,
369        sender: Address,
370    ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>>;
371
372    /// Returns all pending transactions filtered by predicate
373    fn get_pending_transactions_with_predicate(
374        &self,
375        predicate: impl FnMut(&ValidPoolTransaction<Self::Transaction>) -> bool,
376    ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>>;
377
378    /// Returns all pending transactions sent by a given user
379    fn get_pending_transactions_by_sender(
380        &self,
381        sender: Address,
382    ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>>;
383
384    /// Returns all queued transactions sent by a given user
385    fn get_queued_transactions_by_sender(
386        &self,
387        sender: Address,
388    ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>>;
389
390    /// Returns the highest transaction sent by a given user
391    fn get_highest_transaction_by_sender(
392        &self,
393        sender: Address,
394    ) -> Option<Arc<ValidPoolTransaction<Self::Transaction>>>;
395
396    /// Returns the transaction with the highest nonce that is executable given the on chain nonce.
397    /// In other words the highest non nonce gapped transaction.
398    ///
399    /// Note: The next pending pooled transaction must have the on chain nonce.
400    ///
401    /// For example, for a given on chain nonce of `5`, the next transaction must have that nonce.
402    /// If the pool contains txs `[5,6,7]` this returns tx `7`.
403    /// If the pool contains txs `[6,7]` this returns `None` because the next valid nonce (5) is
404    /// missing, which means txs `[6,7]` are nonce gapped.
405    fn get_highest_consecutive_transaction_by_sender(
406        &self,
407        sender: Address,
408        on_chain_nonce: u64,
409    ) -> Option<Arc<ValidPoolTransaction<Self::Transaction>>>;
410
411    /// Returns a transaction sent by a given user and a nonce
412    fn get_transaction_by_sender_and_nonce(
413        &self,
414        sender: Address,
415        nonce: u64,
416    ) -> Option<Arc<ValidPoolTransaction<Self::Transaction>>>;
417
418    /// Returns all transactions that where submitted with the given [TransactionOrigin]
419    fn get_transactions_by_origin(
420        &self,
421        origin: TransactionOrigin,
422    ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>>;
423
424    /// Returns all pending transactions filtered by [`TransactionOrigin`]
425    fn get_pending_transactions_by_origin(
426        &self,
427        origin: TransactionOrigin,
428    ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>>;
429
430    /// Returns all transactions that where submitted as [TransactionOrigin::Local]
431    fn get_local_transactions(&self) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
432        self.get_transactions_by_origin(TransactionOrigin::Local)
433    }
434
435    /// Returns all transactions that where submitted as [TransactionOrigin::Private]
436    fn get_private_transactions(&self) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
437        self.get_transactions_by_origin(TransactionOrigin::Private)
438    }
439
440    /// Returns all transactions that where submitted as [TransactionOrigin::External]
441    fn get_external_transactions(&self) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
442        self.get_transactions_by_origin(TransactionOrigin::External)
443    }
444
445    /// Returns all pending transactions that where submitted as [TransactionOrigin::Local]
446    fn get_local_pending_transactions(&self) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
447        self.get_pending_transactions_by_origin(TransactionOrigin::Local)
448    }
449
450    /// Returns all pending transactions that where submitted as [TransactionOrigin::Private]
451    fn get_private_pending_transactions(
452        &self,
453    ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
454        self.get_pending_transactions_by_origin(TransactionOrigin::Private)
455    }
456
457    /// Returns all pending transactions that where submitted as [TransactionOrigin::External]
458    fn get_external_pending_transactions(
459        &self,
460    ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
461        self.get_pending_transactions_by_origin(TransactionOrigin::External)
462    }
463
464    /// Returns a set of all senders of transactions in the pool
465    fn unique_senders(&self) -> HashSet<Address>;
466
467    /// Returns the [BlobTransactionSidecar] for the given transaction hash if it exists in the blob
468    /// store.
469    fn get_blob(
470        &self,
471        tx_hash: TxHash,
472    ) -> Result<Option<Arc<BlobTransactionSidecar>>, BlobStoreError>;
473
474    /// Returns all [BlobTransactionSidecar] for the given transaction hashes if they exists in the
475    /// blob store.
476    ///
477    /// This only returns the blobs that were found in the store.
478    /// If there's no blob it will not be returned.
479    fn get_all_blobs(
480        &self,
481        tx_hashes: Vec<TxHash>,
482    ) -> Result<Vec<(TxHash, Arc<BlobTransactionSidecar>)>, BlobStoreError>;
483
484    /// Returns the exact [BlobTransactionSidecar] for the given transaction hashes in the order
485    /// they were requested.
486    ///
487    /// Returns an error if any of the blobs are not found in the blob store.
488    fn get_all_blobs_exact(
489        &self,
490        tx_hashes: Vec<TxHash>,
491    ) -> Result<Vec<Arc<BlobTransactionSidecar>>, BlobStoreError>;
492
493    /// Return the [`BlobTransactionSidecar`]s for a list of blob versioned hashes.
494    fn get_blobs_for_versioned_hashes(
495        &self,
496        versioned_hashes: &[B256],
497    ) -> Result<Vec<Option<BlobAndProofV1>>, BlobStoreError>;
498}
499
500/// Extension for [TransactionPool] trait that allows to set the current block info.
501#[auto_impl::auto_impl(&, Arc)]
502pub trait TransactionPoolExt: TransactionPool {
503    /// Sets the current block info for the pool.
504    fn set_block_info(&self, info: BlockInfo);
505
506    /// Event listener for when the pool needs to be updated.
507    ///
508    /// Implementers need to update the pool accordingly:
509    ///
510    /// ## Fee changes
511    ///
512    /// The [`CanonicalStateUpdate`] includes the base and blob fee of the pending block, which
513    /// affects the dynamic fee requirement of pending transactions in the pool.
514    ///
515    /// ## EIP-4844 Blob transactions
516    ///
517    /// Mined blob transactions need to be removed from the pool, but from the pool only. The blob
518    /// sidecar must not be removed from the blob store. Only after a blob transaction is
519    /// finalized, its sidecar is removed from the blob store. This ensures that in case of a reorg,
520    /// the sidecar is still available.
521    fn on_canonical_state_change<B>(&self, update: CanonicalStateUpdate<'_, B>)
522    where
523        B: Block;
524
525    /// Updates the accounts in the pool
526    fn update_accounts(&self, accounts: Vec<ChangedAccount>);
527
528    /// Deletes the blob sidecar for the given transaction from the blob store
529    fn delete_blob(&self, tx: B256);
530
531    /// Deletes multiple blob sidecars from the blob store
532    fn delete_blobs(&self, txs: Vec<B256>);
533
534    /// Maintenance function to cleanup blobs that are no longer needed.
535    fn cleanup_blobs(&self);
536}
537
538/// Determines what kind of new transactions should be emitted by a stream of transactions.
539///
540/// This gives control whether to include transactions that are allowed to be propagated.
541#[derive(Debug, Copy, Clone, PartialEq, Eq)]
542pub enum TransactionListenerKind {
543    /// Any new pending transactions
544    All,
545    /// Only transactions that are allowed to be propagated.
546    ///
547    /// See also [`ValidPoolTransaction`]
548    PropagateOnly,
549}
550
551impl TransactionListenerKind {
552    /// Returns true if we're only interested in transactions that are allowed to be propagated.
553    #[inline]
554    pub const fn is_propagate_only(&self) -> bool {
555        matches!(self, Self::PropagateOnly)
556    }
557}
558
559/// A Helper type that bundles all transactions in the pool.
560#[derive(Debug, Clone)]
561pub struct AllPoolTransactions<T: PoolTransaction> {
562    /// Transactions that are ready for inclusion in the next block.
563    pub pending: Vec<Arc<ValidPoolTransaction<T>>>,
564    /// Transactions that are ready for inclusion in _future_ blocks, but are currently parked,
565    /// because they depend on other transactions that are not yet included in the pool (nonce gap)
566    /// or otherwise blocked.
567    pub queued: Vec<Arc<ValidPoolTransaction<T>>>,
568}
569
570// === impl AllPoolTransactions ===
571
572impl<T: PoolTransaction> AllPoolTransactions<T> {
573    /// Returns an iterator over all pending [`Recovered`] transactions.
574    pub fn pending_recovered(&self) -> impl Iterator<Item = Recovered<T::Consensus>> + '_ {
575        self.pending.iter().map(|tx| tx.transaction.clone().into_consensus())
576    }
577
578    /// Returns an iterator over all queued [`Recovered`] transactions.
579    pub fn queued_recovered(&self) -> impl Iterator<Item = Recovered<T::Consensus>> + '_ {
580        self.queued.iter().map(|tx| tx.transaction.clone().into_consensus())
581    }
582
583    /// Returns an iterator over all transactions, both pending and queued.
584    pub fn all(&self) -> impl Iterator<Item = Recovered<T::Consensus>> + '_ {
585        self.pending
586            .iter()
587            .chain(self.queued.iter())
588            .map(|tx| tx.transaction.clone().into_consensus())
589    }
590}
591
592impl<T: PoolTransaction> Default for AllPoolTransactions<T> {
593    fn default() -> Self {
594        Self { pending: Default::default(), queued: Default::default() }
595    }
596}
597
598/// Represents a transaction that was propagated over the network.
599#[derive(Debug, Clone, Eq, PartialEq, Default)]
600pub struct PropagatedTransactions(pub HashMap<TxHash, Vec<PropagateKind>>);
601
602/// Represents how a transaction was propagated over the network.
603#[derive(Debug, Copy, Clone, Eq, PartialEq)]
604#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
605pub enum PropagateKind {
606    /// The full transaction object was sent to the peer.
607    ///
608    /// This is equivalent to the `Transaction` message
609    Full(PeerId),
610    /// Only the Hash was propagated to the peer.
611    Hash(PeerId),
612}
613
614// === impl PropagateKind ===
615
616impl PropagateKind {
617    /// Returns the peer the transaction was sent to
618    pub const fn peer(&self) -> &PeerId {
619        match self {
620            Self::Full(peer) | Self::Hash(peer) => peer,
621        }
622    }
623
624    /// Returns true if the transaction was sent as a full transaction
625    pub const fn is_full(&self) -> bool {
626        matches!(self, Self::Full(_))
627    }
628
629    /// Returns true if the transaction was sent as a hash
630    pub const fn is_hash(&self) -> bool {
631        matches!(self, Self::Hash(_))
632    }
633}
634
635impl From<PropagateKind> for PeerId {
636    fn from(value: PropagateKind) -> Self {
637        match value {
638            PropagateKind::Full(peer) | PropagateKind::Hash(peer) => peer,
639        }
640    }
641}
642
643/// Represents a new transaction
644#[derive(Debug)]
645pub struct NewTransactionEvent<T: PoolTransaction> {
646    /// The pool which the transaction was moved to.
647    pub subpool: SubPool,
648    /// Actual transaction
649    pub transaction: Arc<ValidPoolTransaction<T>>,
650}
651
652impl<T: PoolTransaction> Clone for NewTransactionEvent<T> {
653    fn clone(&self) -> Self {
654        Self { subpool: self.subpool, transaction: self.transaction.clone() }
655    }
656}
657
658/// This type represents a new blob sidecar that has been stored in the transaction pool's
659/// blobstore; it includes the `TransactionHash` of the blob transaction along with the assoc.
660/// sidecar (blobs, commitments, proofs)
661#[derive(Debug, Clone)]
662pub struct NewBlobSidecar {
663    /// hash of the EIP-4844 transaction.
664    pub tx_hash: TxHash,
665    /// the blob transaction sidecar.
666    pub sidecar: Arc<BlobTransactionSidecar>,
667}
668
669/// Where the transaction originates from.
670///
671/// Depending on where the transaction was picked up, it affects how the transaction is handled
672/// internally, e.g. limits for simultaneous transaction of one sender.
673#[derive(Debug, Copy, Clone, PartialEq, Eq, Default)]
674pub enum TransactionOrigin {
675    /// Transaction is coming from a local source.
676    #[default]
677    Local,
678    /// Transaction has been received externally.
679    ///
680    /// This is usually considered an "untrusted" source, for example received from another in the
681    /// network.
682    External,
683    /// Transaction is originated locally and is intended to remain private.
684    ///
685    /// This type of transaction should not be propagated to the network. It's meant for
686    /// private usage within the local node only.
687    Private,
688}
689
690// === impl TransactionOrigin ===
691
692impl TransactionOrigin {
693    /// Whether the transaction originates from a local source.
694    pub const fn is_local(&self) -> bool {
695        matches!(self, Self::Local)
696    }
697
698    /// Whether the transaction originates from an external source.
699    pub const fn is_external(&self) -> bool {
700        matches!(self, Self::External)
701    }
702    /// Whether the transaction originates from a private source.
703    pub const fn is_private(&self) -> bool {
704        matches!(self, Self::Private)
705    }
706}
707
708/// Represents the kind of update to the canonical state.
709#[derive(Debug, Clone, Copy, PartialEq, Eq)]
710pub enum PoolUpdateKind {
711    /// The update was due to a block commit.
712    Commit,
713    /// The update was due to a reorganization.
714    Reorg,
715}
716
717/// Represents changes after a new canonical block or range of canonical blocks was added to the
718/// chain.
719///
720/// It is expected that this is only used if the added blocks are canonical to the pool's last known
721/// block hash. In other words, the first added block of the range must be the child of the last
722/// known block hash.
723///
724/// This is used to update the pool state accordingly.
725#[derive(Clone, Debug)]
726pub struct CanonicalStateUpdate<'a, B: Block> {
727    /// Hash of the tip block.
728    pub new_tip: &'a SealedBlock<B>,
729    /// EIP-1559 Base fee of the _next_ (pending) block
730    ///
731    /// The base fee of a block depends on the utilization of the last block and its base fee.
732    pub pending_block_base_fee: u64,
733    /// EIP-4844 blob fee of the _next_ (pending) block
734    ///
735    /// Only after Cancun
736    pub pending_block_blob_fee: Option<u128>,
737    /// A set of changed accounts across a range of blocks.
738    pub changed_accounts: Vec<ChangedAccount>,
739    /// All mined transactions in the block range.
740    pub mined_transactions: Vec<B256>,
741    /// The kind of update to the canonical state.
742    pub update_kind: PoolUpdateKind,
743}
744
745impl<B> CanonicalStateUpdate<'_, B>
746where
747    B: Block,
748{
749    /// Returns the number of the tip block.
750    pub fn number(&self) -> u64 {
751        self.new_tip.number()
752    }
753
754    /// Returns the hash of the tip block.
755    pub fn hash(&self) -> B256 {
756        self.new_tip.hash()
757    }
758
759    /// Timestamp of the latest chain update
760    pub fn timestamp(&self) -> u64 {
761        self.new_tip.timestamp()
762    }
763
764    /// Returns the block info for the tip block.
765    pub fn block_info(&self) -> BlockInfo {
766        BlockInfo {
767            block_gas_limit: self.new_tip.gas_limit(),
768            last_seen_block_hash: self.hash(),
769            last_seen_block_number: self.number(),
770            pending_basefee: self.pending_block_base_fee,
771            pending_blob_fee: self.pending_block_blob_fee,
772        }
773    }
774}
775
776impl<B> fmt::Display for CanonicalStateUpdate<'_, B>
777where
778    B: Block,
779{
780    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
781        f.debug_struct("CanonicalStateUpdate")
782            .field("hash", &self.hash())
783            .field("number", &self.number())
784            .field("pending_block_base_fee", &self.pending_block_base_fee)
785            .field("pending_block_blob_fee", &self.pending_block_blob_fee)
786            .field("changed_accounts", &self.changed_accounts.len())
787            .field("mined_transactions", &self.mined_transactions.len())
788            .finish()
789    }
790}
791
792/// Alias to restrict the [`BestTransactions`] items to the pool's transaction type.
793pub type BestTransactionsFor<Pool> = Box<
794    dyn BestTransactions<Item = Arc<ValidPoolTransaction<<Pool as TransactionPool>::Transaction>>>,
795>;
796
797/// An `Iterator` that only returns transactions that are ready to be executed.
798///
799/// This makes no assumptions about the order of the transactions, but expects that _all_
800/// transactions are valid (no nonce gaps.) for the tracked state of the pool.
801///
802/// Note: this iterator will always return the best transaction that it currently knows.
803/// There is no guarantee transactions will be returned sequentially in decreasing
804/// priority order.
805pub trait BestTransactions: Iterator + Send {
806    /// Mark the transaction as invalid.
807    ///
808    /// Implementers must ensure all subsequent transaction _don't_ depend on this transaction.
809    /// In other words, this must remove the given transaction _and_ drain all transaction that
810    /// depend on it.
811    fn mark_invalid(&mut self, transaction: &Self::Item, kind: InvalidPoolTransactionError);
812
813    /// An iterator may be able to receive additional pending transactions that weren't present it
814    /// the pool when it was created.
815    ///
816    /// This ensures that iterator will return the best transaction that it currently knows and not
817    /// listen to pool updates.
818    fn no_updates(&mut self);
819
820    /// Convenience function for [`Self::no_updates`] that returns the iterator again.
821    fn without_updates(mut self) -> Self
822    where
823        Self: Sized,
824    {
825        self.no_updates();
826        self
827    }
828
829    /// Skip all blob transactions.
830    ///
831    /// There's only limited blob space available in a block, once exhausted, EIP-4844 transactions
832    /// can no longer be included.
833    ///
834    /// If called then the iterator will no longer yield blob transactions.
835    ///
836    /// Note: this will also exclude any transactions that depend on blob transactions.
837    fn skip_blobs(&mut self) {
838        self.set_skip_blobs(true);
839    }
840
841    /// Controls whether the iterator skips blob transactions or not.
842    ///
843    /// If set to true, no blob transactions will be returned.
844    fn set_skip_blobs(&mut self, skip_blobs: bool);
845
846    /// Convenience function for [`Self::skip_blobs`] that returns the iterator again.
847    fn without_blobs(mut self) -> Self
848    where
849        Self: Sized,
850    {
851        self.skip_blobs();
852        self
853    }
854
855    /// Creates an iterator which uses a closure to determine whether a transaction should be
856    /// returned by the iterator.
857    ///
858    /// All items the closure returns false for are marked as invalid via [`Self::mark_invalid`] and
859    /// descendant transactions will be skipped.
860    fn filter_transactions<P>(self, predicate: P) -> BestTransactionFilter<Self, P>
861    where
862        P: FnMut(&Self::Item) -> bool,
863        Self: Sized,
864    {
865        BestTransactionFilter::new(self, predicate)
866    }
867}
868
869impl<T> BestTransactions for Box<T>
870where
871    T: BestTransactions + ?Sized,
872{
873    fn mark_invalid(&mut self, transaction: &Self::Item, kind: InvalidPoolTransactionError) {
874        (**self).mark_invalid(transaction, kind)
875    }
876
877    fn no_updates(&mut self) {
878        (**self).no_updates();
879    }
880
881    fn skip_blobs(&mut self) {
882        (**self).skip_blobs();
883    }
884
885    fn set_skip_blobs(&mut self, skip_blobs: bool) {
886        (**self).set_skip_blobs(skip_blobs);
887    }
888}
889
890/// A no-op implementation that yields no transactions.
891impl<T> BestTransactions for std::iter::Empty<T> {
892    fn mark_invalid(&mut self, _tx: &T, _kind: InvalidPoolTransactionError) {}
893
894    fn no_updates(&mut self) {}
895
896    fn skip_blobs(&mut self) {}
897
898    fn set_skip_blobs(&mut self, _skip_blobs: bool) {}
899}
900
901/// A filter that allows to check if a transaction satisfies a set of conditions
902pub trait TransactionFilter {
903    /// The type of the transaction to check.
904    type Transaction;
905
906    /// Returns true if the transaction satisfies the conditions.
907    fn is_valid(&self, transaction: &Self::Transaction) -> bool;
908}
909
910/// A no-op implementation of [`TransactionFilter`] which
911/// marks all transactions as valid.
912#[derive(Debug, Clone)]
913pub struct NoopTransactionFilter<T>(std::marker::PhantomData<T>);
914
915// We can't derive Default because this forces T to be
916// Default as well, which isn't necessary.
917impl<T> Default for NoopTransactionFilter<T> {
918    fn default() -> Self {
919        Self(std::marker::PhantomData)
920    }
921}
922
923impl<T> TransactionFilter for NoopTransactionFilter<T> {
924    type Transaction = T;
925
926    fn is_valid(&self, _transaction: &Self::Transaction) -> bool {
927        true
928    }
929}
930
931/// A Helper type that bundles the best transactions attributes together.
932#[derive(Debug, Copy, Clone, PartialEq, Eq)]
933pub struct BestTransactionsAttributes {
934    /// The base fee attribute for best transactions.
935    pub basefee: u64,
936    /// The blob fee attribute for best transactions.
937    pub blob_fee: Option<u64>,
938}
939
940// === impl BestTransactionsAttributes ===
941
942impl BestTransactionsAttributes {
943    /// Creates a new `BestTransactionsAttributes` with the given basefee and blob fee.
944    pub const fn new(basefee: u64, blob_fee: Option<u64>) -> Self {
945        Self { basefee, blob_fee }
946    }
947
948    /// Creates a new `BestTransactionsAttributes` with the given basefee.
949    pub const fn base_fee(basefee: u64) -> Self {
950        Self::new(basefee, None)
951    }
952
953    /// Sets the given blob fee.
954    pub const fn with_blob_fee(mut self, blob_fee: u64) -> Self {
955        self.blob_fee = Some(blob_fee);
956        self
957    }
958}
959
960/// Trait for transaction types used inside the pool.
961///
962/// This supports two transaction formats
963/// - Consensus format: the form the transaction takes when it is included in a block.
964/// - Pooled format: the form the transaction takes when it is gossiping around the network.
965///
966/// This distinction is necessary for the EIP-4844 blob transactions, which require an additional
967/// sidecar when they are gossiped around the network. It is expected that the `Consensus` format is
968/// a subset of the `Pooled` format.
969///
970/// The assumption is that fallible conversion from `Consensus` to `Pooled` will encapsulate
971/// handling of all valid `Consensus` transactions that can't be pooled (e.g Deposit transactions or
972/// blob-less EIP-4844 transactions).
973pub trait PoolTransaction:
974    alloy_consensus::Transaction + InMemorySize + fmt::Debug + Send + Sync + Clone
975{
976    /// Associated error type for the `try_from_consensus` method.
977    type TryFromConsensusError: fmt::Display;
978
979    /// Associated type representing the raw consensus variant of the transaction.
980    type Consensus: SignedTransaction + From<Self::Pooled>;
981
982    /// Associated type representing the recovered pooled variant of the transaction.
983    type Pooled: TryFrom<Self::Consensus, Error = Self::TryFromConsensusError> + SignedTransaction;
984
985    /// Define a method to convert from the `Consensus` type to `Self`
986    ///
987    /// Note: this _must_ fail on any transactions that cannot be pooled (e.g OP Deposit
988    /// transactions).
989    fn try_from_consensus(
990        tx: Recovered<Self::Consensus>,
991    ) -> Result<Self, Self::TryFromConsensusError> {
992        let (tx, signer) = tx.into_parts();
993        Ok(Self::from_pooled(Recovered::new_unchecked(tx.try_into()?, signer)))
994    }
995
996    /// Clone the transaction into a consensus variant.
997    ///
998    /// This method is preferred when the [`PoolTransaction`] already wraps the consensus variant.
999    fn clone_into_consensus(&self) -> Recovered<Self::Consensus> {
1000        self.clone().into_consensus()
1001    }
1002
1003    /// Define a method to convert from the `Self` type to `Consensus`
1004    fn into_consensus(self) -> Recovered<Self::Consensus>;
1005
1006    /// Define a method to convert from the `Pooled` type to `Self`
1007    fn from_pooled(pooled: Recovered<Self::Pooled>) -> Self;
1008
1009    /// Tries to convert the `Consensus` type into the `Pooled` type.
1010    fn try_into_pooled(self) -> Result<Recovered<Self::Pooled>, Self::TryFromConsensusError> {
1011        let consensus = self.into_consensus();
1012        let (tx, signer) = consensus.into_parts();
1013        Ok(Recovered::new_unchecked(tx.try_into()?, signer))
1014    }
1015
1016    /// Converts the `Pooled` type into the `Consensus` type.
1017    fn pooled_into_consensus(tx: Self::Pooled) -> Self::Consensus {
1018        tx.into()
1019    }
1020
1021    /// Hash of the transaction.
1022    fn hash(&self) -> &TxHash;
1023
1024    /// The Sender of the transaction.
1025    fn sender(&self) -> Address;
1026
1027    /// Reference to the Sender of the transaction.
1028    fn sender_ref(&self) -> &Address;
1029
1030    /// Returns the cost that this transaction is allowed to consume:
1031    ///
1032    /// For EIP-1559 transactions: `max_fee_per_gas * gas_limit + tx_value`.
1033    /// For legacy transactions: `gas_price * gas_limit + tx_value`.
1034    /// For EIP-4844 blob transactions: `max_fee_per_gas * gas_limit + tx_value +
1035    /// max_blob_fee_per_gas * blob_gas_used`.
1036    fn cost(&self) -> &U256;
1037
1038    /// Returns the length of the rlp encoded transaction object
1039    ///
1040    /// Note: Implementations should cache this value.
1041    fn encoded_length(&self) -> usize;
1042
1043    /// Ensures that the transaction's code size does not exceed the provided `max_init_code_size`.
1044    ///
1045    /// This is specifically relevant for contract creation transactions ([`TxKind::Create`]),
1046    /// where the input data contains the initialization code. If the input code size exceeds
1047    /// the configured limit, an [`InvalidPoolTransactionError::ExceedsMaxInitCodeSize`] error is
1048    /// returned.
1049    fn ensure_max_init_code_size(
1050        &self,
1051        max_init_code_size: usize,
1052    ) -> Result<(), InvalidPoolTransactionError> {
1053        let input_len = self.input().len();
1054        if self.is_create() && input_len > max_init_code_size {
1055            Err(InvalidPoolTransactionError::ExceedsMaxInitCodeSize(input_len, max_init_code_size))
1056        } else {
1057            Ok(())
1058        }
1059    }
1060}
1061
1062/// Super trait for transactions that can be converted to and from Eth transactions intended for the
1063/// ethereum style pool.
1064///
1065/// This extends the [`PoolTransaction`] trait with additional methods that are specific to the
1066/// Ethereum pool.
1067pub trait EthPoolTransaction: PoolTransaction {
1068    /// Extracts the blob sidecar from the transaction.
1069    fn take_blob(&mut self) -> EthBlobTransactionSidecar;
1070
1071    /// A specialization for the EIP-4844 transaction type.
1072    /// Tries to reattach the blob sidecar to the transaction.
1073    ///
1074    /// This returns an option, but callers should ensure that the transaction is an EIP-4844
1075    /// transaction: [`Typed2718::is_eip4844`].
1076    fn try_into_pooled_eip4844(
1077        self,
1078        sidecar: Arc<BlobTransactionSidecar>,
1079    ) -> Option<Recovered<Self::Pooled>>;
1080
1081    /// Tries to convert the `Consensus` type with a blob sidecar into the `Pooled` type.
1082    ///
1083    /// Returns `None` if passed transaction is not a blob transaction.
1084    fn try_from_eip4844(
1085        tx: Recovered<Self::Consensus>,
1086        sidecar: BlobTransactionSidecar,
1087    ) -> Option<Self>;
1088
1089    /// Validates the blob sidecar of the transaction with the given settings.
1090    fn validate_blob(
1091        &self,
1092        blob: &BlobTransactionSidecar,
1093        settings: &KzgSettings,
1094    ) -> Result<(), BlobTransactionValidationError>;
1095}
1096
1097/// The default [`PoolTransaction`] for the [Pool](crate::Pool) for Ethereum.
1098///
1099/// This type is essentially a wrapper around [`Recovered`] with additional
1100/// fields derived from the transaction that are frequently used by the pools for ordering.
1101#[derive(Debug, Clone, PartialEq, Eq)]
1102pub struct EthPooledTransaction<T = TransactionSigned> {
1103    /// `EcRecovered` transaction, the consensus format.
1104    pub transaction: Recovered<T>,
1105
1106    /// For EIP-1559 transactions: `max_fee_per_gas * gas_limit + tx_value`.
1107    /// For legacy transactions: `gas_price * gas_limit + tx_value`.
1108    /// For EIP-4844 blob transactions: `max_fee_per_gas * gas_limit + tx_value +
1109    /// max_blob_fee_per_gas * blob_gas_used`.
1110    pub cost: U256,
1111
1112    /// This is the RLP length of the transaction, computed when the transaction is added to the
1113    /// pool.
1114    pub encoded_length: usize,
1115
1116    /// The blob side car for this transaction
1117    pub blob_sidecar: EthBlobTransactionSidecar,
1118}
1119
1120impl<T: SignedTransaction> EthPooledTransaction<T> {
1121    /// Create new instance of [Self].
1122    ///
1123    /// Caution: In case of blob transactions, this does marks the blob sidecar as
1124    /// [`EthBlobTransactionSidecar::Missing`]
1125    pub fn new(transaction: Recovered<T>, encoded_length: usize) -> Self {
1126        let mut blob_sidecar = EthBlobTransactionSidecar::None;
1127
1128        let gas_cost = U256::from(transaction.max_fee_per_gas())
1129            .saturating_mul(U256::from(transaction.gas_limit()));
1130
1131        let mut cost = gas_cost.saturating_add(transaction.value());
1132
1133        if let (Some(blob_gas_used), Some(max_fee_per_blob_gas)) =
1134            (transaction.blob_gas_used(), transaction.max_fee_per_blob_gas())
1135        {
1136            // Add max blob cost using saturating math to avoid overflow
1137            cost = cost.saturating_add(U256::from(
1138                max_fee_per_blob_gas.saturating_mul(blob_gas_used as u128),
1139            ));
1140
1141            // because the blob sidecar is not included in this transaction variant, mark it as
1142            // missing
1143            blob_sidecar = EthBlobTransactionSidecar::Missing;
1144        }
1145
1146        Self { transaction, cost, encoded_length, blob_sidecar }
1147    }
1148
1149    /// Return the reference to the underlying transaction.
1150    pub const fn transaction(&self) -> &Recovered<T> {
1151        &self.transaction
1152    }
1153}
1154
1155impl PoolTransaction for EthPooledTransaction {
1156    type TryFromConsensusError = TransactionConversionError;
1157
1158    type Consensus = TransactionSigned;
1159
1160    type Pooled = PooledTransaction;
1161
1162    fn clone_into_consensus(&self) -> Recovered<Self::Consensus> {
1163        self.transaction().clone()
1164    }
1165
1166    fn into_consensus(self) -> Recovered<Self::Consensus> {
1167        self.transaction
1168    }
1169
1170    fn from_pooled(tx: Recovered<Self::Pooled>) -> Self {
1171        let encoded_length = tx.encode_2718_len();
1172        let (tx, signer) = tx.into_parts();
1173        match tx {
1174            PooledTransaction::Eip4844(tx) => {
1175                // include the blob sidecar
1176                let (tx, sig, hash) = tx.into_parts();
1177                let (tx, blob) = tx.into_parts();
1178                let tx = Signed::new_unchecked(tx, sig, hash);
1179                let tx = TransactionSigned::from(tx);
1180                let tx = Recovered::new_unchecked(tx, signer);
1181                let mut pooled = Self::new(tx, encoded_length);
1182                pooled.blob_sidecar = EthBlobTransactionSidecar::Present(blob);
1183                pooled
1184            }
1185            tx => {
1186                // no blob sidecar
1187                let tx = Recovered::new_unchecked(tx.into(), signer);
1188                Self::new(tx, encoded_length)
1189            }
1190        }
1191    }
1192
1193    /// Returns hash of the transaction.
1194    fn hash(&self) -> &TxHash {
1195        self.transaction.tx_hash()
1196    }
1197
1198    /// Returns the Sender of the transaction.
1199    fn sender(&self) -> Address {
1200        self.transaction.signer()
1201    }
1202
1203    /// Returns a reference to the Sender of the transaction.
1204    fn sender_ref(&self) -> &Address {
1205        self.transaction.signer_ref()
1206    }
1207
1208    /// Returns the cost that this transaction is allowed to consume:
1209    ///
1210    /// For EIP-1559 transactions: `max_fee_per_gas * gas_limit + tx_value`.
1211    /// For legacy transactions: `gas_price * gas_limit + tx_value`.
1212    /// For EIP-4844 blob transactions: `max_fee_per_gas * gas_limit + tx_value +
1213    /// max_blob_fee_per_gas * blob_gas_used`.
1214    fn cost(&self) -> &U256 {
1215        &self.cost
1216    }
1217
1218    /// Returns the length of the rlp encoded object
1219    fn encoded_length(&self) -> usize {
1220        self.encoded_length
1221    }
1222}
1223
1224impl<T: Typed2718> Typed2718 for EthPooledTransaction<T> {
1225    fn ty(&self) -> u8 {
1226        self.transaction.ty()
1227    }
1228}
1229
1230impl<T: InMemorySize> InMemorySize for EthPooledTransaction<T> {
1231    fn size(&self) -> usize {
1232        self.transaction.size()
1233    }
1234}
1235
1236impl<T: alloy_consensus::Transaction> alloy_consensus::Transaction for EthPooledTransaction<T> {
1237    fn chain_id(&self) -> Option<alloy_primitives::ChainId> {
1238        self.transaction.chain_id()
1239    }
1240
1241    fn nonce(&self) -> u64 {
1242        self.transaction.nonce()
1243    }
1244
1245    fn gas_limit(&self) -> u64 {
1246        self.transaction.gas_limit()
1247    }
1248
1249    fn gas_price(&self) -> Option<u128> {
1250        self.transaction.gas_price()
1251    }
1252
1253    fn max_fee_per_gas(&self) -> u128 {
1254        self.transaction.max_fee_per_gas()
1255    }
1256
1257    fn max_priority_fee_per_gas(&self) -> Option<u128> {
1258        self.transaction.max_priority_fee_per_gas()
1259    }
1260
1261    fn max_fee_per_blob_gas(&self) -> Option<u128> {
1262        self.transaction.max_fee_per_blob_gas()
1263    }
1264
1265    fn priority_fee_or_price(&self) -> u128 {
1266        self.transaction.priority_fee_or_price()
1267    }
1268
1269    fn effective_gas_price(&self, base_fee: Option<u64>) -> u128 {
1270        self.transaction.effective_gas_price(base_fee)
1271    }
1272
1273    fn is_dynamic_fee(&self) -> bool {
1274        self.transaction.is_dynamic_fee()
1275    }
1276
1277    fn kind(&self) -> TxKind {
1278        self.transaction.kind()
1279    }
1280
1281    fn is_create(&self) -> bool {
1282        self.transaction.is_create()
1283    }
1284
1285    fn value(&self) -> U256 {
1286        self.transaction.value()
1287    }
1288
1289    fn input(&self) -> &Bytes {
1290        self.transaction.input()
1291    }
1292
1293    fn access_list(&self) -> Option<&AccessList> {
1294        self.transaction.access_list()
1295    }
1296
1297    fn blob_versioned_hashes(&self) -> Option<&[B256]> {
1298        self.transaction.blob_versioned_hashes()
1299    }
1300
1301    fn authorization_list(&self) -> Option<&[SignedAuthorization]> {
1302        self.transaction.authorization_list()
1303    }
1304}
1305
1306impl EthPoolTransaction for EthPooledTransaction {
1307    fn take_blob(&mut self) -> EthBlobTransactionSidecar {
1308        if self.is_eip4844() {
1309            std::mem::replace(&mut self.blob_sidecar, EthBlobTransactionSidecar::Missing)
1310        } else {
1311            EthBlobTransactionSidecar::None
1312        }
1313    }
1314
1315    fn try_into_pooled_eip4844(
1316        self,
1317        sidecar: Arc<BlobTransactionSidecar>,
1318    ) -> Option<Recovered<Self::Pooled>> {
1319        let (signed_transaction, signer) = self.into_consensus().into_parts();
1320        let pooled_transaction =
1321            signed_transaction.try_into_pooled_eip4844(Arc::unwrap_or_clone(sidecar)).ok()?;
1322
1323        Some(Recovered::new_unchecked(pooled_transaction, signer))
1324    }
1325
1326    fn try_from_eip4844(
1327        tx: Recovered<Self::Consensus>,
1328        sidecar: BlobTransactionSidecar,
1329    ) -> Option<Self> {
1330        let (tx, signer) = tx.into_parts();
1331        tx.try_into_pooled_eip4844(sidecar)
1332            .ok()
1333            .map(|tx| tx.with_signer(signer))
1334            .map(Self::from_pooled)
1335    }
1336
1337    fn validate_blob(
1338        &self,
1339        sidecar: &BlobTransactionSidecar,
1340        settings: &KzgSettings,
1341    ) -> Result<(), BlobTransactionValidationError> {
1342        match self.transaction.transaction() {
1343            Transaction::Eip4844(tx) => tx.validate_blob(sidecar, settings),
1344            _ => Err(BlobTransactionValidationError::NotBlobTransaction(self.ty())),
1345        }
1346    }
1347}
1348
1349/// Represents the blob sidecar of the [`EthPooledTransaction`].
1350#[derive(Debug, Clone, PartialEq, Eq)]
1351pub enum EthBlobTransactionSidecar {
1352    /// This transaction does not have a blob sidecar
1353    None,
1354    /// This transaction has a blob sidecar (EIP-4844) but it is missing
1355    ///
1356    /// It was either extracted after being inserted into the pool or re-injected after reorg
1357    /// without the blob sidecar
1358    Missing,
1359    /// The eip-4844 transaction was pulled from the network and still has its blob sidecar
1360    Present(BlobTransactionSidecar),
1361}
1362
1363impl EthBlobTransactionSidecar {
1364    /// Returns the blob sidecar if it is present
1365    pub const fn maybe_sidecar(&self) -> Option<&BlobTransactionSidecar> {
1366        match self {
1367            Self::Present(sidecar) => Some(sidecar),
1368            _ => None,
1369        }
1370    }
1371}
1372
1373/// Represents the current status of the pool.
1374#[derive(Debug, Clone, Copy, Default)]
1375pub struct PoolSize {
1376    /// Number of transactions in the _pending_ sub-pool.
1377    pub pending: usize,
1378    /// Reported size of transactions in the _pending_ sub-pool.
1379    pub pending_size: usize,
1380    /// Number of transactions in the _blob_ pool.
1381    pub blob: usize,
1382    /// Reported size of transactions in the _blob_ pool.
1383    pub blob_size: usize,
1384    /// Number of transactions in the _basefee_ pool.
1385    pub basefee: usize,
1386    /// Reported size of transactions in the _basefee_ sub-pool.
1387    pub basefee_size: usize,
1388    /// Number of transactions in the _queued_ sub-pool.
1389    pub queued: usize,
1390    /// Reported size of transactions in the _queued_ sub-pool.
1391    pub queued_size: usize,
1392    /// Number of all transactions of all sub-pools
1393    ///
1394    /// Note: this is the sum of ```pending + basefee + queued```
1395    pub total: usize,
1396}
1397
1398// === impl PoolSize ===
1399
1400impl PoolSize {
1401    /// Asserts that the invariants of the pool size are met.
1402    #[cfg(test)]
1403    pub(crate) fn assert_invariants(&self) {
1404        assert_eq!(self.total, self.pending + self.basefee + self.queued + self.blob);
1405    }
1406}
1407
1408/// Represents the current status of the pool.
1409#[derive(Default, Debug, Clone, Copy, Eq, PartialEq)]
1410pub struct BlockInfo {
1411    /// Hash for the currently tracked block.
1412    pub last_seen_block_hash: B256,
1413    /// Currently tracked block.
1414    pub last_seen_block_number: u64,
1415    /// Current block gas limit for the latest block.
1416    pub block_gas_limit: u64,
1417    /// Currently enforced base fee: the threshold for the basefee sub-pool.
1418    ///
1419    /// Note: this is the derived base fee of the _next_ block that builds on the block the pool is
1420    /// currently tracking.
1421    pub pending_basefee: u64,
1422    /// Currently enforced blob fee: the threshold for eip-4844 blob transactions.
1423    ///
1424    /// Note: this is the derived blob fee of the _next_ block that builds on the block the pool is
1425    /// currently tracking
1426    pub pending_blob_fee: Option<u128>,
1427}
1428
1429/// The limit to enforce for [`TransactionPool::get_pooled_transaction_elements`].
1430#[derive(Debug, Clone, Copy, Eq, PartialEq)]
1431pub enum GetPooledTransactionLimit {
1432    /// No limit, return all transactions.
1433    None,
1434    /// Enforce a size limit on the returned transactions, for example 2MB
1435    ResponseSizeSoftLimit(usize),
1436}
1437
1438impl GetPooledTransactionLimit {
1439    /// Returns true if the given size exceeds the limit.
1440    #[inline]
1441    pub const fn exceeds(&self, size: usize) -> bool {
1442        match self {
1443            Self::None => false,
1444            Self::ResponseSizeSoftLimit(limit) => size > *limit,
1445        }
1446    }
1447}
1448
1449/// A Stream that yields full transactions the subpool
1450#[must_use = "streams do nothing unless polled"]
1451#[derive(Debug)]
1452pub struct NewSubpoolTransactionStream<Tx: PoolTransaction> {
1453    st: Receiver<NewTransactionEvent<Tx>>,
1454    subpool: SubPool,
1455}
1456
1457// === impl NewSubpoolTransactionStream ===
1458
1459impl<Tx: PoolTransaction> NewSubpoolTransactionStream<Tx> {
1460    /// Create a new stream that yields full transactions from the subpool
1461    pub const fn new(st: Receiver<NewTransactionEvent<Tx>>, subpool: SubPool) -> Self {
1462        Self { st, subpool }
1463    }
1464
1465    /// Tries to receive the next value for this stream.
1466    pub fn try_recv(
1467        &mut self,
1468    ) -> Result<NewTransactionEvent<Tx>, tokio::sync::mpsc::error::TryRecvError> {
1469        loop {
1470            match self.st.try_recv() {
1471                Ok(event) => {
1472                    if event.subpool == self.subpool {
1473                        return Ok(event)
1474                    }
1475                }
1476                Err(e) => return Err(e),
1477            }
1478        }
1479    }
1480}
1481
1482impl<Tx: PoolTransaction> Stream for NewSubpoolTransactionStream<Tx> {
1483    type Item = NewTransactionEvent<Tx>;
1484
1485    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1486        loop {
1487            match ready!(self.st.poll_recv(cx)) {
1488                Some(event) => {
1489                    if event.subpool == self.subpool {
1490                        return Poll::Ready(Some(event))
1491                    }
1492                }
1493                None => return Poll::Ready(None),
1494            }
1495        }
1496    }
1497}
1498
1499#[cfg(test)]
1500mod tests {
1501    use super::*;
1502    use alloy_consensus::{TxEip1559, TxEip2930, TxEip4844, TxEip7702, TxLegacy};
1503    use alloy_eips::eip4844::DATA_GAS_PER_BLOB;
1504    use alloy_primitives::PrimitiveSignature as Signature;
1505    use reth_ethereum_primitives::{Transaction, TransactionSigned};
1506
1507    #[test]
1508    fn test_pool_size_invariants() {
1509        let pool_size = PoolSize {
1510            pending: 10,
1511            pending_size: 1000,
1512            blob: 5,
1513            blob_size: 500,
1514            basefee: 8,
1515            basefee_size: 800,
1516            queued: 7,
1517            queued_size: 700,
1518            total: 10 + 5 + 8 + 7, // Correct total
1519        };
1520
1521        // Call the assert_invariants method to check if the invariants are correct
1522        pool_size.assert_invariants();
1523    }
1524
1525    #[test]
1526    #[should_panic]
1527    fn test_pool_size_invariants_fail() {
1528        let pool_size = PoolSize {
1529            pending: 10,
1530            pending_size: 1000,
1531            blob: 5,
1532            blob_size: 500,
1533            basefee: 8,
1534            basefee_size: 800,
1535            queued: 7,
1536            queued_size: 700,
1537            total: 10 + 5 + 8, // Incorrect total
1538        };
1539
1540        // Call the assert_invariants method, which should panic
1541        pool_size.assert_invariants();
1542    }
1543
1544    #[test]
1545    fn test_eth_pooled_transaction_new_legacy() {
1546        // Create a legacy transaction with specific parameters
1547        let tx = Transaction::Legacy(TxLegacy {
1548            gas_price: 10,
1549            gas_limit: 1000,
1550            value: U256::from(100),
1551            ..Default::default()
1552        });
1553        let signature = Signature::test_signature();
1554        let signed_tx = TransactionSigned::new_unhashed(tx, signature);
1555        let transaction = Recovered::new_unchecked(signed_tx, Default::default());
1556        let pooled_tx = EthPooledTransaction::new(transaction.clone(), 200);
1557
1558        // Check that the pooled transaction is created correctly
1559        assert_eq!(pooled_tx.transaction, transaction);
1560        assert_eq!(pooled_tx.encoded_length, 200);
1561        assert_eq!(pooled_tx.blob_sidecar, EthBlobTransactionSidecar::None);
1562        assert_eq!(pooled_tx.cost, U256::from(100) + U256::from(10 * 1000));
1563    }
1564
1565    #[test]
1566    fn test_eth_pooled_transaction_new_eip2930() {
1567        // Create an EIP-2930 transaction with specific parameters
1568        let tx = Transaction::Eip2930(TxEip2930 {
1569            gas_price: 10,
1570            gas_limit: 1000,
1571            value: U256::from(100),
1572            ..Default::default()
1573        });
1574        let signature = Signature::test_signature();
1575        let signed_tx = TransactionSigned::new_unhashed(tx, signature);
1576        let transaction = Recovered::new_unchecked(signed_tx, Default::default());
1577        let pooled_tx = EthPooledTransaction::new(transaction.clone(), 200);
1578
1579        // Check that the pooled transaction is created correctly
1580        assert_eq!(pooled_tx.transaction, transaction);
1581        assert_eq!(pooled_tx.encoded_length, 200);
1582        assert_eq!(pooled_tx.blob_sidecar, EthBlobTransactionSidecar::None);
1583        assert_eq!(pooled_tx.cost, U256::from(100) + U256::from(10 * 1000));
1584    }
1585
1586    #[test]
1587    fn test_eth_pooled_transaction_new_eip1559() {
1588        // Create an EIP-1559 transaction with specific parameters
1589        let tx = Transaction::Eip1559(TxEip1559 {
1590            max_fee_per_gas: 10,
1591            gas_limit: 1000,
1592            value: U256::from(100),
1593            ..Default::default()
1594        });
1595        let signature = Signature::test_signature();
1596        let signed_tx = TransactionSigned::new_unhashed(tx, signature);
1597        let transaction = Recovered::new_unchecked(signed_tx, Default::default());
1598        let pooled_tx = EthPooledTransaction::new(transaction.clone(), 200);
1599
1600        // Check that the pooled transaction is created correctly
1601        assert_eq!(pooled_tx.transaction, transaction);
1602        assert_eq!(pooled_tx.encoded_length, 200);
1603        assert_eq!(pooled_tx.blob_sidecar, EthBlobTransactionSidecar::None);
1604        assert_eq!(pooled_tx.cost, U256::from(100) + U256::from(10 * 1000));
1605    }
1606
1607    #[test]
1608    fn test_eth_pooled_transaction_new_eip4844() {
1609        // Create an EIP-4844 transaction with specific parameters
1610        let tx = Transaction::Eip4844(TxEip4844 {
1611            max_fee_per_gas: 10,
1612            gas_limit: 1000,
1613            value: U256::from(100),
1614            max_fee_per_blob_gas: 5,
1615            blob_versioned_hashes: vec![B256::default()],
1616            ..Default::default()
1617        });
1618        let signature = Signature::test_signature();
1619        let signed_tx = TransactionSigned::new_unhashed(tx, signature);
1620        let transaction = Recovered::new_unchecked(signed_tx, Default::default());
1621        let pooled_tx = EthPooledTransaction::new(transaction.clone(), 300);
1622
1623        // Check that the pooled transaction is created correctly
1624        assert_eq!(pooled_tx.transaction, transaction);
1625        assert_eq!(pooled_tx.encoded_length, 300);
1626        assert_eq!(pooled_tx.blob_sidecar, EthBlobTransactionSidecar::Missing);
1627        let expected_cost =
1628            U256::from(100) + U256::from(10 * 1000) + U256::from(5 * DATA_GAS_PER_BLOB);
1629        assert_eq!(pooled_tx.cost, expected_cost);
1630    }
1631
1632    #[test]
1633    fn test_eth_pooled_transaction_new_eip7702() {
1634        // Init an EIP-7702 transaction with specific parameters
1635        let tx = Transaction::Eip7702(TxEip7702 {
1636            max_fee_per_gas: 10,
1637            gas_limit: 1000,
1638            value: U256::from(100),
1639            ..Default::default()
1640        });
1641        let signature = Signature::test_signature();
1642        let signed_tx = TransactionSigned::new_unhashed(tx, signature);
1643        let transaction = Recovered::new_unchecked(signed_tx, Default::default());
1644        let pooled_tx = EthPooledTransaction::new(transaction.clone(), 200);
1645
1646        // Check that the pooled transaction is created correctly
1647        assert_eq!(pooled_tx.transaction, transaction);
1648        assert_eq!(pooled_tx.encoded_length, 200);
1649        assert_eq!(pooled_tx.blob_sidecar, EthBlobTransactionSidecar::None);
1650        assert_eq!(pooled_tx.cost, U256::from(100) + U256::from(10 * 1000));
1651    }
1652
1653    #[test]
1654    fn test_pooled_transaction_limit() {
1655        // No limit should never exceed
1656        let limit_none = GetPooledTransactionLimit::None;
1657        // Any size should return false
1658        assert!(!limit_none.exceeds(1000));
1659
1660        // Size limit of 2MB (2 * 1024 * 1024 bytes)
1661        let size_limit_2mb = GetPooledTransactionLimit::ResponseSizeSoftLimit(2 * 1024 * 1024);
1662
1663        // Test with size below the limit
1664        // 1MB is below 2MB, should return false
1665        assert!(!size_limit_2mb.exceeds(1024 * 1024));
1666
1667        // Test with size exactly at the limit
1668        // 2MB equals the limit, should return false
1669        assert!(!size_limit_2mb.exceeds(2 * 1024 * 1024));
1670
1671        // Test with size exceeding the limit
1672        // 3MB is above the 2MB limit, should return true
1673        assert!(size_limit_2mb.exceeds(3 * 1024 * 1024));
1674    }
1675}