Skip to main content

reth_transaction_pool/
traits.rs

1//! Transaction Pool Traits and Types
2//!
3//! This module defines the core abstractions for transaction pool implementations,
4//! handling the complexity of different transaction representations across the
5//! network, mempool, and the chain itself.
6//!
7//! ## Key Concepts
8//!
9//! ### Transaction Representations
10//!
11//! Transactions exist in different formats throughout their lifecycle:
12//!
13//! 1. **Consensus Format** ([`PoolTransaction::Consensus`])
14//!    - The canonical format stored in blocks
15//!    - Minimal size for efficient storage
16//!    - Example: EIP-4844 transactions store only blob hashes: ([`TransactionSigned::Eip4844`])
17//!
18//! 2. **Pooled Format** ([`PoolTransaction::Pooled`])
19//!    - Extended format for network propagation
20//!    - Includes additional validation data
21//!    - Example: EIP-4844 transactions include full blob sidecars: ([`PooledTransactionVariant`])
22//!
23//! ### Type Relationships
24//!
25//! ```text
26//! NodePrimitives::SignedTx  ←──   NetworkPrimitives::BroadcastedTransaction
27//!        │                              │
28//!        │ (consensus format)           │ (announced to peers)
29//!        │                              │
30//!        └──────────┐  ┌────────────────┘
31//!                   ▼  ▼
32//!            PoolTransaction::Consensus
33//!                   │ ▲
34//!                   │ │ from pooled (always succeeds)
35//!                   │ │
36//!                   ▼ │ try_from consensus (may fail)
37//!            PoolTransaction::Pooled  ←──→  NetworkPrimitives::PooledTransaction
38//!                                             (sent on request)
39//! ```
40//!
41//! ### Special Cases
42//!
43//! #### EIP-4844 Blob Transactions
44//! - Consensus format: Only blob hashes (32 bytes each)
45//! - Pooled format: Full blobs + commitments + proofs (large data per blob)
46//! - Network behavior: Not broadcast automatically, only sent on explicit request
47//!
48//! #### Optimism Deposit Transactions
49//! - Only exist in consensus format
50//! - Never enter the mempool (system transactions)
51//! - Conversion from consensus to pooled always fails
52
53use crate::{
54    blobstore::BlobStoreError,
55    error::{InvalidPoolTransactionError, PoolError, PoolResult},
56    pool::{
57        state::SubPool, BestTransactionFilter, NewTransactionEvent, TransactionEvents,
58        TransactionListenerKind,
59    },
60    validate::ValidPoolTransaction,
61    AddedTransactionOutcome, AllTransactionsEvents,
62};
63use alloy_consensus::{error::ValueError, transaction::TxHashRef, BlockHeader, Signed, Typed2718};
64use alloy_eips::{
65    eip2718::{Encodable2718, WithEncoded},
66    eip2930::AccessList,
67    eip4844::{
68        env_settings::KzgSettings, BlobAndProofV1, BlobAndProofV2, BlobTransactionValidationError,
69    },
70    eip7594::BlobTransactionSidecarVariant,
71    eip7702::SignedAuthorization,
72};
73use alloy_primitives::{map::AddressSet, Address, Bytes, TxHash, TxKind, B256, U256};
74use futures_util::{ready, Stream};
75use reth_eth_wire_types::HandleMempoolData;
76use reth_ethereum_primitives::{PooledTransactionVariant, TransactionSigned};
77use reth_execution_types::ChangedAccount;
78use reth_primitives_traits::{Block, InMemorySize, Recovered, SealedBlock, SignedTransaction};
79use serde::{Deserialize, Serialize};
80use std::{
81    collections::HashMap,
82    fmt,
83    fmt::Debug,
84    future::Future,
85    pin::Pin,
86    sync::Arc,
87    task::{Context, Poll},
88};
89use tokio::sync::mpsc::Receiver;
90
91/// The `PeerId` type.
92pub type PeerId = alloy_primitives::B512;
93
94/// Helper type alias to access [`PoolTransaction`] for a given [`TransactionPool`].
95pub type PoolTx<P> = <P as TransactionPool>::Transaction;
96/// Helper type alias to access [`PoolTransaction::Consensus`] for a given [`TransactionPool`].
97pub type PoolConsensusTx<P> = <<P as TransactionPool>::Transaction as PoolTransaction>::Consensus;
98
99/// Helper type alias to access [`PoolTransaction::Pooled`] for a given [`TransactionPool`].
100pub type PoolPooledTx<P> = <<P as TransactionPool>::Transaction as PoolTransaction>::Pooled;
101
102/// General purpose abstraction of a transaction-pool.
103///
104/// This is intended to be used by API-consumers such as RPC that need inject new incoming,
105/// unverified transactions. And by block production that needs to get transactions to execute in a
106/// new block.
107///
108/// Note: This requires `Clone` for convenience, since it is assumed that this will be implemented
109/// for a wrapped `Arc` type, see also [`Pool`](crate::Pool).
110#[auto_impl::auto_impl(&, Arc)]
111pub trait TransactionPool: Clone + Debug + Send + Sync {
112    /// The transaction type of the pool
113    type Transaction: EthPoolTransaction;
114
115    /// Returns stats about the pool and all sub-pools.
116    fn pool_size(&self) -> PoolSize;
117
118    /// Returns the block the pool is currently tracking.
119    ///
120    /// This tracks the block that the pool has last seen.
121    fn block_info(&self) -> BlockInfo;
122
123    /// Imports an _external_ transaction.
124    ///
125    /// This is intended to be used by the network to insert incoming transactions received over the
126    /// p2p network.
127    ///
128    /// Consumer: P2P
129    fn add_external_transaction(
130        &self,
131        transaction: Self::Transaction,
132    ) -> impl Future<Output = PoolResult<AddedTransactionOutcome>> + Send {
133        self.add_transaction(TransactionOrigin::External, transaction)
134    }
135
136    /// Imports all _external_ transactions
137    ///
138    /// Consumer: Utility
139    fn add_external_transactions(
140        &self,
141        transactions: Vec<Self::Transaction>,
142    ) -> impl Future<Output = Vec<PoolResult<AddedTransactionOutcome>>> + Send {
143        self.add_transactions(TransactionOrigin::External, transactions)
144    }
145
146    /// Adds an _unvalidated_ transaction into the pool and subscribe to state changes.
147    ///
148    /// This is the same as [`TransactionPool::add_transaction`] but returns an event stream for the
149    /// given transaction.
150    ///
151    /// Consumer: Custom
152    fn add_transaction_and_subscribe(
153        &self,
154        origin: TransactionOrigin,
155        transaction: Self::Transaction,
156    ) -> impl Future<Output = PoolResult<TransactionEvents>> + Send;
157
158    /// Adds an _unvalidated_ transaction into the pool.
159    ///
160    /// Consumer: RPC
161    fn add_transaction(
162        &self,
163        origin: TransactionOrigin,
164        transaction: Self::Transaction,
165    ) -> impl Future<Output = PoolResult<AddedTransactionOutcome>> + Send;
166
167    /// Adds the given _unvalidated_ transactions into the pool.
168    ///
169    /// All transactions will use the same `origin`.
170    ///
171    /// Returns a list of results.
172    ///
173    /// Consumer: RPC
174    fn add_transactions(
175        &self,
176        origin: TransactionOrigin,
177        transactions: Vec<Self::Transaction>,
178    ) -> impl Future<Output = Vec<PoolResult<AddedTransactionOutcome>>> + Send;
179
180    /// Adds the given _unvalidated_ transactions into the pool.
181    ///
182    /// Each transaction is paired with its own [`TransactionOrigin`].
183    ///
184    /// Returns a list of results.
185    ///
186    /// Consumer: RPC
187    fn add_transactions_with_origins(
188        &self,
189        transactions: Vec<(TransactionOrigin, Self::Transaction)>,
190    ) -> impl Future<Output = Vec<PoolResult<AddedTransactionOutcome>>> + Send;
191
192    /// Submit a consensus transaction directly to the pool
193    fn add_consensus_transaction(
194        &self,
195        tx: Recovered<<Self::Transaction as PoolTransaction>::Consensus>,
196        origin: TransactionOrigin,
197    ) -> impl Future<Output = PoolResult<AddedTransactionOutcome>> + Send {
198        async move {
199            let tx_hash = *tx.tx_hash();
200
201            let pool_transaction = match Self::Transaction::try_from_consensus(tx) {
202                Ok(tx) => tx,
203                Err(e) => return Err(PoolError::other(tx_hash, e.to_string())),
204            };
205
206            self.add_transaction(origin, pool_transaction).await
207        }
208    }
209
210    /// Submit a consensus transaction and subscribe to event stream
211    fn add_consensus_transaction_and_subscribe(
212        &self,
213        tx: Recovered<<Self::Transaction as PoolTransaction>::Consensus>,
214        origin: TransactionOrigin,
215    ) -> impl Future<Output = PoolResult<TransactionEvents>> + Send {
216        async move {
217            let tx_hash = *tx.tx_hash();
218
219            let pool_transaction = match Self::Transaction::try_from_consensus(tx) {
220                Ok(tx) => tx,
221                Err(e) => return Err(PoolError::other(tx_hash, e.to_string())),
222            };
223
224            self.add_transaction_and_subscribe(origin, pool_transaction).await
225        }
226    }
227
228    /// Returns a new transaction change event stream for the given transaction.
229    ///
230    /// Returns `None` if the transaction is not in the pool.
231    fn transaction_event_listener(&self, tx_hash: TxHash) -> Option<TransactionEvents>;
232
233    /// Returns a new transaction change event stream for _all_ transactions in the pool.
234    fn all_transactions_event_listener(&self) -> AllTransactionsEvents<Self::Transaction>;
235
236    /// Returns a new Stream that yields transactions hashes for new __pending__ transactions
237    /// inserted into the pool that are allowed to be propagated.
238    ///
239    /// Note: This is intended for networking and will __only__ yield transactions that are allowed
240    /// to be propagated over the network, see also [`TransactionListenerKind`].
241    ///
242    /// Consumer: RPC/P2P
243    fn pending_transactions_listener(&self) -> Receiver<TxHash> {
244        self.pending_transactions_listener_for(TransactionListenerKind::PropagateOnly)
245    }
246
247    /// Returns a new [Receiver] that yields transactions hashes for new __pending__ transactions
248    /// inserted into the pending pool depending on the given [`TransactionListenerKind`] argument.
249    fn pending_transactions_listener_for(&self, kind: TransactionListenerKind) -> Receiver<TxHash>;
250
251    /// Returns a new stream that yields new valid transactions added to the pool.
252    fn new_transactions_listener(&self) -> Receiver<NewTransactionEvent<Self::Transaction>> {
253        self.new_transactions_listener_for(TransactionListenerKind::PropagateOnly)
254    }
255
256    /// Returns a new [Receiver] that yields blob "sidecars" (blobs w/ assoc. kzg
257    /// commitments/proofs) for eip-4844 transactions inserted into the pool
258    fn blob_transaction_sidecars_listener(&self) -> Receiver<NewBlobSidecar>;
259
260    /// Returns a new stream that yields new valid transactions added to the pool
261    /// depending on the given [`TransactionListenerKind`] argument.
262    fn new_transactions_listener_for(
263        &self,
264        kind: TransactionListenerKind,
265    ) -> Receiver<NewTransactionEvent<Self::Transaction>>;
266
267    /// Returns a new Stream that yields new transactions added to the pending sub-pool.
268    ///
269    /// This is a convenience wrapper around [`Self::new_transactions_listener`] that filters for
270    /// [`SubPool::Pending`](crate::SubPool).
271    fn new_pending_pool_transactions_listener(
272        &self,
273    ) -> NewSubpoolTransactionStream<Self::Transaction> {
274        NewSubpoolTransactionStream::new(
275            self.new_transactions_listener_for(TransactionListenerKind::PropagateOnly),
276            SubPool::Pending,
277        )
278    }
279
280    /// Returns a new Stream that yields new transactions added to the basefee sub-pool.
281    ///
282    /// This is a convenience wrapper around [`Self::new_transactions_listener`] that filters for
283    /// [`SubPool::BaseFee`](crate::SubPool).
284    fn new_basefee_pool_transactions_listener(
285        &self,
286    ) -> NewSubpoolTransactionStream<Self::Transaction> {
287        NewSubpoolTransactionStream::new(self.new_transactions_listener(), SubPool::BaseFee)
288    }
289
290    /// Returns a new Stream that yields new transactions added to the queued-pool.
291    ///
292    /// This is a convenience wrapper around [`Self::new_transactions_listener`] that filters for
293    /// [`SubPool::Queued`](crate::SubPool).
294    fn new_queued_transactions_listener(&self) -> NewSubpoolTransactionStream<Self::Transaction> {
295        NewSubpoolTransactionStream::new(self.new_transactions_listener(), SubPool::Queued)
296    }
297
298    /// Returns a new Stream that yields new transactions added to the blob sub-pool.
299    ///
300    /// This is a convenience wrapper around [`Self::new_transactions_listener`] that filters for
301    /// [`SubPool::Blob`](crate::SubPool).
302    fn new_blob_pool_transactions_listener(
303        &self,
304    ) -> NewSubpoolTransactionStream<Self::Transaction> {
305        NewSubpoolTransactionStream::new(self.new_transactions_listener(), SubPool::Blob)
306    }
307
308    /// Returns the _hashes_ of all transactions in the pool that are allowed to be propagated.
309    ///
310    /// This excludes hashes that aren't allowed to be propagated.
311    ///
312    /// Note: This returns a `Vec` but should guarantee that all hashes are unique.
313    ///
314    /// Consumer: P2P
315    fn pooled_transaction_hashes(&self) -> Vec<TxHash>;
316
317    /// Returns only the first `max` hashes of transactions in the pool.
318    ///
319    /// Consumer: P2P
320    fn pooled_transaction_hashes_max(&self, max: usize) -> Vec<TxHash>;
321
322    /// Returns the _full_ transaction objects all transactions in the pool that are allowed to be
323    /// propagated.
324    ///
325    /// This is intended to be used by the network for the initial exchange of pooled transaction
326    /// _hashes_
327    ///
328    /// Note: This returns a `Vec` but should guarantee that all transactions are unique.
329    ///
330    /// Caution: In case of blob transactions, this does not include the sidecar.
331    ///
332    /// Consumer: P2P
333    fn pooled_transactions(&self) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>>;
334
335    /// Returns only the first `max` transactions in the pool.
336    ///
337    /// Consumer: P2P
338    fn pooled_transactions_max(
339        &self,
340        max: usize,
341    ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>>;
342
343    /// Returns converted [`PooledTransactionVariant`] for the given transaction hashes that are
344    /// allowed to be propagated.
345    ///
346    /// This adheres to the expected behavior of
347    /// [`GetPooledTransactions`](https://github.com/ethereum/devp2p/blob/master/caps/eth.md#getpooledtransactions-0x09):
348    ///
349    /// The transactions must be in same order as in the request, but it is OK to skip transactions
350    /// which are not available.
351    ///
352    /// If the transaction is a blob transaction, the sidecar will be included.
353    ///
354    /// Consumer: P2P
355    fn get_pooled_transaction_elements(
356        &self,
357        tx_hashes: Vec<TxHash>,
358        limit: GetPooledTransactionLimit,
359    ) -> Vec<<Self::Transaction as PoolTransaction>::Pooled>;
360
361    /// Extends the given vector with pooled transactions for the given hashes that are allowed to
362    /// be propagated.
363    ///
364    /// This adheres to the expected behavior of [`Self::get_pooled_transaction_elements`].
365    ///
366    /// Consumer: P2P
367    fn append_pooled_transaction_elements(
368        &self,
369        tx_hashes: &[TxHash],
370        limit: GetPooledTransactionLimit,
371        out: &mut Vec<<Self::Transaction as PoolTransaction>::Pooled>,
372    ) {
373        out.extend(self.get_pooled_transaction_elements(tx_hashes.to_vec(), limit));
374    }
375
376    /// Returns the pooled transaction variant for the given transaction hash.
377    ///
378    /// This adheres to the expected behavior of
379    /// [`GetPooledTransactions`](https://github.com/ethereum/devp2p/blob/master/caps/eth.md#getpooledtransactions-0x09):
380    ///
381    /// If the transaction is a blob transaction, the sidecar will be included.
382    ///
383    /// It is expected that this variant represents the valid p2p format for full transactions.
384    /// E.g. for EIP-4844 transactions this is the consensus transaction format with the blob
385    /// sidecar.
386    ///
387    /// Consumer: P2P
388    fn get_pooled_transaction_element(
389        &self,
390        tx_hash: TxHash,
391    ) -> Option<Recovered<<Self::Transaction as PoolTransaction>::Pooled>>;
392
393    /// Returns an iterator that yields transactions that are ready for block production.
394    ///
395    /// Consumer: Block production
396    fn best_transactions(
397        &self,
398    ) -> Box<dyn BestTransactions<Item = Arc<ValidPoolTransaction<Self::Transaction>>>>;
399
400    /// Returns an iterator that yields transactions that are ready for block production with the
401    /// given base fee and optional blob fee attributes.
402    ///
403    /// Consumer: Block production
404    fn best_transactions_with_attributes(
405        &self,
406        best_transactions_attributes: BestTransactionsAttributes,
407    ) -> Box<dyn BestTransactions<Item = Arc<ValidPoolTransaction<Self::Transaction>>>>;
408
409    /// Returns all transactions that can be included in the next block.
410    ///
411    /// This is primarily used for the `txpool_` RPC namespace:
412    /// <https://geth.ethereum.org/docs/interacting-with-geth/rpc/ns-txpool> which distinguishes
413    /// between `pending` and `queued` transactions, where `pending` are transactions ready for
414    /// inclusion in the next block and `queued` are transactions that are ready for inclusion in
415    /// future blocks.
416    ///
417    /// Consumer: RPC
418    fn pending_transactions(&self) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>>;
419
420    /// Returns a pending transaction if it exists and is ready for immediate execution
421    /// (i.e., has the lowest nonce among the sender's pending transactions).
422    fn get_pending_transaction_by_sender_and_nonce(
423        &self,
424        sender: Address,
425        nonce: u64,
426    ) -> Option<Arc<ValidPoolTransaction<Self::Transaction>>> {
427        self.best_transactions().find(|tx| tx.sender() == sender && tx.nonce() == nonce)
428    }
429
430    /// Returns first `max` transactions that can be included in the next block.
431    /// See <https://github.com/paradigmxyz/reth/issues/12767#issuecomment-2493223579>
432    ///
433    /// Consumer: Block production
434    fn pending_transactions_max(
435        &self,
436        max: usize,
437    ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>>;
438
439    /// Returns all transactions that can be included in _future_ blocks.
440    ///
441    /// This and [`Self::pending_transactions`] are mutually exclusive.
442    ///
443    /// Consumer: RPC
444    fn queued_transactions(&self) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>>;
445
446    /// Returns the number of transactions that are ready for inclusion in the next block and the
447    /// number of transactions that are ready for inclusion in future blocks: `(pending, queued)`.
448    fn pending_and_queued_txn_count(&self) -> (usize, usize);
449
450    /// Returns all transactions that are currently in the pool grouped by whether they are ready
451    /// for inclusion in the next block or not.
452    ///
453    /// This is primarily used for the `txpool_` namespace: <https://geth.ethereum.org/docs/interacting-with-geth/rpc/ns-txpool>
454    ///
455    /// Consumer: RPC
456    fn all_transactions(&self) -> AllPoolTransactions<Self::Transaction>;
457
458    /// Returns the _hashes_ of all transactions regardless of whether they can be propagated or
459    /// not.
460    ///
461    /// Unlike [`Self::pooled_transaction_hashes`] this doesn't consider whether the transaction can
462    /// be propagated or not.
463    ///
464    /// Note: This returns a `Vec` but should guarantee that all hashes are unique.
465    ///
466    /// Consumer: Utility
467    fn all_transaction_hashes(&self) -> Vec<TxHash>;
468
469    /// Removes a single transaction corresponding to the given hash.
470    ///
471    /// Note: This removes the transaction as if it got discarded (_not_ mined).
472    ///
473    /// Returns the removed transaction if it was found in the pool.
474    ///
475    /// Consumer: Utility
476    fn remove_transaction(
477        &self,
478        hash: TxHash,
479    ) -> Option<Arc<ValidPoolTransaction<Self::Transaction>>> {
480        self.remove_transactions(vec![hash]).pop()
481    }
482
483    /// Removes all transactions corresponding to the given hashes.
484    ///
485    /// Note: This removes the transactions as if they got discarded (_not_ mined).
486    ///
487    /// Consumer: Utility
488    fn remove_transactions(
489        &self,
490        hashes: Vec<TxHash>,
491    ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>>;
492
493    /// Removes all transactions corresponding to the given hashes.
494    ///
495    /// Also removes all _dependent_ transactions.
496    ///
497    /// Consumer: Utility
498    fn remove_transactions_and_descendants(
499        &self,
500        hashes: Vec<TxHash>,
501    ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>>;
502
503    /// Removes all transactions from the given sender
504    ///
505    /// Consumer: Utility
506    fn remove_transactions_by_sender(
507        &self,
508        sender: Address,
509    ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>>;
510
511    /// Prunes a single transaction from the pool.
512    ///
513    /// This is similar to [`Self::remove_transaction`] but treats the transaction as _mined_
514    /// rather than discarded. The key difference is that pruning does **not** park descendant
515    /// transactions: their nonce requirements are considered satisfied, so they remain in whatever
516    /// sub-pool they currently occupy and can be included in the next block.
517    ///
518    /// In contrast, [`Self::remove_transaction`] treats the removal as a discard, which
519    /// introduces a nonce gap and moves all descendant transactions to the queued (parked)
520    /// sub-pool.
521    ///
522    /// Returns the pruned transaction if it existed in the pool.
523    ///
524    /// Consumer: Utility
525    fn prune_transaction(
526        &self,
527        hash: TxHash,
528    ) -> Option<Arc<ValidPoolTransaction<Self::Transaction>>> {
529        self.prune_transactions(vec![hash]).pop()
530    }
531
532    /// Prunes all transactions corresponding to the given hashes from the pool.
533    ///
534    /// This behaves like [`Self::prune_transaction`] but for multiple transactions at once.
535    /// Each transaction is removed as if it was mined: descendant transactions are **not** parked
536    /// and their nonce requirements are considered satisfied.
537    ///
538    /// This is useful for scenarios like Flashblocks where transactions are committed across
539    /// multiple partial blocks without a canonical state update: previously committed transactions
540    /// can be pruned so that the best-transactions iterator yields their descendants in the
541    /// correct priority order.
542    ///
543    /// Consumer: Utility
544    fn prune_transactions(
545        &self,
546        hashes: Vec<TxHash>,
547    ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>>;
548
549    /// Retains only those hashes that are unknown to the pool.
550    /// In other words, removes all transactions from the given set that are currently present in
551    /// the pool. Returns hashes already known to the pool.
552    ///
553    /// Consumer: P2P
554    fn retain_unknown<A>(&self, announcement: &mut A)
555    where
556        A: HandleMempoolData;
557
558    /// Returns if the transaction for the given hash is already included in this pool.
559    fn contains(&self, tx_hash: &TxHash) -> bool {
560        self.get(tx_hash).is_some()
561    }
562
563    /// Returns the transaction for the given hash.
564    fn get(&self, tx_hash: &TxHash) -> Option<Arc<ValidPoolTransaction<Self::Transaction>>>;
565
566    /// Returns all transactions objects for the given hashes.
567    ///
568    /// Caution: This in case of blob transactions, this does not include the sidecar.
569    fn get_all(&self, txs: Vec<TxHash>) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>>;
570
571    /// Notify the pool about transactions that are propagated to peers.
572    ///
573    /// Consumer: P2P
574    fn on_propagated(&self, txs: PropagatedTransactions);
575
576    /// Returns all transactions sent by a given user
577    fn get_transactions_by_sender(
578        &self,
579        sender: Address,
580    ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>>;
581
582    /// Returns all pending transactions filtered by predicate
583    fn get_pending_transactions_with_predicate(
584        &self,
585        predicate: impl FnMut(&ValidPoolTransaction<Self::Transaction>) -> bool,
586    ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>>;
587
588    /// Returns all pending transactions sent by a given user
589    fn get_pending_transactions_by_sender(
590        &self,
591        sender: Address,
592    ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>>;
593
594    /// Returns all queued transactions sent by a given user
595    fn get_queued_transactions_by_sender(
596        &self,
597        sender: Address,
598    ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>>;
599
600    /// Returns the highest transaction sent by a given user
601    fn get_highest_transaction_by_sender(
602        &self,
603        sender: Address,
604    ) -> Option<Arc<ValidPoolTransaction<Self::Transaction>>>;
605
606    /// Returns the transaction with the highest nonce that is executable given the on chain nonce.
607    /// In other words the highest non nonce gapped transaction.
608    ///
609    /// Note: The next pending pooled transaction must have the on chain nonce.
610    ///
611    /// For example, for a given on chain nonce of `5`, the next transaction must have that nonce.
612    /// If the pool contains txs `[5,6,7]` this returns tx `7`.
613    /// If the pool contains txs `[6,7]` this returns `None` because the next valid nonce (5) is
614    /// missing, which means txs `[6,7]` are nonce gapped.
615    fn get_highest_consecutive_transaction_by_sender(
616        &self,
617        sender: Address,
618        on_chain_nonce: u64,
619    ) -> Option<Arc<ValidPoolTransaction<Self::Transaction>>>;
620
621    /// Returns a transaction sent by a given user and a nonce
622    fn get_transaction_by_sender_and_nonce(
623        &self,
624        sender: Address,
625        nonce: u64,
626    ) -> Option<Arc<ValidPoolTransaction<Self::Transaction>>>;
627
628    /// Returns all transactions that where submitted with the given [`TransactionOrigin`]
629    fn get_transactions_by_origin(
630        &self,
631        origin: TransactionOrigin,
632    ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>>;
633
634    /// Returns all pending transactions filtered by [`TransactionOrigin`]
635    fn get_pending_transactions_by_origin(
636        &self,
637        origin: TransactionOrigin,
638    ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>>;
639
640    /// Returns all transactions that where submitted as [`TransactionOrigin::Local`]
641    fn get_local_transactions(&self) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
642        self.get_transactions_by_origin(TransactionOrigin::Local)
643    }
644
645    /// Returns all transactions that where submitted as [`TransactionOrigin::Private`]
646    fn get_private_transactions(&self) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
647        self.get_transactions_by_origin(TransactionOrigin::Private)
648    }
649
650    /// Returns all transactions that where submitted as [`TransactionOrigin::External`]
651    fn get_external_transactions(&self) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
652        self.get_transactions_by_origin(TransactionOrigin::External)
653    }
654
655    /// Returns all pending transactions that where submitted as [`TransactionOrigin::Local`]
656    fn get_local_pending_transactions(&self) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
657        self.get_pending_transactions_by_origin(TransactionOrigin::Local)
658    }
659
660    /// Returns all pending transactions that where submitted as [`TransactionOrigin::Private`]
661    fn get_private_pending_transactions(
662        &self,
663    ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
664        self.get_pending_transactions_by_origin(TransactionOrigin::Private)
665    }
666
667    /// Returns all pending transactions that where submitted as [`TransactionOrigin::External`]
668    fn get_external_pending_transactions(
669        &self,
670    ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
671        self.get_pending_transactions_by_origin(TransactionOrigin::External)
672    }
673
674    /// Returns a set of all senders of transactions in the pool
675    fn unique_senders(&self) -> AddressSet;
676
677    /// Returns the [`BlobTransactionSidecarVariant`] for the given transaction hash if it exists in
678    /// the blob store.
679    fn get_blob(
680        &self,
681        tx_hash: TxHash,
682    ) -> Result<Option<Arc<BlobTransactionSidecarVariant>>, BlobStoreError>;
683
684    /// Returns all [`BlobTransactionSidecarVariant`] for the given transaction hashes if they
685    /// exists in the blob store.
686    ///
687    /// This only returns the blobs that were found in the store.
688    /// If there's no blob it will not be returned.
689    fn get_all_blobs(
690        &self,
691        tx_hashes: Vec<TxHash>,
692    ) -> Result<Vec<(TxHash, Arc<BlobTransactionSidecarVariant>)>, BlobStoreError>;
693
694    /// Returns the exact [`BlobTransactionSidecarVariant`] for the given transaction hashes in the
695    /// order they were requested.
696    ///
697    /// Returns an error if any of the blobs are not found in the blob store.
698    fn get_all_blobs_exact(
699        &self,
700        tx_hashes: Vec<TxHash>,
701    ) -> Result<Vec<Arc<BlobTransactionSidecarVariant>>, BlobStoreError>;
702
703    /// Return the [`BlobAndProofV1`]s for a list of blob versioned hashes.
704    fn get_blobs_for_versioned_hashes_v1(
705        &self,
706        versioned_hashes: &[B256],
707    ) -> Result<Vec<Option<BlobAndProofV1>>, BlobStoreError>;
708
709    /// Return the [`BlobAndProofV2`]s for a list of blob versioned hashes.
710    /// Blobs and proofs are returned only if they are present for _all_ of the requested versioned
711    /// hashes.
712    fn get_blobs_for_versioned_hashes_v2(
713        &self,
714        versioned_hashes: &[B256],
715    ) -> Result<Option<Vec<BlobAndProofV2>>, BlobStoreError>;
716
717    /// Return the [`BlobAndProofV2`]s for a list of blob versioned hashes.
718    ///
719    /// The response is always the same length as the request. Missing or older-version blobs are
720    /// returned as `None` elements.
721    fn get_blobs_for_versioned_hashes_v3(
722        &self,
723        versioned_hashes: &[B256],
724    ) -> Result<Vec<Option<BlobAndProofV2>>, BlobStoreError>;
725}
726
727/// Extension for [`TransactionPool`] trait that allows to set the current block info.
728#[auto_impl::auto_impl(&, Arc)]
729pub trait TransactionPoolExt: TransactionPool {
730    /// The block type used for chain tip updates.
731    type Block: Block;
732
733    /// Sets the current block info for the pool.
734    fn set_block_info(&self, info: BlockInfo);
735
736    /// Event listener for when the pool needs to be updated.
737    ///
738    /// Implementers need to update the pool accordingly:
739    ///
740    /// ## Fee changes
741    ///
742    /// The [`CanonicalStateUpdate`] includes the base and blob fee of the pending block, which
743    /// affects the dynamic fee requirement of pending transactions in the pool.
744    ///
745    /// ## EIP-4844 Blob transactions
746    ///
747    /// Mined blob transactions need to be removed from the pool, but from the pool only. The blob
748    /// sidecar must not be removed from the blob store. Only after a blob transaction is
749    /// finalized, its sidecar is removed from the blob store. This ensures that in case of a reorg,
750    /// the sidecar is still available.
751    fn on_canonical_state_change(&self, update: CanonicalStateUpdate<'_, Self::Block>);
752
753    /// Updates the accounts in the pool
754    fn update_accounts(&self, accounts: Vec<ChangedAccount>);
755
756    /// Deletes the blob sidecar for the given transaction from the blob store
757    fn delete_blob(&self, tx: B256);
758
759    /// Deletes multiple blob sidecars from the blob store
760    fn delete_blobs(&self, txs: Vec<B256>);
761
762    /// Maintenance function to cleanup blobs that are no longer needed.
763    fn cleanup_blobs(&self);
764}
765
766/// A Helper type that bundles all transactions in the pool.
767#[derive(Debug, Clone)]
768pub struct AllPoolTransactions<T: PoolTransaction> {
769    /// Transactions that are ready for inclusion in the next block.
770    pub pending: Vec<Arc<ValidPoolTransaction<T>>>,
771    /// Transactions that are ready for inclusion in _future_ blocks, but are currently parked,
772    /// because they depend on other transactions that are not yet included in the pool (nonce gap)
773    /// or otherwise blocked.
774    pub queued: Vec<Arc<ValidPoolTransaction<T>>>,
775}
776
777// === impl AllPoolTransactions ===
778
779impl<T: PoolTransaction> AllPoolTransactions<T> {
780    /// Returns the combined number of all transactions.
781    pub const fn count(&self) -> usize {
782        self.pending.len() + self.queued.len()
783    }
784
785    /// Returns an iterator over all pending [`Recovered`] transactions.
786    pub fn pending_recovered(&self) -> impl Iterator<Item = Recovered<T::Consensus>> + '_ {
787        self.pending.iter().map(|tx| tx.to_consensus())
788    }
789
790    /// Returns an iterator over all queued [`Recovered`] transactions.
791    pub fn queued_recovered(&self) -> impl Iterator<Item = Recovered<T::Consensus>> + '_ {
792        self.queued.iter().map(|tx| tx.to_consensus())
793    }
794
795    /// Returns an iterator over all transactions, both pending and queued.
796    pub fn all(&self) -> impl Iterator<Item = Recovered<T::Consensus>> + '_ {
797        self.pending.iter().chain(self.queued.iter()).map(|tx| tx.to_consensus())
798    }
799}
800
801impl<T: PoolTransaction> Default for AllPoolTransactions<T> {
802    fn default() -> Self {
803        Self { pending: Default::default(), queued: Default::default() }
804    }
805}
806
807impl<T: PoolTransaction> IntoIterator for AllPoolTransactions<T> {
808    type Item = Arc<ValidPoolTransaction<T>>;
809    type IntoIter = std::iter::Chain<
810        std::vec::IntoIter<Arc<ValidPoolTransaction<T>>>,
811        std::vec::IntoIter<Arc<ValidPoolTransaction<T>>>,
812    >;
813
814    fn into_iter(self) -> Self::IntoIter {
815        self.pending.into_iter().chain(self.queued)
816    }
817}
818
819/// Represents transactions that were propagated over the network.
820#[derive(Debug, Clone, Eq, PartialEq, Default)]
821pub struct PropagatedTransactions(pub HashMap<TxHash, Vec<PropagateKind>>);
822
823impl PropagatedTransactions {
824    /// Records a propagation of a transaction to a peer.
825    pub fn record(&mut self, hash: TxHash, kind: PropagateKind) {
826        self.0.entry(hash).or_default().push(kind);
827    }
828
829    /// Returns the number of distinct transactions that were propagated.
830    pub fn len(&self) -> usize {
831        self.0.len()
832    }
833
834    /// Returns true if no transactions were propagated.
835    pub fn is_empty(&self) -> bool {
836        self.0.is_empty()
837    }
838
839    /// Returns the propagation info for a specific transaction.
840    pub fn get(&self, hash: &TxHash) -> Option<&[PropagateKind]> {
841        self.0.get(hash).map(Vec::as_slice)
842    }
843}
844
845impl IntoIterator for PropagatedTransactions {
846    type Item = (TxHash, Vec<PropagateKind>);
847    type IntoIter = std::collections::hash_map::IntoIter<TxHash, Vec<PropagateKind>>;
848
849    fn into_iter(self) -> Self::IntoIter {
850        self.0.into_iter()
851    }
852}
853
854/// Represents how a transaction was propagated over the network.
855#[derive(Debug, Copy, Clone, Eq, PartialEq)]
856#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
857pub enum PropagateKind {
858    /// The full transaction object was sent to the peer.
859    ///
860    /// This is equivalent to the `Transaction` message
861    Full(PeerId),
862    /// Only the Hash was propagated to the peer.
863    Hash(PeerId),
864}
865
866// === impl PropagateKind ===
867
868impl PropagateKind {
869    /// Returns the peer the transaction was sent to
870    pub const fn peer(&self) -> &PeerId {
871        match self {
872            Self::Full(peer) | Self::Hash(peer) => peer,
873        }
874    }
875
876    /// Returns true if the transaction was sent as a full transaction
877    pub const fn is_full(&self) -> bool {
878        matches!(self, Self::Full(_))
879    }
880
881    /// Returns true if the transaction was sent as a hash
882    pub const fn is_hash(&self) -> bool {
883        matches!(self, Self::Hash(_))
884    }
885}
886
887impl From<PropagateKind> for PeerId {
888    fn from(value: PropagateKind) -> Self {
889        match value {
890            PropagateKind::Full(peer) | PropagateKind::Hash(peer) => peer,
891        }
892    }
893}
894
895/// This type represents a new blob sidecar that has been stored in the transaction pool's
896/// blobstore; it includes the `TransactionHash` of the blob transaction along with the assoc.
897/// sidecar (blobs, commitments, proofs)
898#[derive(Debug, Clone)]
899pub struct NewBlobSidecar {
900    /// hash of the EIP-4844 transaction.
901    pub tx_hash: TxHash,
902    /// the blob transaction sidecar.
903    pub sidecar: Arc<BlobTransactionSidecarVariant>,
904}
905
906/// Where the transaction originates from.
907///
908/// Depending on where the transaction was picked up, it affects how the transaction is handled
909/// internally, e.g. limits for simultaneous transaction of one sender.
910#[derive(Debug, Copy, Clone, PartialEq, Eq, Default, Deserialize, Serialize)]
911pub enum TransactionOrigin {
912    /// Transaction is coming from a local source.
913    #[default]
914    Local,
915    /// Transaction has been received externally.
916    ///
917    /// This is usually considered an "untrusted" source, for example received from another in the
918    /// network.
919    External,
920    /// Transaction is originated locally and is intended to remain private.
921    ///
922    /// This type of transaction should not be propagated to the network. It's meant for
923    /// private usage within the local node only.
924    Private,
925}
926
927// === impl TransactionOrigin ===
928
929impl TransactionOrigin {
930    /// Whether the transaction originates from a local source.
931    pub const fn is_local(&self) -> bool {
932        matches!(self, Self::Local)
933    }
934
935    /// Whether the transaction originates from an external source.
936    pub const fn is_external(&self) -> bool {
937        matches!(self, Self::External)
938    }
939    /// Whether the transaction originates from a private source.
940    pub const fn is_private(&self) -> bool {
941        matches!(self, Self::Private)
942    }
943}
944
/// The reason the canonical state was updated.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PoolUpdateKind {
    /// A block was committed.
    Commit,
    /// The chain was reorganized.
    Reorg,
}
953
954/// Represents changes after a new canonical block or range of canonical blocks was added to the
955/// chain.
956///
957/// It is expected that this is only used if the added blocks are canonical to the pool's last known
958/// block hash. In other words, the first added block of the range must be the child of the last
959/// known block hash.
960///
961/// This is used to update the pool state accordingly.
962#[derive(Clone, Debug)]
963pub struct CanonicalStateUpdate<'a, B: Block> {
964    /// Hash of the tip block.
965    pub new_tip: &'a SealedBlock<B>,
966    /// EIP-1559 Base fee of the _next_ (pending) block
967    ///
968    /// The base fee of a block depends on the utilization of the last block and its base fee.
969    pub pending_block_base_fee: u64,
970    /// EIP-4844 blob fee of the _next_ (pending) block
971    ///
972    /// Only after Cancun
973    pub pending_block_blob_fee: Option<u128>,
974    /// A set of changed accounts across a range of blocks.
975    pub changed_accounts: Vec<ChangedAccount>,
976    /// All mined transactions in the block range.
977    pub mined_transactions: Vec<B256>,
978    /// The kind of update to the canonical state.
979    pub update_kind: PoolUpdateKind,
980}
981
982impl<B> CanonicalStateUpdate<'_, B>
983where
984    B: Block,
985{
986    /// Returns the number of the tip block.
987    pub fn number(&self) -> u64 {
988        self.new_tip.number()
989    }
990
991    /// Returns the hash of the tip block.
992    pub fn hash(&self) -> B256 {
993        self.new_tip.hash()
994    }
995
996    /// Timestamp of the latest chain update
997    pub fn timestamp(&self) -> u64 {
998        self.new_tip.timestamp()
999    }
1000
1001    /// Returns the block info for the tip block.
1002    pub fn block_info(&self) -> BlockInfo {
1003        BlockInfo {
1004            block_gas_limit: self.new_tip.gas_limit(),
1005            last_seen_block_hash: self.hash(),
1006            last_seen_block_number: self.number(),
1007            pending_basefee: self.pending_block_base_fee,
1008            pending_blob_fee: self.pending_block_blob_fee,
1009        }
1010    }
1011}
1012
1013impl<B> fmt::Display for CanonicalStateUpdate<'_, B>
1014where
1015    B: Block,
1016{
1017    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1018        f.debug_struct("CanonicalStateUpdate")
1019            .field("hash", &self.hash())
1020            .field("number", &self.number())
1021            .field("pending_block_base_fee", &self.pending_block_base_fee)
1022            .field("pending_block_blob_fee", &self.pending_block_blob_fee)
1023            .field("changed_accounts", &self.changed_accounts.len())
1024            .field("mined_transactions", &self.mined_transactions.len())
1025            .finish()
1026    }
1027}
1028
1029/// Alias to restrict the [`BestTransactions`] items to the pool's transaction type.
1030pub type BestTransactionsFor<Pool> = Box<
1031    dyn BestTransactions<Item = Arc<ValidPoolTransaction<<Pool as TransactionPool>::Transaction>>>,
1032>;
1033
1034/// An `Iterator` that only returns transactions that are ready to be executed.
1035///
1036/// This makes no assumptions about the order of the transactions, but expects that _all_
1037/// transactions are valid (no nonce gaps.) for the tracked state of the pool.
1038///
1039/// Note: this iterator will always return the best transaction that it currently knows.
1040/// There is no guarantee transactions will be returned sequentially in decreasing
1041/// priority order.
1042pub trait BestTransactions: Iterator + Send {
1043    /// Mark the transaction as invalid.
1044    ///
1045    /// Implementers must ensure all subsequent transaction _don't_ depend on this transaction.
1046    /// In other words, this must remove the given transaction _and_ drain all transaction that
1047    /// depend on it.
1048    fn mark_invalid(&mut self, transaction: &Self::Item, kind: &InvalidPoolTransactionError);
1049
1050    /// An iterator may be able to receive additional pending transactions that weren't present it
1051    /// the pool when it was created.
1052    ///
1053    /// This ensures that iterator will return the best transaction that it currently knows and not
1054    /// listen to pool updates.
1055    fn no_updates(&mut self);
1056
1057    /// Convenience function for [`Self::no_updates`] that returns the iterator again.
1058    fn without_updates(mut self) -> Self
1059    where
1060        Self: Sized,
1061    {
1062        self.no_updates();
1063        self
1064    }
1065
1066    /// Skip all blob transactions.
1067    ///
1068    /// There's only limited blob space available in a block, once exhausted, EIP-4844 transactions
1069    /// can no longer be included.
1070    ///
1071    /// If called then the iterator will no longer yield blob transactions.
1072    ///
1073    /// Note: this will also exclude any transactions that depend on blob transactions.
1074    fn skip_blobs(&mut self) {
1075        self.set_skip_blobs(true);
1076    }
1077
1078    /// Controls whether the iterator skips blob transactions or not.
1079    ///
1080    /// If set to true, no blob transactions will be returned.
1081    fn set_skip_blobs(&mut self, skip_blobs: bool);
1082
1083    /// Convenience function for [`Self::skip_blobs`] that returns the iterator again.
1084    fn without_blobs(mut self) -> Self
1085    where
1086        Self: Sized,
1087    {
1088        self.skip_blobs();
1089        self
1090    }
1091
1092    /// Creates an iterator which uses a closure to determine whether a transaction should be
1093    /// returned by the iterator.
1094    ///
1095    /// All items the closure returns false for are marked as invalid via [`Self::mark_invalid`] and
1096    /// descendant transactions will be skipped.
1097    fn filter_transactions<P>(self, predicate: P) -> BestTransactionFilter<Self, P>
1098    where
1099        P: FnMut(&Self::Item) -> bool,
1100        Self: Sized,
1101    {
1102        BestTransactionFilter::new(self, predicate)
1103    }
1104}
1105
1106impl<T> BestTransactions for Box<T>
1107where
1108    T: BestTransactions + ?Sized,
1109{
1110    fn mark_invalid(&mut self, transaction: &Self::Item, kind: &InvalidPoolTransactionError) {
1111        (**self).mark_invalid(transaction, kind)
1112    }
1113
1114    fn no_updates(&mut self) {
1115        (**self).no_updates();
1116    }
1117
1118    fn skip_blobs(&mut self) {
1119        (**self).skip_blobs();
1120    }
1121
1122    fn set_skip_blobs(&mut self, skip_blobs: bool) {
1123        (**self).set_skip_blobs(skip_blobs);
1124    }
1125}
1126
1127/// A no-op implementation that yields no transactions.
1128impl<T> BestTransactions for std::iter::Empty<T> {
1129    fn mark_invalid(&mut self, _tx: &T, _kind: &InvalidPoolTransactionError) {}
1130
1131    fn no_updates(&mut self) {}
1132
1133    fn skip_blobs(&mut self) {}
1134
1135    fn set_skip_blobs(&mut self, _skip_blobs: bool) {}
1136}
1137
/// A check that decides whether a transaction satisfies a set of conditions.
pub trait TransactionFilter {
    /// The transaction type this filter can check.
    type Transaction;

    /// Returns `true` if `transaction` satisfies the conditions of this filter.
    fn is_valid(&self, transaction: &Self::Transaction) -> bool;
}
1146
/// A [`TransactionFilter`] that does nothing: it treats every transaction as valid.
#[derive(Debug, Clone)]
pub struct NoopTransactionFilter<T>(std::marker::PhantomData<T>);

// `Default` is implemented by hand because the derive would add a `T: Default` bound, which
// the filter does not need.
impl<T> Default for NoopTransactionFilter<T> {
    fn default() -> Self {
        Self(Default::default())
    }
}
1159
1160impl<T> TransactionFilter for NoopTransactionFilter<T> {
1161    type Transaction = T;
1162
1163    fn is_valid(&self, _transaction: &Self::Transaction) -> bool {
1164        true
1165    }
1166}
1167
/// Bundle of the fee attributes used when requesting the best transactions.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct BestTransactionsAttributes {
    /// Base fee attribute for the best transactions.
    pub basefee: u64,
    /// Blob fee attribute for the best transactions, if any.
    pub blob_fee: Option<u64>,
}

// === impl BestTransactionsAttributes ===

impl BestTransactionsAttributes {
    /// Creates the attributes from a base fee and an optional blob fee.
    pub const fn new(basefee: u64, blob_fee: Option<u64>) -> Self {
        Self { basefee, blob_fee }
    }

    /// Creates the attributes from a base fee only; no blob fee is set.
    pub const fn base_fee(basefee: u64) -> Self {
        Self { basefee, blob_fee: None }
    }

    /// Returns the attributes with the blob fee set to `blob_fee`.
    pub const fn with_blob_fee(self, blob_fee: u64) -> Self {
        Self { basefee: self.basefee, blob_fee: Some(blob_fee) }
    }
}
1196
1197/// Trait for transaction types stored in the transaction pool.
1198///
1199/// This trait represents the actual transaction object stored in the mempool, which includes not
1200/// only the transaction data itself but also additional metadata needed for efficient pool
1201/// operations. Implementations typically cache values that are frequently accessed during
1202/// transaction ordering, validation, and eviction.
1203///
1204/// ## Key Responsibilities
1205///
1206/// 1. **Metadata Caching**: Store computed values like address, cost and encoded size
1207/// 2. **Representation Conversion**: Handle conversions between consensus and pooled
1208///    representations
1209/// 3. **Validation Support**: Provide methods for pool-specific validation rules
1210///
1211/// ## Cached Metadata
1212///
1213/// Implementations should cache frequently accessed values to avoid recomputation:
1214/// - **Address**: Recovered sender address of the transaction
1215/// - **Cost**: Max amount spendable (gas × price + value + blob costs)
1216/// - **Size**: RLP encoded length for mempool size limits
1217///
1218/// See [`EthPooledTransaction`] for a reference implementation.
1219///
1220/// ## Transaction Representations
1221///
1222/// This trait abstracts over the different representations a transaction can have:
1223///
1224/// 1. **Consensus representation** (`Consensus` associated type): The canonical form included in
1225///    blocks
1226///    - Compact representation without networking metadata
1227///    - For EIP-4844: includes only blob hashes, not the actual blobs
1228///    - Used for block execution and state transitions
1229///
1230/// 2. **Pooled representation** (`Pooled` associated type): The form used for network propagation
1231///    - May include additional data for validation
1232///    - For EIP-4844: includes full blob sidecars (blobs, commitments, proofs)
1233///    - Used for mempool validation and p2p gossiping
1234///
1235/// ## Why Two Representations?
1236///
1237/// This distinction is necessary because:
1238///
1239/// - **EIP-4844 blob transactions**: Require large blob sidecars for validation that would bloat
1240///   blocks if included. Only blob hashes are stored on-chain.
1241///
1242/// - **Network efficiency**: Blob transactions are not broadcast to all peers automatically but
1243///   must be explicitly requested to reduce bandwidth usage.
1244///
1245/// - **Special transactions**: Some transactions (like OP deposit transactions) exist only in
1246///   consensus format and are never in the mempool.
1247///
1248/// ## Conversion Rules
1249///
1250/// - `Consensus` → `Pooled`: May fail for transactions that cannot be pooled (e.g., OP deposit
1251///   transactions, blob transactions without sidecars)
1252/// - `Pooled` → `Consensus`: Always succeeds (pooled is a superset)
1253pub trait PoolTransaction:
1254    alloy_consensus::Transaction + InMemorySize + Debug + Send + Sync + Clone
1255{
1256    /// Associated error type for the `try_from_consensus` method.
1257    type TryFromConsensusError: fmt::Display;
1258
1259    /// Associated type representing the raw consensus variant of the transaction.
1260    type Consensus: SignedTransaction + From<Self::Pooled>;
1261
1262    /// Associated type representing the recovered pooled variant of the transaction.
1263    type Pooled: TryFrom<Self::Consensus, Error = Self::TryFromConsensusError> + SignedTransaction;
1264
1265    /// Define a method to convert from the `Consensus` type to `Self`
1266    ///
1267    /// This conversion may fail for transactions that are valid for inclusion in blocks
1268    /// but cannot exist in the transaction pool. Examples include:
1269    ///
1270    /// - **OP Deposit transactions**: These are special system transactions that are directly
1271    ///   included in blocks by the sequencer/validator and never enter the mempool
1272    /// - **Blob transactions without sidecars**: After being included in a block, the sidecar data
1273    ///   is pruned, making the consensus transaction unpoolable
1274    fn try_from_consensus(
1275        tx: Recovered<Self::Consensus>,
1276    ) -> Result<Self, Self::TryFromConsensusError> {
1277        let (tx, signer) = tx.into_parts();
1278        Ok(Self::from_pooled(Recovered::new_unchecked(tx.try_into()?, signer)))
1279    }
1280
1281    /// Clone the transaction into a consensus variant.
1282    ///
1283    /// This method is preferred when the [`PoolTransaction`] already wraps the consensus variant.
1284    fn clone_into_consensus(&self) -> Recovered<Self::Consensus> {
1285        self.clone().into_consensus()
1286    }
1287
1288    /// Returns a reference to the consensus transaction with the recovered sender.
1289    fn consensus_ref(&self) -> Recovered<&Self::Consensus>;
1290
1291    /// Define a method to convert from the `Self` type to `Consensus`
1292    fn into_consensus(self) -> Recovered<Self::Consensus>;
1293
1294    /// Converts the transaction into consensus format while preserving the EIP-2718 encoded bytes.
1295    /// This is used to optimize transaction execution by reusing cached encoded bytes instead of
1296    /// re-encoding the transaction. The cached bytes are particularly useful in payload building
1297    /// where the same transaction may be executed multiple times.
1298    fn into_consensus_with2718(self) -> WithEncoded<Recovered<Self::Consensus>> {
1299        self.into_consensus().into_encoded()
1300    }
1301
1302    /// Define a method to convert from the `Pooled` type to `Self`
1303    fn from_pooled(pooled: Recovered<Self::Pooled>) -> Self;
1304
1305    /// Tries to convert the `Consensus` type into the `Pooled` type.
1306    fn try_into_pooled(self) -> Result<Recovered<Self::Pooled>, Self::TryFromConsensusError> {
1307        let consensus = self.into_consensus();
1308        let (tx, signer) = consensus.into_parts();
1309        Ok(Recovered::new_unchecked(tx.try_into()?, signer))
1310    }
1311
1312    /// Clones the consensus transactions and tries to convert the `Consensus` type into the
1313    /// `Pooled` type.
1314    fn clone_into_pooled(&self) -> Result<Recovered<Self::Pooled>, Self::TryFromConsensusError> {
1315        let consensus = self.clone_into_consensus();
1316        let (tx, signer) = consensus.into_parts();
1317        Ok(Recovered::new_unchecked(tx.try_into()?, signer))
1318    }
1319
1320    /// Converts the `Pooled` type into the `Consensus` type.
1321    fn pooled_into_consensus(tx: Self::Pooled) -> Self::Consensus {
1322        tx.into()
1323    }
1324
1325    /// Hash of the transaction.
1326    fn hash(&self) -> &TxHash;
1327
1328    /// The Sender of the transaction.
1329    fn sender(&self) -> Address;
1330
1331    /// Reference to the Sender of the transaction.
1332    fn sender_ref(&self) -> &Address;
1333
1334    /// Returns the cost that this transaction is allowed to consume:
1335    ///
1336    /// For EIP-1559 transactions: `max_fee_per_gas * gas_limit + tx_value`.
1337    /// For legacy transactions: `gas_price * gas_limit + tx_value`.
1338    /// For EIP-4844 blob transactions: `max_fee_per_gas * gas_limit + tx_value +
1339    /// max_blob_fee_per_gas * blob_gas_used`.
1340    fn cost(&self) -> &U256;
1341
1342    /// Returns the length of the rlp encoded transaction object
1343    ///
1344    /// Note: Implementations should cache this value.
1345    fn encoded_length(&self) -> usize;
1346
1347    /// Ensures that the transaction's code size does not exceed the provided `max_init_code_size`.
1348    ///
1349    /// This is specifically relevant for contract creation transactions ([`TxKind::Create`]),
1350    /// where the input data contains the initialization code. If the input code size exceeds
1351    /// the configured limit, an [`InvalidPoolTransactionError::ExceedsMaxInitCodeSize`] error is
1352    /// returned.
1353    fn ensure_max_init_code_size(
1354        &self,
1355        max_init_code_size: usize,
1356    ) -> Result<(), InvalidPoolTransactionError> {
1357        let input_len = self.input().len();
1358        if self.is_create() && input_len > max_init_code_size {
1359            Err(InvalidPoolTransactionError::ExceedsMaxInitCodeSize(input_len, max_init_code_size))
1360        } else {
1361            Ok(())
1362        }
1363    }
1364
1365    /// Allows to communicate to the pool that the transaction doesn't require a nonce check.
1366    fn requires_nonce_check(&self) -> bool {
1367        true
1368    }
1369}
1370
1371/// Super trait for transactions that can be converted to and from Eth transactions intended for the
1372/// ethereum style pool.
1373///
1374/// This extends the [`PoolTransaction`] trait with additional methods that are specific to the
1375/// Ethereum pool.
1376pub trait EthPoolTransaction: PoolTransaction {
1377    /// Extracts the blob sidecar from the transaction.
1378    fn take_blob(&mut self) -> EthBlobTransactionSidecar;
1379
1380    /// A specialization for the EIP-4844 transaction type.
1381    /// Tries to reattach the blob sidecar to the transaction.
1382    ///
1383    /// This returns an option, but callers should ensure that the transaction is an EIP-4844
1384    /// transaction: [`Typed2718::is_eip4844`].
1385    fn try_into_pooled_eip4844(
1386        self,
1387        sidecar: Arc<BlobTransactionSidecarVariant>,
1388    ) -> Option<Recovered<Self::Pooled>>;
1389
1390    /// Tries to convert the `Consensus` type with a blob sidecar into the `Pooled` type.
1391    ///
1392    /// Returns `None` if passed transaction is not a blob transaction.
1393    fn try_from_eip4844(
1394        tx: Recovered<Self::Consensus>,
1395        sidecar: BlobTransactionSidecarVariant,
1396    ) -> Option<Self>;
1397
1398    /// Validates the blob sidecar of the transaction with the given settings.
1399    fn validate_blob(
1400        &self,
1401        blob: &BlobTransactionSidecarVariant,
1402        settings: &KzgSettings,
1403    ) -> Result<(), BlobTransactionValidationError>;
1404}
1405
1406/// The default [`PoolTransaction`] for the [Pool](crate::Pool) for Ethereum.
1407///
1408/// This type wraps a consensus transaction with additional cached data that's
1409/// frequently accessed by the pool for transaction ordering and validation:
1410///
1411/// - `cost`: Pre-calculated max cost (gas * price + value + blob costs)
1412/// - `encoded_length`: Cached RLP encoding length for size limits
1413/// - `blob_sidecar`: Blob data state (None/Missing/Present)
1414///
1415/// This avoids recalculating these values repeatedly during pool operations.
1416#[derive(Debug, Clone, PartialEq, Eq)]
1417pub struct EthPooledTransaction<T = TransactionSigned> {
1418    /// `EcRecovered` transaction, the consensus format.
1419    pub transaction: Recovered<T>,
1420
1421    /// For EIP-1559 transactions: `max_fee_per_gas * gas_limit + tx_value`.
1422    /// For legacy transactions: `gas_price * gas_limit + tx_value`.
1423    /// For EIP-4844 blob transactions: `max_fee_per_gas * gas_limit + tx_value +
1424    /// max_blob_fee_per_gas * blob_gas_used`.
1425    pub cost: U256,
1426
1427    /// This is the RLP length of the transaction, computed when the transaction is added to the
1428    /// pool.
1429    pub encoded_length: usize,
1430
1431    /// The blob side car for this transaction
1432    pub blob_sidecar: EthBlobTransactionSidecar,
1433}
1434
1435impl<T: SignedTransaction> EthPooledTransaction<T> {
1436    /// Create new instance of [Self].
1437    ///
1438    /// Caution: In case of blob transactions, this does marks the blob sidecar as
1439    /// [`EthBlobTransactionSidecar::Missing`]
1440    pub fn new(transaction: Recovered<T>, encoded_length: usize) -> Self {
1441        let mut blob_sidecar = EthBlobTransactionSidecar::None;
1442
1443        let gas_cost = U256::from(transaction.max_fee_per_gas())
1444            .saturating_mul(U256::from(transaction.gas_limit()));
1445
1446        let mut cost = gas_cost.saturating_add(transaction.value());
1447
1448        if let (Some(blob_gas_used), Some(max_fee_per_blob_gas)) =
1449            (transaction.blob_gas_used(), transaction.max_fee_per_blob_gas())
1450        {
1451            // Add max blob cost using saturating math to avoid overflow
1452            cost = cost.saturating_add(U256::from(
1453                max_fee_per_blob_gas.saturating_mul(blob_gas_used as u128),
1454            ));
1455
1456            // because the blob sidecar is not included in this transaction variant, mark it as
1457            // missing
1458            blob_sidecar = EthBlobTransactionSidecar::Missing;
1459        }
1460
1461        Self { transaction, cost, encoded_length, blob_sidecar }
1462    }
1463
1464    /// Return the reference to the underlying transaction.
1465    pub const fn transaction(&self) -> &Recovered<T> {
1466        &self.transaction
1467    }
1468}
1469
1470impl PoolTransaction for EthPooledTransaction {
1471    type TryFromConsensusError = ValueError<TransactionSigned>;
1472
1473    type Consensus = TransactionSigned;
1474
1475    type Pooled = PooledTransactionVariant;
1476
1477    fn clone_into_consensus(&self) -> Recovered<Self::Consensus> {
1478        self.transaction().clone()
1479    }
1480
1481    fn consensus_ref(&self) -> Recovered<&Self::Consensus> {
1482        Recovered::new_unchecked(&*self.transaction, self.transaction.signer())
1483    }
1484
1485    fn into_consensus(self) -> Recovered<Self::Consensus> {
1486        self.transaction
1487    }
1488
1489    fn from_pooled(tx: Recovered<Self::Pooled>) -> Self {
1490        let encoded_length = tx.encode_2718_len();
1491        let (tx, signer) = tx.into_parts();
1492        match tx {
1493            PooledTransactionVariant::Eip4844(tx) => {
1494                // include the blob sidecar
1495                let (tx, sig, hash) = tx.into_parts();
1496                let (tx, blob) = tx.into_parts();
1497                let tx = Signed::new_unchecked(tx, sig, hash);
1498                let tx = TransactionSigned::from(tx);
1499                let tx = Recovered::new_unchecked(tx, signer);
1500                let mut pooled = Self::new(tx, encoded_length);
1501                pooled.blob_sidecar = EthBlobTransactionSidecar::Present(blob);
1502                pooled
1503            }
1504            tx => {
1505                // no blob sidecar
1506                let tx = Recovered::new_unchecked(tx.into(), signer);
1507                Self::new(tx, encoded_length)
1508            }
1509        }
1510    }
1511
1512    /// Returns hash of the transaction.
1513    fn hash(&self) -> &TxHash {
1514        self.transaction.tx_hash()
1515    }
1516
1517    /// Returns the Sender of the transaction.
1518    fn sender(&self) -> Address {
1519        self.transaction.signer()
1520    }
1521
1522    /// Returns a reference to the Sender of the transaction.
1523    fn sender_ref(&self) -> &Address {
1524        self.transaction.signer_ref()
1525    }
1526
1527    /// Returns the cost that this transaction is allowed to consume:
1528    ///
1529    /// For EIP-1559 transactions: `max_fee_per_gas * gas_limit + tx_value`.
1530    /// For legacy transactions: `gas_price * gas_limit + tx_value`.
1531    /// For EIP-4844 blob transactions: `max_fee_per_gas * gas_limit + tx_value +
1532    /// max_blob_fee_per_gas * blob_gas_used`.
1533    fn cost(&self) -> &U256 {
1534        &self.cost
1535    }
1536
1537    /// Returns the length of the rlp encoded object
1538    fn encoded_length(&self) -> usize {
1539        self.encoded_length
1540    }
1541}
1542
// The pool wrapper reports the same type byte as the transaction it wraps.
impl<T: Typed2718> Typed2718 for EthPooledTransaction<T> {
    fn ty(&self) -> u8 {
        self.transaction.ty()
    }
}
1548
// Only the wrapped transaction is measured here; the other fields of the wrapper (cost, encoded
// length and blob sidecar) do not contribute to the reported size.
impl<T: InMemorySize> InMemorySize for EthPooledTransaction<T> {
    fn size(&self) -> usize {
        self.transaction.size()
    }
}
1554
// Every accessor below forwards to the wrapped transaction unchanged, so the pool wrapper can be
// queried exactly like the transaction it holds.
impl<T: alloy_consensus::Transaction> alloy_consensus::Transaction for EthPooledTransaction<T> {
    fn chain_id(&self) -> Option<alloy_primitives::ChainId> {
        self.transaction.chain_id()
    }

    fn nonce(&self) -> u64 {
        self.transaction.nonce()
    }

    fn gas_limit(&self) -> u64 {
        self.transaction.gas_limit()
    }

    fn gas_price(&self) -> Option<u128> {
        self.transaction.gas_price()
    }

    fn max_fee_per_gas(&self) -> u128 {
        self.transaction.max_fee_per_gas()
    }

    fn max_priority_fee_per_gas(&self) -> Option<u128> {
        self.transaction.max_priority_fee_per_gas()
    }

    fn max_fee_per_blob_gas(&self) -> Option<u128> {
        self.transaction.max_fee_per_blob_gas()
    }

    fn priority_fee_or_price(&self) -> u128 {
        self.transaction.priority_fee_or_price()
    }

    fn effective_gas_price(&self, base_fee: Option<u64>) -> u128 {
        self.transaction.effective_gas_price(base_fee)
    }

    fn is_dynamic_fee(&self) -> bool {
        self.transaction.is_dynamic_fee()
    }

    fn kind(&self) -> TxKind {
        self.transaction.kind()
    }

    fn is_create(&self) -> bool {
        self.transaction.is_create()
    }

    fn value(&self) -> U256 {
        self.transaction.value()
    }

    fn input(&self) -> &Bytes {
        self.transaction.input()
    }

    fn access_list(&self) -> Option<&AccessList> {
        self.transaction.access_list()
    }

    fn blob_versioned_hashes(&self) -> Option<&[B256]> {
        self.transaction.blob_versioned_hashes()
    }

    fn authorization_list(&self) -> Option<&[SignedAuthorization]> {
        self.transaction.authorization_list()
    }
}
1624
1625impl EthPoolTransaction for EthPooledTransaction {
1626    fn take_blob(&mut self) -> EthBlobTransactionSidecar {
1627        if self.is_eip4844() {
1628            std::mem::replace(&mut self.blob_sidecar, EthBlobTransactionSidecar::Missing)
1629        } else {
1630            EthBlobTransactionSidecar::None
1631        }
1632    }
1633
1634    fn try_into_pooled_eip4844(
1635        self,
1636        sidecar: Arc<BlobTransactionSidecarVariant>,
1637    ) -> Option<Recovered<Self::Pooled>> {
1638        let (signed_transaction, signer) = self.into_consensus().into_parts();
1639        let pooled_transaction =
1640            signed_transaction.try_into_pooled_eip4844(Arc::unwrap_or_clone(sidecar)).ok()?;
1641
1642        Some(Recovered::new_unchecked(pooled_transaction, signer))
1643    }
1644
1645    fn try_from_eip4844(
1646        tx: Recovered<Self::Consensus>,
1647        sidecar: BlobTransactionSidecarVariant,
1648    ) -> Option<Self> {
1649        let (tx, signer) = tx.into_parts();
1650        tx.try_into_pooled_eip4844(sidecar)
1651            .ok()
1652            .map(|tx| tx.with_signer(signer))
1653            .map(Self::from_pooled)
1654    }
1655
1656    fn validate_blob(
1657        &self,
1658        sidecar: &BlobTransactionSidecarVariant,
1659        settings: &KzgSettings,
1660    ) -> Result<(), BlobTransactionValidationError> {
1661        match self.transaction.inner().as_eip4844() {
1662            Some(tx) => tx.tx().validate_blob(sidecar, settings),
1663            _ => Err(BlobTransactionValidationError::NotBlobTransaction(self.ty())),
1664        }
1665    }
1666}
1667
1668/// Represents the blob sidecar of the [`EthPooledTransaction`].
1669///
1670/// EIP-4844 blob transactions require additional data (blobs, commitments, proofs)
1671/// for validation that is not included in the consensus format. This enum tracks
1672/// the sidecar state throughout the transaction's lifecycle in the pool.
1673#[derive(Debug, Clone, PartialEq, Eq)]
1674pub enum EthBlobTransactionSidecar {
1675    /// This transaction does not have a blob sidecar
1676    /// (applies to all non-EIP-4844 transaction types)
1677    None,
1678    /// This transaction has a blob sidecar (EIP-4844) but it is missing.
1679    ///
1680    /// This can happen when:
1681    /// - The sidecar was extracted after the transaction was added to the pool
1682    /// - The transaction was re-injected after a reorg without its sidecar
1683    /// - The transaction was recovered from the consensus format (e.g., from a block)
1684    Missing,
1685    /// The EIP-4844 transaction was received from the network with its complete sidecar.
1686    ///
1687    /// This sidecar contains:
1688    /// - The actual blob data (large data per blob)
1689    /// - KZG commitments for each blob
1690    /// - KZG proofs for validation
1691    ///
1692    /// The sidecar is required for validating the transaction but is not included
1693    /// in blocks (only the blob hashes are included in the consensus format).
1694    Present(BlobTransactionSidecarVariant),
1695}
1696
1697impl EthBlobTransactionSidecar {
1698    /// Returns the blob sidecar if it is present
1699    pub const fn maybe_sidecar(&self) -> Option<&BlobTransactionSidecarVariant> {
1700        match self {
1701            Self::Present(sidecar) => Some(sidecar),
1702            _ => None,
1703        }
1704    }
1705}
1706
/// Snapshot of how many transactions each sub-pool holds and how large they are reported to be.
#[derive(Debug, Clone, Copy, Default)]
pub struct PoolSize {
    /// Transaction count of the _pending_ sub-pool.
    pub pending: usize,
    /// Reported size of the transactions in the _pending_ sub-pool.
    pub pending_size: usize,
    /// Transaction count of the _blob_ pool.
    pub blob: usize,
    /// Reported size of the transactions in the _blob_ pool.
    pub blob_size: usize,
    /// Transaction count of the _basefee_ sub-pool.
    pub basefee: usize,
    /// Reported size of the transactions in the _basefee_ sub-pool.
    pub basefee_size: usize,
    /// Transaction count of the _queued_ sub-pool.
    pub queued: usize,
    /// Reported size of the transactions in the _queued_ sub-pool.
    pub queued_size: usize,
    /// Transaction count across all sub-pools.
    ///
    /// Note: this is expected to equal `pending + basefee + queued + blob`.
    pub total: usize,
}

// === impl PoolSize ===

impl PoolSize {
    /// Panics unless `total` equals the sum of the four per-sub-pool transaction counts.
    #[cfg(test)]
    pub(crate) fn assert_invariants(&self) {
        let Self { pending, basefee, queued, blob, total, .. } = *self;
        assert_eq!(total, pending + basefee + queued + blob);
    }
}
1741
/// Describes the block the pool is currently tracking, together with the fee thresholds derived
/// from it for the next block.
#[derive(Default, Debug, Clone, Copy, Eq, PartialEq)]
pub struct BlockInfo {
    /// Hash for the currently tracked block.
    pub last_seen_block_hash: B256,
    /// Currently tracked block.
    pub last_seen_block_number: u64,
    /// Current block gas limit for the latest block.
    pub block_gas_limit: u64,
    /// Currently enforced base fee: the threshold for the basefee sub-pool.
    ///
    /// Note: this is the derived base fee of the _next_ block that builds on the block the pool is
    /// currently tracking.
    pub pending_basefee: u64,
    /// Currently enforced blob fee: the threshold for EIP-4844 blob transactions.
    ///
    /// Note: this is the derived blob fee of the _next_ block that builds on the block the pool is
    /// currently tracking.
    pub pending_blob_fee: Option<u128>,
}
1762
/// Controls how much [`TransactionPool::get_pooled_transaction_elements`] may return.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum GetPooledTransactionLimit {
    /// Unlimited: all transactions are returned.
    None,
    /// Soft cap on the size of the returned transactions, for example 2MB.
    ResponseSizeSoftLimit(usize),
}

impl GetPooledTransactionLimit {
    /// Returns `true` only if a limit is set and `size` is strictly greater than it.
    ///
    /// A `size` equal to the limit does not exceed it, and [`Self::None`] is never exceeded.
    #[inline]
    pub const fn exceeds(&self, size: usize) -> bool {
        if let Self::ResponseSizeSoftLimit(limit) = *self {
            size > limit
        } else {
            false
        }
    }
}
1782
1783/// A Stream that yields full transactions the subpool
1784#[must_use = "streams do nothing unless polled"]
1785#[derive(Debug)]
1786pub struct NewSubpoolTransactionStream<Tx: PoolTransaction> {
1787    st: Receiver<NewTransactionEvent<Tx>>,
1788    subpool: SubPool,
1789}
1790
1791// === impl NewSubpoolTransactionStream ===
1792
1793impl<Tx: PoolTransaction> NewSubpoolTransactionStream<Tx> {
1794    /// Create a new stream that yields full transactions from the subpool
1795    pub const fn new(st: Receiver<NewTransactionEvent<Tx>>, subpool: SubPool) -> Self {
1796        Self { st, subpool }
1797    }
1798
1799    /// Tries to receive the next value for this stream.
1800    pub fn try_recv(
1801        &mut self,
1802    ) -> Result<NewTransactionEvent<Tx>, tokio::sync::mpsc::error::TryRecvError> {
1803        loop {
1804            match self.st.try_recv() {
1805                Ok(event) => {
1806                    if event.subpool == self.subpool {
1807                        return Ok(event)
1808                    }
1809                }
1810                Err(e) => return Err(e),
1811            }
1812        }
1813    }
1814}
1815
1816impl<Tx: PoolTransaction> Stream for NewSubpoolTransactionStream<Tx> {
1817    type Item = NewTransactionEvent<Tx>;
1818
1819    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1820        loop {
1821            match ready!(self.st.poll_recv(cx)) {
1822                Some(event) => {
1823                    if event.subpool == self.subpool {
1824                        return Poll::Ready(Some(event))
1825                    }
1826                }
1827                None => return Poll::Ready(None),
1828            }
1829        }
1830    }
1831}
1832
#[cfg(test)]
mod tests {
    use super::*;
    use alloy_consensus::{
        EthereumTxEnvelope, SignableTransaction, TxEip1559, TxEip2930, TxEip4844, TxEip7702,
        TxEnvelope, TxLegacy,
    };
    use alloy_eips::eip4844::DATA_GAS_PER_BLOB;
    use alloy_primitives::Signature;

    #[test]
    fn test_pool_size_invariants() {
        let (pending, blob, basefee, queued) = (10, 5, 8, 7);
        let consistent = PoolSize {
            pending,
            pending_size: 1000,
            blob,
            blob_size: 500,
            basefee,
            basefee_size: 800,
            queued,
            queued_size: 700,
            // All four sub-pool counts are accounted for.
            total: pending + blob + basefee + queued,
        };

        // A consistent total must pass the invariant check without panicking.
        consistent.assert_invariants();
    }

    #[test]
    #[should_panic]
    fn test_pool_size_invariants_fail() {
        let (pending, blob, basefee, queued) = (10, 5, 8, 7);
        let inconsistent = PoolSize {
            pending,
            pending_size: 1000,
            blob,
            blob_size: 500,
            basefee,
            basefee_size: 800,
            queued,
            queued_size: 700,
            // The queued count is deliberately left out of the total.
            total: pending + blob + basefee,
        };

        // The invariant check is expected to panic on the inconsistent total.
        inconsistent.assert_invariants();
    }

    #[test]
    fn test_eth_pooled_transaction_new_legacy() {
        // Legacy transaction: cost is `value + gas_price * gas_limit`.
        let signed = TxLegacy {
            gas_price: 10,
            gas_limit: 1000,
            value: U256::from(100),
            ..Default::default()
        }
        .into_signed(Signature::test_signature());
        let recovered = Recovered::new_unchecked(TxEnvelope::Legacy(signed), Default::default());
        let pooled = EthPooledTransaction::new(recovered.clone(), 200);

        assert_eq!(pooled.transaction, recovered);
        assert_eq!(pooled.encoded_length, 200);
        assert_eq!(pooled.blob_sidecar, EthBlobTransactionSidecar::None);
        assert_eq!(pooled.cost, U256::from(100) + U256::from(10 * 1000));
    }

    #[test]
    fn test_eth_pooled_transaction_new_eip2930() {
        // EIP-2930 transaction: cost is `value + gas_price * gas_limit`.
        let signed = TxEip2930 {
            gas_price: 10,
            gas_limit: 1000,
            value: U256::from(100),
            ..Default::default()
        }
        .into_signed(Signature::test_signature());
        let recovered = Recovered::new_unchecked(TxEnvelope::Eip2930(signed), Default::default());
        let pooled = EthPooledTransaction::new(recovered.clone(), 200);
        let expected_cost = U256::from(100) + (U256::from(10 * 1000));

        assert_eq!(pooled.transaction, recovered);
        assert_eq!(pooled.encoded_length, 200);
        assert_eq!(pooled.blob_sidecar, EthBlobTransactionSidecar::None);
        assert_eq!(pooled.cost, expected_cost);
    }

    #[test]
    fn test_eth_pooled_transaction_new_eip1559() {
        // EIP-1559 transaction: cost is `value + max_fee_per_gas * gas_limit`.
        let signed = TxEip1559 {
            max_fee_per_gas: 10,
            gas_limit: 1000,
            value: U256::from(100),
            ..Default::default()
        }
        .into_signed(Signature::test_signature());
        let recovered = Recovered::new_unchecked(TxEnvelope::Eip1559(signed), Default::default());
        let pooled = EthPooledTransaction::new(recovered.clone(), 200);

        assert_eq!(pooled.transaction, recovered);
        assert_eq!(pooled.encoded_length, 200);
        assert_eq!(pooled.blob_sidecar, EthBlobTransactionSidecar::None);
        assert_eq!(pooled.cost, U256::from(100) + U256::from(10 * 1000));
    }

    #[test]
    fn test_eth_pooled_transaction_new_eip4844() {
        // EIP-4844 transaction with a single blob hash and no sidecar attached.
        let signed = TxEip4844 {
            max_fee_per_gas: 10,
            gas_limit: 1000,
            value: U256::from(100),
            max_fee_per_blob_gas: 5,
            blob_versioned_hashes: vec![B256::default()],
            ..Default::default()
        }
        .into_signed(Signature::test_signature());
        let recovered =
            Recovered::new_unchecked(EthereumTxEnvelope::Eip4844(signed), Default::default());
        let pooled = EthPooledTransaction::new(recovered.clone(), 300);

        assert_eq!(pooled.transaction, recovered);
        assert_eq!(pooled.encoded_length, 300);
        // A blob transaction built without a sidecar is tracked as `Missing`.
        assert_eq!(pooled.blob_sidecar, EthBlobTransactionSidecar::Missing);
        // The blob fee for the single blob is added on top of the regular cost.
        let expected_cost =
            U256::from(100) + U256::from(10 * 1000) + U256::from(5 * DATA_GAS_PER_BLOB);
        assert_eq!(pooled.cost, expected_cost);
    }

    #[test]
    fn test_eth_pooled_transaction_new_eip7702() {
        // EIP-7702 transaction: cost is `value + max_fee_per_gas * gas_limit`.
        let signed = TxEip7702 {
            max_fee_per_gas: 10,
            gas_limit: 1000,
            value: U256::from(100),
            ..Default::default()
        }
        .into_signed(Signature::test_signature());
        let envelope = EthereumTxEnvelope::<TxEip4844>::Eip7702(signed);
        let recovered = Recovered::new_unchecked(envelope, Default::default());
        let pooled = EthPooledTransaction::new(recovered.clone(), 200);

        assert_eq!(pooled.transaction, recovered);
        assert_eq!(pooled.encoded_length, 200);
        assert_eq!(pooled.blob_sidecar, EthBlobTransactionSidecar::None);
        assert_eq!(pooled.cost, U256::from(100) + U256::from(10 * 1000));
    }

    #[test]
    fn test_pooled_transaction_limit() {
        const MIB: usize = 1024 * 1024;

        // Without a limit no size is ever considered too large.
        let unlimited = GetPooledTransactionLimit::None;
        assert!(!unlimited.exceeds(1000));

        // Soft limit of 2MB.
        let soft_limit = GetPooledTransactionLimit::ResponseSizeSoftLimit(2 * MIB);

        // Below the limit: not exceeded.
        assert!(!soft_limit.exceeds(MIB));

        // Exactly at the limit: still not exceeded.
        assert!(!soft_limit.exceeds(2 * MIB));

        // Above the limit: exceeded.
        assert!(soft_limit.exceeds(3 * MIB));
    }
}