reth_transaction_pool/pool/
mod.rs

1//! Transaction Pool internals.
2//!
3//! Incoming transactions are validated before they enter the pool. The validation outcome can
4//! have 3 states:
5//!
6//!  1. Transaction can _never_ be valid
7//!  2. Transaction is _currently_ valid
8//!  3. Transaction is _currently_ invalid, but could potentially become valid in the future
9//!
10//! However, (2.) and (3.) can only be determined on the basis of the current state, whereas (1.)
11//! holds indefinitely. This means that once the state changes, transactions classified as (2.) or
12//! (3.) need to be reevaluated.
13//!
14//! The transaction pool is responsible for storing new, valid transactions and providing the next
15//! best transactions sorted by their priority. Where priority is determined by the transaction's
16//! score ([`TransactionOrdering`]).
17//!
18//! Furthermore, the following characteristics fall under (3.):
19//!
20//!  a) Nonce of a transaction is higher than the expected nonce for the next transaction of its
21//! sender. A distinction is made here whether multiple transactions from the same sender have
22//! gapless nonce increments.
23//!
24//!  a)(1) If _no_ transaction is missing in a chain of multiple
25//! transactions from the same sender (all nonce in row), all of them can in principle be executed
26//! on the current state one after the other.
27//!
28//!  a)(2) If there's a nonce gap, then all
29//! transactions after the missing transaction are blocked until the missing transaction arrives.
30//!
31//!  b) Transaction does not meet the dynamic fee cap requirement introduced by EIP-1559: The
32//! fee cap of the transaction needs to be no less than the base fee of the block.
33//!
34//!
35//! In essence the transaction pool is made of three separate sub-pools:
36//!
37//!  - Pending Pool: Contains all transactions that are valid on the current state and satisfy (3.
38//!    a)(1): _No_ nonce gaps. A _pending_ transaction is considered _ready_ when it has the lowest
39//!    nonce of all transactions from the same sender. Once a _ready_ transaction with nonce `n` has
40//!    been executed, the next highest transaction from the same sender `n + 1` becomes ready.
41//!
42//!  - Queued Pool: Contains all transactions that are currently blocked by missing transactions:
43//!    (3. a)(2): _With_ nonce gaps or due to lack of funds.
44//!
45//!  - Basefee Pool: To account for the dynamic base fee requirement (3. b) which could render an
46//!    EIP-1559 and all subsequent transactions of the sender currently invalid.
47//!
48//! The classification of transactions is always dependent on the current state that is changed as
49//! soon as a new block is mined. Once a new block is mined, the account changeset must be applied
50//! to the transaction pool.
51//!
52//!
53//! Depending on the use case, consumers of the [`TransactionPool`](crate::traits::TransactionPool)
54//! are interested in (2.) and/or (3.).
55
56//! A generic [`TransactionPool`](crate::traits::TransactionPool) that only handles transactions.
57//!
58//! This Pool maintains two separate sub-pools for (2.) and (3.)
59//!
60//! ## Terminology
61//!
62//!  - _Pending_: pending transactions are transactions that fall under (2.). These transactions can
63//!    currently be executed and are stored in the pending sub-pool
64//!  - _Queued_: queued transactions are transactions that fall under category (3.). Those
65//!    transactions are _currently_ waiting for state changes that eventually move them into
66//!    category (2.) and become pending.
67
68use crate::{
69    blobstore::BlobStore,
70    error::{PoolError, PoolErrorKind, PoolResult},
71    identifier::{SenderId, SenderIdentifiers, TransactionId},
72    metrics::BlobStoreMetrics,
73    pool::{
74        listener::{
75            BlobTransactionSidecarListener, PendingTransactionHashListener, PoolEventBroadcast,
76            TransactionListener,
77        },
78        state::SubPool,
79        txpool::{SenderInfo, TxPool},
80        update::UpdateOutcome,
81    },
82    traits::{
83        AllPoolTransactions, BestTransactionsAttributes, BlockInfo, GetPooledTransactionLimit,
84        NewBlobSidecar, PoolSize, PoolTransaction, PropagatedTransactions, TransactionOrigin,
85    },
86    validate::{TransactionValidationOutcome, ValidPoolTransaction, ValidTransaction},
87    CanonicalStateUpdate, EthPoolTransaction, PoolConfig, TransactionOrdering,
88    TransactionValidator,
89};
90
91use alloy_primitives::{Address, TxHash, B256};
92use parking_lot::{Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard};
93use reth_eth_wire_types::HandleMempoolData;
94use reth_execution_types::ChangedAccount;
95
96use alloy_eips::{eip7594::BlobTransactionSidecarVariant, Typed2718};
97use reth_primitives_traits::Recovered;
98use rustc_hash::FxHashMap;
99use std::{collections::HashSet, fmt, sync::Arc, time::Instant};
100use tokio::sync::mpsc;
101use tracing::{debug, trace, warn};
102mod events;
103pub use best::{BestTransactionFilter, BestTransactionsWithPrioritizedSenders};
104pub use blob::{blob_tx_priority, fee_delta, BlobOrd, BlobTransactions};
105pub use events::{FullTransactionEvent, NewTransactionEvent, TransactionEvent};
106pub use listener::{AllTransactionsEvents, TransactionEvents, TransactionListenerKind};
107pub use parked::{BasefeeOrd, ParkedOrd, ParkedPool, QueuedOrd};
108pub use pending::PendingPool;
109use reth_primitives_traits::Block;
110
111mod best;
112pub use best::BestTransactions;
113
114mod blob;
115pub mod listener;
116mod parked;
117pub mod pending;
118pub mod size;
119pub(crate) mod state;
120pub mod txpool;
121mod update;
122
/// Bound on number of pending transactions from `reth_network::TransactionsManager` to buffer.
pub const PENDING_TX_LISTENER_BUFFER_SIZE: usize = 2048;
/// Bound on number of new transactions from `reth_network::TransactionsManager` to buffer.
pub const NEW_TX_LISTENER_BUFFER_SIZE: usize = 1024;

/// Capacity of the channel created for every blob sidecar listener registered through
/// `PoolInner::add_blob_sidecar_listener`.
const BLOB_SIDECAR_LISTENER_BUFFER_SIZE: usize = 512;
129
/// Transaction pool internals.
///
/// Bundles the transaction validator, the blob store and the lock-protected [`TxPool`] together
/// with the listener registries that are notified about pool changes.
pub struct PoolInner<V, T, S>
where
    T: TransactionOrdering,
{
    /// Internal mapping of addresses to plain ints.
    identifiers: RwLock<SenderIdentifiers>,
    /// Transaction validator.
    validator: V,
    /// Storage for blob transactions
    blob_store: S,
    /// The internal pool that manages all transactions.
    pool: RwLock<TxPool<T>>,
    /// Pool settings.
    config: PoolConfig,
    /// Manages listeners for transaction state change events.
    event_listener: RwLock<PoolEventBroadcast<T::Transaction>>,
    /// Listeners that receive the hashes of new pending transactions.
    pending_transaction_listener: Mutex<Vec<PendingTransactionHashListener>>,
    /// Listeners for new transactions added to the pool.
    transaction_listener: Mutex<Vec<TransactionListener<T::Transaction>>>,
    /// Listener for new blob transaction sidecars added to the pool.
    blob_transaction_sidecar_listener: Mutex<Vec<BlobTransactionSidecarListener>>,
    /// Metrics for the blob store
    blob_store_metrics: BlobStoreMetrics,
}
156
157// === impl PoolInner ===
158
159impl<V, T, S> PoolInner<V, T, S>
160where
161    V: TransactionValidator,
162    T: TransactionOrdering<Transaction = <V as TransactionValidator>::Transaction>,
163    S: BlobStore,
164{
165    /// Create a new transaction pool instance.
166    pub fn new(validator: V, ordering: T, blob_store: S, config: PoolConfig) -> Self {
167        Self {
168            identifiers: Default::default(),
169            validator,
170            event_listener: Default::default(),
171            pool: RwLock::new(TxPool::new(ordering, config.clone())),
172            pending_transaction_listener: Default::default(),
173            transaction_listener: Default::default(),
174            blob_transaction_sidecar_listener: Default::default(),
175            config,
176            blob_store,
177            blob_store_metrics: Default::default(),
178        }
179    }
180
181    /// Returns the configured blob store.
182    pub const fn blob_store(&self) -> &S {
183        &self.blob_store
184    }
185
186    /// Returns stats about the size of the pool.
187    pub fn size(&self) -> PoolSize {
188        self.get_pool_data().size()
189    }
190
191    /// Returns the currently tracked block
192    pub fn block_info(&self) -> BlockInfo {
193        self.get_pool_data().block_info()
194    }
195    /// Sets the currently tracked block
196    pub fn set_block_info(&self, info: BlockInfo) {
197        self.pool.write().set_block_info(info)
198    }
199
200    /// Returns the internal [`SenderId`] for this address
201    pub fn get_sender_id(&self, addr: Address) -> SenderId {
202        self.identifiers.write().sender_id_or_create(addr)
203    }
204
205    /// Returns the internal [`SenderId`]s for the given addresses.
206    pub fn get_sender_ids(&self, addrs: impl IntoIterator<Item = Address>) -> Vec<SenderId> {
207        self.identifiers.write().sender_ids_or_create(addrs)
208    }
209
210    /// Returns all senders in the pool
211    pub fn unique_senders(&self) -> HashSet<Address> {
212        self.get_pool_data().unique_senders()
213    }
214
215    /// Converts the changed accounts to a map of sender ids to sender info (internal identifier
216    /// used for __tracked__ accounts)
217    fn changed_senders(
218        &self,
219        accs: impl Iterator<Item = ChangedAccount>,
220    ) -> FxHashMap<SenderId, SenderInfo> {
221        let identifiers = self.identifiers.read();
222        accs.into_iter()
223            .filter_map(|acc| {
224                let ChangedAccount { address, nonce, balance } = acc;
225                let sender_id = identifiers.sender_id(&address)?;
226                Some((sender_id, SenderInfo { state_nonce: nonce, balance }))
227            })
228            .collect()
229    }
230
231    /// Get the config the pool was configured with.
232    pub const fn config(&self) -> &PoolConfig {
233        &self.config
234    }
235
236    /// Get the validator reference.
237    pub const fn validator(&self) -> &V {
238        &self.validator
239    }
240
241    /// Adds a new transaction listener to the pool that gets notified about every new _pending_
242    /// transaction inserted into the pool
243    pub fn add_pending_listener(&self, kind: TransactionListenerKind) -> mpsc::Receiver<TxHash> {
244        let (sender, rx) = mpsc::channel(self.config.pending_tx_listener_buffer_size);
245        let listener = PendingTransactionHashListener { sender, kind };
246        self.pending_transaction_listener.lock().push(listener);
247        rx
248    }
249
250    /// Adds a new transaction listener to the pool that gets notified about every new transaction.
251    pub fn add_new_transaction_listener(
252        &self,
253        kind: TransactionListenerKind,
254    ) -> mpsc::Receiver<NewTransactionEvent<T::Transaction>> {
255        let (sender, rx) = mpsc::channel(self.config.new_tx_listener_buffer_size);
256        let listener = TransactionListener { sender, kind };
257        self.transaction_listener.lock().push(listener);
258        rx
259    }
260    /// Adds a new blob sidecar listener to the pool that gets notified about every new
261    /// eip4844 transaction's blob sidecar.
262    pub fn add_blob_sidecar_listener(&self) -> mpsc::Receiver<NewBlobSidecar> {
263        let (sender, rx) = mpsc::channel(BLOB_SIDECAR_LISTENER_BUFFER_SIZE);
264        let listener = BlobTransactionSidecarListener { sender };
265        self.blob_transaction_sidecar_listener.lock().push(listener);
266        rx
267    }
268
269    /// If the pool contains the transaction, this adds a new listener that gets notified about
270    /// transaction events.
271    pub fn add_transaction_event_listener(&self, tx_hash: TxHash) -> Option<TransactionEvents> {
272        self.get_pool_data()
273            .contains(&tx_hash)
274            .then(|| self.event_listener.write().subscribe(tx_hash))
275    }
276
277    /// Adds a listener for all transaction events.
278    pub fn add_all_transactions_event_listener(&self) -> AllTransactionsEvents<T::Transaction> {
279        self.event_listener.write().subscribe_all()
280    }
281
282    /// Returns a read lock to the pool's data.
283    pub fn get_pool_data(&self) -> RwLockReadGuard<'_, TxPool<T>> {
284        self.pool.read()
285    }
286
287    /// Returns transactions in the pool that can be propagated
288    pub fn pooled_transactions(&self) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
289        let mut out = Vec::new();
290        self.append_pooled_transactions(&mut out);
291        out
292    }
293
294    /// Returns hashes of transactions in the pool that can be propagated.
295    pub fn pooled_transactions_hashes(&self) -> Vec<TxHash> {
296        let mut out = Vec::new();
297        self.append_pooled_transactions_hashes(&mut out);
298        out
299    }
300
301    /// Returns only the first `max` transactions in the pool that can be propagated.
302    pub fn pooled_transactions_max(
303        &self,
304        max: usize,
305    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
306        let mut out = Vec::new();
307        self.append_pooled_transactions_max(max, &mut out);
308        out
309    }
310
311    /// Extends the given vector with all transactions in the pool that can be propagated.
312    pub fn append_pooled_transactions(
313        &self,
314        out: &mut Vec<Arc<ValidPoolTransaction<T::Transaction>>>,
315    ) {
316        out.extend(
317            self.get_pool_data().all().transactions_iter().filter(|tx| tx.propagate).cloned(),
318        );
319    }
320
321    /// Extends the given vector with the hashes of all transactions in the pool that can be
322    /// propagated.
323    pub fn append_pooled_transactions_hashes(&self, out: &mut Vec<TxHash>) {
324        out.extend(
325            self.get_pool_data()
326                .all()
327                .transactions_iter()
328                .filter(|tx| tx.propagate)
329                .map(|tx| *tx.hash()),
330        );
331    }
332
333    /// Extends the given vector with only the first `max` transactions in the pool that can be
334    /// propagated.
335    pub fn append_pooled_transactions_max(
336        &self,
337        max: usize,
338        out: &mut Vec<Arc<ValidPoolTransaction<T::Transaction>>>,
339    ) {
340        out.extend(
341            self.get_pool_data()
342                .all()
343                .transactions_iter()
344                .filter(|tx| tx.propagate)
345                .take(max)
346                .cloned(),
347        );
348    }
349
350    /// Returns only the first `max` hashes of transactions in the pool that can be propagated.
351    pub fn pooled_transactions_hashes_max(&self, max: usize) -> Vec<TxHash> {
352        if max == 0 {
353            return Vec::new();
354        }
355        self.get_pool_data()
356            .all()
357            .transactions_iter()
358            .filter(|tx| tx.propagate)
359            .take(max)
360            .map(|tx| *tx.hash())
361            .collect()
362    }
363
364    /// Converts the internally tracked transaction to the pooled format.
365    ///
366    /// If the transaction is an EIP-4844 transaction, the blob sidecar is fetched from the blob
367    /// store and attached to the transaction.
368    fn to_pooled_transaction(
369        &self,
370        transaction: Arc<ValidPoolTransaction<T::Transaction>>,
371    ) -> Option<Recovered<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>>
372    where
373        <V as TransactionValidator>::Transaction: EthPoolTransaction,
374    {
375        if transaction.is_eip4844() {
376            let sidecar = self.blob_store.get(*transaction.hash()).ok()??;
377            transaction.transaction.clone().try_into_pooled_eip4844(sidecar)
378        } else {
379            transaction
380                .transaction
381                .clone_into_pooled()
382                .inspect_err(|err| {
383                    debug!(
384                        target: "txpool", %err,
385                        "failed to convert transaction to pooled element; skipping",
386                    );
387                })
388                .ok()
389        }
390    }
391
392    /// Returns pooled transactions for the given transaction hashes that are allowed to be
393    /// propagated.
394    pub fn get_pooled_transaction_elements(
395        &self,
396        tx_hashes: Vec<TxHash>,
397        limit: GetPooledTransactionLimit,
398    ) -> Vec<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>
399    where
400        <V as TransactionValidator>::Transaction: EthPoolTransaction,
401    {
402        let transactions = self.get_all_propagatable(tx_hashes);
403        let mut elements = Vec::with_capacity(transactions.len());
404        let mut size = 0;
405        for transaction in transactions {
406            let encoded_len = transaction.encoded_length();
407            let Some(pooled) = self.to_pooled_transaction(transaction) else {
408                continue;
409            };
410
411            size += encoded_len;
412            elements.push(pooled.into_inner());
413
414            if limit.exceeds(size) {
415                break
416            }
417        }
418
419        elements
420    }
421
422    /// Returns converted pooled transaction for the given transaction hash.
423    pub fn get_pooled_transaction_element(
424        &self,
425        tx_hash: TxHash,
426    ) -> Option<Recovered<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>>
427    where
428        <V as TransactionValidator>::Transaction: EthPoolTransaction,
429    {
430        self.get(&tx_hash).and_then(|tx| self.to_pooled_transaction(tx))
431    }
432
433    /// Updates the entire pool after a new block was executed.
434    pub fn on_canonical_state_change<B>(&self, update: CanonicalStateUpdate<'_, B>)
435    where
436        B: Block,
437    {
438        trace!(target: "txpool", ?update, "updating pool on canonical state change");
439
440        let block_info = update.block_info();
441        let CanonicalStateUpdate {
442            new_tip, changed_accounts, mined_transactions, update_kind, ..
443        } = update;
444        self.validator.on_new_head_block(new_tip);
445
446        let changed_senders = self.changed_senders(changed_accounts.into_iter());
447
448        // update the pool
449        let outcome = self.pool.write().on_canonical_state_change(
450            block_info,
451            mined_transactions,
452            changed_senders,
453            update_kind,
454        );
455
456        // This will discard outdated transactions based on the account's nonce
457        self.delete_discarded_blobs(outcome.discarded.iter());
458
459        // notify listeners about updates
460        self.notify_on_new_state(outcome);
461    }
462
463    /// Performs account updates on the pool.
464    ///
465    /// This will either promote or discard transactions based on the new account state.
466    ///
467    /// This should be invoked when the pool drifted and accounts are updated manually
468    pub fn update_accounts(&self, accounts: Vec<ChangedAccount>) {
469        let changed_senders = self.changed_senders(accounts.into_iter());
470        let UpdateOutcome { promoted, discarded } =
471            self.pool.write().update_accounts(changed_senders);
472
473        self.notify_on_transaction_updates(promoted, discarded);
474    }
475
476    /// Add a single validated transaction into the pool.
477    ///
478    /// Note: this is only used internally by [`Self::add_transactions()`], all new transaction(s)
479    /// come in through that function, either as a batch or `std::iter::once`.
480    fn add_transaction(
481        &self,
482        pool: &mut RwLockWriteGuard<'_, TxPool<T>>,
483        origin: TransactionOrigin,
484        tx: TransactionValidationOutcome<T::Transaction>,
485    ) -> PoolResult<AddedTransactionOutcome> {
486        match tx {
487            TransactionValidationOutcome::Valid {
488                balance,
489                state_nonce,
490                transaction,
491                propagate,
492                bytecode_hash,
493                authorities,
494            } => {
495                let sender_id = self.get_sender_id(transaction.sender());
496                let transaction_id = TransactionId::new(sender_id, transaction.nonce());
497
498                // split the valid transaction and the blob sidecar if it has any
499                let (transaction, maybe_sidecar) = match transaction {
500                    ValidTransaction::Valid(tx) => (tx, None),
501                    ValidTransaction::ValidWithSidecar { transaction, sidecar } => {
502                        debug_assert!(
503                            transaction.is_eip4844(),
504                            "validator returned sidecar for non EIP-4844 transaction"
505                        );
506                        (transaction, Some(sidecar))
507                    }
508                };
509
510                let tx = ValidPoolTransaction {
511                    transaction,
512                    transaction_id,
513                    propagate,
514                    timestamp: Instant::now(),
515                    origin,
516                    authority_ids: authorities.map(|auths| self.get_sender_ids(auths)),
517                };
518
519                let added = pool.add_transaction(tx, balance, state_nonce, bytecode_hash)?;
520                let hash = *added.hash();
521                let state = added.transaction_state();
522
523                // transaction was successfully inserted into the pool
524                if let Some(sidecar) = maybe_sidecar {
525                    // notify blob sidecar listeners
526                    self.on_new_blob_sidecar(&hash, &sidecar);
527                    // store the sidecar in the blob store
528                    self.insert_blob(hash, sidecar);
529                }
530
531                if let Some(replaced) = added.replaced_blob_transaction() {
532                    debug!(target: "txpool", "[{:?}] delete replaced blob sidecar", replaced);
533                    // delete the replaced transaction from the blob store
534                    self.delete_blob(replaced);
535                }
536
537                // Notify about new pending transactions
538                if let Some(pending) = added.as_pending() {
539                    self.on_new_pending_transaction(pending);
540                }
541
542                // Notify tx event listeners
543                self.notify_event_listeners(&added);
544
545                if let Some(discarded) = added.discarded_transactions() {
546                    self.delete_discarded_blobs(discarded.iter());
547                }
548
549                // Notify listeners for _all_ transactions
550                self.on_new_transaction(added.into_new_transaction_event());
551
552                Ok(AddedTransactionOutcome { hash, state })
553            }
554            TransactionValidationOutcome::Invalid(tx, err) => {
555                let mut listener = self.event_listener.write();
556                listener.invalid(tx.hash());
557                Err(PoolError::new(*tx.hash(), err))
558            }
559            TransactionValidationOutcome::Error(tx_hash, err) => {
560                let mut listener = self.event_listener.write();
561                listener.discarded(&tx_hash);
562                Err(PoolError::other(tx_hash, err))
563            }
564        }
565    }
566
567    /// Adds a transaction and returns the event stream.
568    pub fn add_transaction_and_subscribe(
569        &self,
570        origin: TransactionOrigin,
571        tx: TransactionValidationOutcome<T::Transaction>,
572    ) -> PoolResult<TransactionEvents> {
573        let listener = {
574            let mut listener = self.event_listener.write();
575            listener.subscribe(tx.tx_hash())
576        };
577        let mut results = self.add_transactions(origin, std::iter::once(tx));
578        results.pop().expect("result length is the same as the input")?;
579        Ok(listener)
580    }
581
582    /// Adds all transactions in the iterator to the pool, returning a list of results.
583    ///
584    /// Note: A large batch may lock the pool for a long time that blocks important operations
585    /// like updating the pool on canonical state changes. The caller should consider having
586    /// a max batch size to balance transaction insertions with other updates.
587    pub fn add_transactions(
588        &self,
589        origin: TransactionOrigin,
590        transactions: impl IntoIterator<Item = TransactionValidationOutcome<T::Transaction>>,
591    ) -> Vec<PoolResult<AddedTransactionOutcome>> {
592        // Process all transactions in one write lock, maintaining individual origins
593        let (mut added, discarded) = {
594            let mut pool = self.pool.write();
595            let added = transactions
596                .into_iter()
597                .map(|tx| self.add_transaction(&mut pool, origin, tx))
598                .collect::<Vec<_>>();
599
600            // Enforce the pool size limits if at least one transaction was added successfully
601            let discarded = if added.iter().any(Result::is_ok) {
602                pool.discard_worst()
603            } else {
604                Default::default()
605            };
606
607            (added, discarded)
608        };
609
610        if !discarded.is_empty() {
611            // Delete any blobs associated with discarded blob transactions
612            self.delete_discarded_blobs(discarded.iter());
613            self.event_listener.write().discarded_many(&discarded);
614
615            let discarded_hashes =
616                discarded.into_iter().map(|tx| *tx.hash()).collect::<HashSet<_>>();
617
618            // A newly added transaction may be immediately discarded, so we need to
619            // adjust the result here
620            for res in &mut added {
621                if let Ok(AddedTransactionOutcome { hash, .. }) = res &&
622                    discarded_hashes.contains(hash)
623                {
624                    *res = Err(PoolError::new(*hash, PoolErrorKind::DiscardedOnInsert))
625                }
626            }
627        };
628
629        added
630    }
631
632    /// Notify all listeners about a new pending transaction.
633    ///
634    /// See also [`Self::add_pending_listener`]
635    ///
636    /// CAUTION: This function is only intended to be used manually in order to use this type's
637    /// pending transaction receivers when manually implementing the
638    /// [`TransactionPool`](crate::TransactionPool) trait for a custom pool implementation
639    /// [`TransactionPool::pending_transactions_listener_for`](crate::TransactionPool).
640    pub fn on_new_pending_transaction(&self, pending: &AddedPendingTransaction<T::Transaction>) {
641        let mut transaction_listeners = self.pending_transaction_listener.lock();
642        transaction_listeners.retain_mut(|listener| {
643            // broadcast all pending transactions to the listener
644            listener.send_all(pending.pending_transactions(listener.kind))
645        });
646    }
647
648    /// Notify all listeners about a newly inserted pending transaction.
649    ///
650    /// See also [`Self::add_new_transaction_listener`]
651    ///
652    /// CAUTION: This function is only intended to be used manually in order to use this type's
653    /// transaction receivers when manually implementing the
654    /// [`TransactionPool`](crate::TransactionPool) trait for a custom pool implementation
655    /// [`TransactionPool::new_transactions_listener_for`](crate::TransactionPool).
656    pub fn on_new_transaction(&self, event: NewTransactionEvent<T::Transaction>) {
657        let mut transaction_listeners = self.transaction_listener.lock();
658        transaction_listeners.retain_mut(|listener| {
659            if listener.kind.is_propagate_only() && !event.transaction.propagate {
660                // only emit this hash to listeners that are only allowed to receive propagate only
661                // transactions, such as network
662                return !listener.sender.is_closed()
663            }
664
665            listener.send(event.clone())
666        });
667    }
668
669    /// Notify all listeners about a blob sidecar for a newly inserted blob (eip4844) transaction.
670    fn on_new_blob_sidecar(&self, tx_hash: &TxHash, sidecar: &BlobTransactionSidecarVariant) {
671        let mut sidecar_listeners = self.blob_transaction_sidecar_listener.lock();
672        if sidecar_listeners.is_empty() {
673            return
674        }
675        let sidecar = Arc::new(sidecar.clone());
676        sidecar_listeners.retain_mut(|listener| {
677            let new_blob_event = NewBlobSidecar { tx_hash: *tx_hash, sidecar: sidecar.clone() };
678            match listener.sender.try_send(new_blob_event) {
679                Ok(()) => true,
680                Err(err) => {
681                    if matches!(err, mpsc::error::TrySendError::Full(_)) {
682                        debug!(
683                            target: "txpool",
684                            "[{:?}] failed to send blob sidecar; channel full",
685                            sidecar,
686                        );
687                        true
688                    } else {
689                        false
690                    }
691                }
692            }
693        })
694    }
695
696    /// Notifies transaction listeners about changes once a block was processed.
697    fn notify_on_new_state(&self, outcome: OnNewCanonicalStateOutcome<T::Transaction>) {
698        trace!(target: "txpool", promoted=outcome.promoted.len(), discarded= outcome.discarded.len() ,"notifying listeners on state change");
699
700        // notify about promoted pending transactions
701        // emit hashes
702        self.pending_transaction_listener
703            .lock()
704            .retain_mut(|listener| listener.send_all(outcome.pending_transactions(listener.kind)));
705
706        // emit full transactions
707        self.transaction_listener.lock().retain_mut(|listener| {
708            listener.send_all(outcome.full_pending_transactions(listener.kind))
709        });
710
711        let OnNewCanonicalStateOutcome { mined, promoted, discarded, block_hash } = outcome;
712
713        // broadcast specific transaction events
714        let mut listener = self.event_listener.write();
715
716        if !listener.is_empty() {
717            for tx in &mined {
718                listener.mined(tx, block_hash);
719            }
720            for tx in &promoted {
721                listener.pending(tx.hash(), None);
722            }
723            for tx in &discarded {
724                listener.discarded(tx.hash());
725            }
726        }
727    }
728
729    /// Notifies all listeners about the transaction movements.
730    ///
731    /// This will emit events according to the provided changes.
732    ///
733    /// CAUTION: This function is only intended to be used manually in order to use this type's
734    /// [`TransactionEvents`] receivers when manually implementing the
735    /// [`TransactionPool`](crate::TransactionPool) trait for a custom pool implementation
736    /// [`TransactionPool::transaction_event_listener`](crate::TransactionPool).
737    #[allow(clippy::type_complexity)]
738    pub fn notify_on_transaction_updates(
739        &self,
740        promoted: Vec<Arc<ValidPoolTransaction<T::Transaction>>>,
741        discarded: Vec<Arc<ValidPoolTransaction<T::Transaction>>>,
742    ) {
743        // Notify about promoted pending transactions (similar to notify_on_new_state)
744        if !promoted.is_empty() {
745            self.pending_transaction_listener.lock().retain_mut(|listener| {
746                let promoted_hashes = promoted.iter().filter_map(|tx| {
747                    if listener.kind.is_propagate_only() && !tx.propagate {
748                        None
749                    } else {
750                        Some(*tx.hash())
751                    }
752                });
753                listener.send_all(promoted_hashes)
754            });
755
756            // in this case we should also emit promoted transactions in full
757            self.transaction_listener.lock().retain_mut(|listener| {
758                let promoted_txs = promoted.iter().filter_map(|tx| {
759                    if listener.kind.is_propagate_only() && !tx.propagate {
760                        None
761                    } else {
762                        Some(NewTransactionEvent::pending(tx.clone()))
763                    }
764                });
765                listener.send_all(promoted_txs)
766            });
767        }
768
769        {
770            let mut listener = self.event_listener.write();
771            if !listener.is_empty() {
772                for tx in &promoted {
773                    listener.pending(tx.hash(), None);
774                }
775                for tx in &discarded {
776                    listener.discarded(tx.hash());
777                }
778            }
779        }
780
781        if !discarded.is_empty() {
782            // This deletes outdated blob txs from the blob store, based on the account's nonce.
783            // This is called during txpool maintenance when the pool drifted.
784            self.delete_discarded_blobs(discarded.iter());
785        }
786    }
787
788    /// Fire events for the newly added transaction if there are any.
789    ///
790    /// See also [`Self::add_transaction_event_listener`].
791    ///
792    /// CAUTION: This function is only intended to be used manually in order to use this type's
793    /// [`TransactionEvents`] receivers when manually implementing the
794    /// [`TransactionPool`](crate::TransactionPool) trait for a custom pool implementation
795    /// [`TransactionPool::transaction_event_listener`](crate::TransactionPool).
796    pub fn notify_event_listeners(&self, tx: &AddedTransaction<T::Transaction>) {
797        let mut listener = self.event_listener.write();
798        if listener.is_empty() {
799            // nothing to notify
800            return
801        }
802
803        match tx {
804            AddedTransaction::Pending(tx) => {
805                let AddedPendingTransaction { transaction, promoted, discarded, replaced } = tx;
806
807                listener.pending(transaction.hash(), replaced.clone());
808                for tx in promoted {
809                    listener.pending(tx.hash(), None);
810                }
811                for tx in discarded {
812                    listener.discarded(tx.hash());
813                }
814            }
815            AddedTransaction::Parked { transaction, replaced, queued_reason, .. } => {
816                listener.queued(transaction.hash(), queued_reason.clone());
817                if let Some(replaced) = replaced {
818                    listener.replaced(replaced.clone(), *transaction.hash());
819                }
820            }
821        }
822    }
823
824    /// Returns an iterator that yields transactions that are ready to be included in the block.
825    pub fn best_transactions(&self) -> BestTransactions<T> {
826        self.get_pool_data().best_transactions()
827    }
828
829    /// Returns an iterator that yields transactions that are ready to be included in the block with
830    /// the given base fee and optional blob fee attributes.
831    pub fn best_transactions_with_attributes(
832        &self,
833        best_transactions_attributes: BestTransactionsAttributes,
834    ) -> Box<dyn crate::traits::BestTransactions<Item = Arc<ValidPoolTransaction<T::Transaction>>>>
835    {
836        self.get_pool_data().best_transactions_with_attributes(best_transactions_attributes)
837    }
838
839    /// Returns only the first `max` transactions in the pending pool.
840    pub fn pending_transactions_max(
841        &self,
842        max: usize,
843    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
844        self.get_pool_data().pending_transactions_iter().take(max).collect()
845    }
846
847    /// Returns all transactions from the pending sub-pool
848    pub fn pending_transactions(&self) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
849        self.get_pool_data().pending_transactions()
850    }
851
852    /// Returns all transactions from parked pools
853    pub fn queued_transactions(&self) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
854        self.get_pool_data().queued_transactions()
855    }
856
857    /// Returns all transactions in the pool
858    pub fn all_transactions(&self) -> AllPoolTransactions<T::Transaction> {
859        let pool = self.get_pool_data();
860        AllPoolTransactions {
861            pending: pool.pending_transactions(),
862            queued: pool.queued_transactions(),
863        }
864    }
865
866    /// Returns _all_ transactions in the pool
867    pub fn all_transaction_hashes(&self) -> Vec<TxHash> {
868        self.get_pool_data().all().transactions_iter().map(|tx| *tx.hash()).collect()
869    }
870
871    /// Removes and returns all matching transactions from the pool.
872    ///
873    /// This behaves as if the transactions got discarded (_not_ mined), effectively introducing a
874    /// nonce gap for the given transactions.
875    pub fn remove_transactions(
876        &self,
877        hashes: Vec<TxHash>,
878    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
879        if hashes.is_empty() {
880            return Vec::new()
881        }
882        let removed = self.pool.write().remove_transactions(hashes);
883
884        self.event_listener.write().discarded_many(&removed);
885
886        removed
887    }
888
889    /// Removes and returns all matching transactions and their dependent transactions from the
890    /// pool.
891    pub fn remove_transactions_and_descendants(
892        &self,
893        hashes: Vec<TxHash>,
894    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
895        if hashes.is_empty() {
896            return Vec::new()
897        }
898        let removed = self.pool.write().remove_transactions_and_descendants(hashes);
899
900        let mut listener = self.event_listener.write();
901
902        for tx in &removed {
903            listener.discarded(tx.hash());
904        }
905
906        removed
907    }
908
909    /// Removes and returns all transactions by the specified sender from the pool.
910    pub fn remove_transactions_by_sender(
911        &self,
912        sender: Address,
913    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
914        let sender_id = self.get_sender_id(sender);
915        let removed = self.pool.write().remove_transactions_by_sender(sender_id);
916
917        self.event_listener.write().discarded_many(&removed);
918
919        removed
920    }
921
922    /// Removes and returns all transactions that are present in the pool.
923    pub fn retain_unknown<A>(&self, announcement: &mut A)
924    where
925        A: HandleMempoolData,
926    {
927        if announcement.is_empty() {
928            return
929        }
930        let pool = self.get_pool_data();
931        announcement.retain_by_hash(|tx| !pool.contains(tx))
932    }
933
934    /// Returns the transaction by hash.
935    pub fn get(&self, tx_hash: &TxHash) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
936        self.get_pool_data().get(tx_hash)
937    }
938
939    /// Returns all transactions of the address
940    pub fn get_transactions_by_sender(
941        &self,
942        sender: Address,
943    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
944        let sender_id = self.get_sender_id(sender);
945        self.get_pool_data().get_transactions_by_sender(sender_id)
946    }
947
948    /// Returns all queued transactions of the address by sender
949    pub fn get_queued_transactions_by_sender(
950        &self,
951        sender: Address,
952    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
953        let sender_id = self.get_sender_id(sender);
954        self.get_pool_data().queued_txs_by_sender(sender_id)
955    }
956
957    /// Returns all pending transactions filtered by predicate
958    pub fn pending_transactions_with_predicate(
959        &self,
960        predicate: impl FnMut(&ValidPoolTransaction<T::Transaction>) -> bool,
961    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
962        self.get_pool_data().pending_transactions_with_predicate(predicate)
963    }
964
965    /// Returns all pending transactions of the address by sender
966    pub fn get_pending_transactions_by_sender(
967        &self,
968        sender: Address,
969    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
970        let sender_id = self.get_sender_id(sender);
971        self.get_pool_data().pending_txs_by_sender(sender_id)
972    }
973
974    /// Returns the highest transaction of the address
975    pub fn get_highest_transaction_by_sender(
976        &self,
977        sender: Address,
978    ) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
979        let sender_id = self.get_sender_id(sender);
980        self.get_pool_data().get_highest_transaction_by_sender(sender_id)
981    }
982
983    /// Returns the transaction with the highest nonce that is executable given the on chain nonce.
984    pub fn get_highest_consecutive_transaction_by_sender(
985        &self,
986        sender: Address,
987        on_chain_nonce: u64,
988    ) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
989        let sender_id = self.get_sender_id(sender);
990        self.get_pool_data().get_highest_consecutive_transaction_by_sender(
991            sender_id.into_transaction_id(on_chain_nonce),
992        )
993    }
994
995    /// Returns the transaction given a [`TransactionId`]
996    pub fn get_transaction_by_transaction_id(
997        &self,
998        transaction_id: &TransactionId,
999    ) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
1000        self.get_pool_data().all().get(transaction_id).map(|tx| tx.transaction.clone())
1001    }
1002
1003    /// Returns all transactions that where submitted with the given [`TransactionOrigin`]
1004    pub fn get_transactions_by_origin(
1005        &self,
1006        origin: TransactionOrigin,
1007    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1008        self.get_pool_data()
1009            .all()
1010            .transactions_iter()
1011            .filter(|tx| tx.origin == origin)
1012            .cloned()
1013            .collect()
1014    }
1015
1016    /// Returns all pending transactions filtered by [`TransactionOrigin`]
1017    pub fn get_pending_transactions_by_origin(
1018        &self,
1019        origin: TransactionOrigin,
1020    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1021        self.get_pool_data().pending_transactions_iter().filter(|tx| tx.origin == origin).collect()
1022    }
1023
1024    /// Returns all the transactions belonging to the hashes.
1025    ///
1026    /// If no transaction exists, it is skipped.
1027    pub fn get_all(&self, txs: Vec<TxHash>) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1028        if txs.is_empty() {
1029            return Vec::new()
1030        }
1031        self.get_pool_data().get_all(txs).collect()
1032    }
1033
1034    /// Returns all the transactions belonging to the hashes that are propagatable.
1035    ///
1036    /// If no transaction exists, it is skipped.
1037    fn get_all_propagatable(
1038        &self,
1039        txs: Vec<TxHash>,
1040    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1041        if txs.is_empty() {
1042            return Vec::new()
1043        }
1044        self.get_pool_data().get_all(txs).filter(|tx| tx.propagate).collect()
1045    }
1046
1047    /// Notify about propagated transactions.
1048    pub fn on_propagated(&self, txs: PropagatedTransactions) {
1049        if txs.0.is_empty() {
1050            return
1051        }
1052        let mut listener = self.event_listener.write();
1053
1054        if !listener.is_empty() {
1055            txs.0.into_iter().for_each(|(hash, peers)| listener.propagated(&hash, peers));
1056        }
1057    }
1058
1059    /// Number of transactions in the entire pool
1060    pub fn len(&self) -> usize {
1061        self.get_pool_data().len()
1062    }
1063
1064    /// Whether the pool is empty
1065    pub fn is_empty(&self) -> bool {
1066        self.get_pool_data().is_empty()
1067    }
1068
1069    /// Returns whether or not the pool is over its configured size and transaction count limits.
1070    pub fn is_exceeded(&self) -> bool {
1071        self.pool.read().is_exceeded()
1072    }
1073
1074    /// Inserts a blob transaction into the blob store
1075    fn insert_blob(&self, hash: TxHash, blob: BlobTransactionSidecarVariant) {
1076        debug!(target: "txpool", "[{:?}] storing blob sidecar", hash);
1077        if let Err(err) = self.blob_store.insert(hash, blob) {
1078            warn!(target: "txpool", %err, "[{:?}] failed to insert blob", hash);
1079            self.blob_store_metrics.blobstore_failed_inserts.increment(1);
1080        }
1081        self.update_blob_store_metrics();
1082    }
1083
1084    /// Delete a blob from the blob store
1085    pub fn delete_blob(&self, blob: TxHash) {
1086        let _ = self.blob_store.delete(blob);
1087    }
1088
1089    /// Delete all blobs from the blob store
1090    pub fn delete_blobs(&self, txs: Vec<TxHash>) {
1091        let _ = self.blob_store.delete_all(txs);
1092    }
1093
1094    /// Cleans up the blob store
1095    pub fn cleanup_blobs(&self) {
1096        let stat = self.blob_store.cleanup();
1097        self.blob_store_metrics.blobstore_failed_deletes.increment(stat.delete_failed as u64);
1098        self.update_blob_store_metrics();
1099    }
1100
1101    fn update_blob_store_metrics(&self) {
1102        if let Some(data_size) = self.blob_store.data_size_hint() {
1103            self.blob_store_metrics.blobstore_byte_size.set(data_size as f64);
1104        }
1105        self.blob_store_metrics.blobstore_entries.set(self.blob_store.blobs_len() as f64);
1106    }
1107
1108    /// Deletes all blob transactions that were discarded.
1109    fn delete_discarded_blobs<'a>(
1110        &'a self,
1111        transactions: impl IntoIterator<Item = &'a Arc<ValidPoolTransaction<T::Transaction>>>,
1112    ) {
1113        let blob_txs = transactions
1114            .into_iter()
1115            .filter(|tx| tx.transaction.is_eip4844())
1116            .map(|tx| *tx.hash())
1117            .collect();
1118        self.delete_blobs(blob_txs);
1119    }
1120}
1121
1122impl<V, T: TransactionOrdering, S> fmt::Debug for PoolInner<V, T, S> {
1123    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1124        f.debug_struct("PoolInner").field("config", &self.config).finish_non_exhaustive()
1125    }
1126}
1127
/// Tracks an added transaction and all graph changes caused by adding it.
#[derive(Debug, Clone)]
pub struct AddedPendingTransaction<T: PoolTransaction> {
    /// Inserted transaction.
    pub transaction: Arc<ValidPoolTransaction<T>>,
    /// Replaced transaction, if there was one.
    pub replaced: Option<Arc<ValidPoolTransaction<T>>>,
    /// Transactions promoted to the pending queue.
    pub promoted: Vec<Arc<ValidPoolTransaction<T>>>,
    /// Transactions that failed and became discarded.
    pub discarded: Vec<Arc<ValidPoolTransaction<T>>>,
}
1140
1141impl<T: PoolTransaction> AddedPendingTransaction<T> {
1142    /// Returns all transactions that were promoted to the pending pool and adhere to the given
1143    /// [`TransactionListenerKind`].
1144    ///
1145    /// If the kind is [`TransactionListenerKind::PropagateOnly`], then only transactions that
1146    /// are allowed to be propagated are returned.
1147    pub(crate) fn pending_transactions(
1148        &self,
1149        kind: TransactionListenerKind,
1150    ) -> impl Iterator<Item = B256> + '_ {
1151        let iter = std::iter::once(&self.transaction).chain(self.promoted.iter());
1152        PendingTransactionIter { kind, iter }
1153    }
1154}
1155
/// An iterator adapter that yields the hashes of pending transactions, filtered by listener
/// kind.
pub(crate) struct PendingTransactionIter<Iter> {
    /// Listener kind that decides which transactions are yielded.
    kind: TransactionListenerKind,
    /// Underlying iterator over the candidate transactions.
    iter: Iter,
}
1160
1161impl<'a, Iter, T> Iterator for PendingTransactionIter<Iter>
1162where
1163    Iter: Iterator<Item = &'a Arc<ValidPoolTransaction<T>>>,
1164    T: PoolTransaction + 'a,
1165{
1166    type Item = B256;
1167
1168    fn next(&mut self) -> Option<Self::Item> {
1169        loop {
1170            let next = self.iter.next()?;
1171            if self.kind.is_propagate_only() && !next.propagate {
1172                continue
1173            }
1174            return Some(*next.hash())
1175        }
1176    }
1177}
1178
/// An iterator over full pending transactions, filtered by listener kind.
pub(crate) struct FullPendingTransactionIter<Iter> {
    /// Listener kind that decides which transactions are yielded.
    kind: TransactionListenerKind,
    /// Underlying iterator over the candidate transactions.
    iter: Iter,
}
1184
1185impl<'a, Iter, T> Iterator for FullPendingTransactionIter<Iter>
1186where
1187    Iter: Iterator<Item = &'a Arc<ValidPoolTransaction<T>>>,
1188    T: PoolTransaction + 'a,
1189{
1190    type Item = NewTransactionEvent<T>;
1191
1192    fn next(&mut self) -> Option<Self::Item> {
1193        loop {
1194            let next = self.iter.next()?;
1195            if self.kind.is_propagate_only() && !next.propagate {
1196                continue
1197            }
1198            return Some(NewTransactionEvent {
1199                subpool: SubPool::Pending,
1200                transaction: next.clone(),
1201            })
1202        }
1203    }
1204}
1205
/// Represents a transaction that was added into the pool and its state.
#[derive(Debug, Clone)]
pub enum AddedTransaction<T: PoolTransaction> {
    /// Transaction was successfully added and moved to the pending pool.
    Pending(AddedPendingTransaction<T>),
    /// Transaction was successfully added but not yet ready for processing and moved to a
    /// parked pool instead.
    Parked {
        /// Inserted transaction.
        transaction: Arc<ValidPoolTransaction<T>>,
        /// Replaced transaction, if there was one.
        replaced: Option<Arc<ValidPoolTransaction<T>>>,
        /// The subpool it was moved to.
        subpool: SubPool,
        /// The specific reason why the transaction is queued (if applicable).
        queued_reason: Option<QueuedReason>,
    },
}
1224
1225impl<T: PoolTransaction> AddedTransaction<T> {
1226    /// Returns whether the transaction has been added to the pending pool.
1227    pub const fn as_pending(&self) -> Option<&AddedPendingTransaction<T>> {
1228        match self {
1229            Self::Pending(tx) => Some(tx),
1230            _ => None,
1231        }
1232    }
1233
1234    /// Returns the replaced transaction if there was one
1235    pub const fn replaced(&self) -> Option<&Arc<ValidPoolTransaction<T>>> {
1236        match self {
1237            Self::Pending(tx) => tx.replaced.as_ref(),
1238            Self::Parked { replaced, .. } => replaced.as_ref(),
1239        }
1240    }
1241
1242    /// Returns the discarded transactions if there were any
1243    pub(crate) fn discarded_transactions(&self) -> Option<&[Arc<ValidPoolTransaction<T>>]> {
1244        match self {
1245            Self::Pending(tx) => Some(&tx.discarded),
1246            Self::Parked { .. } => None,
1247        }
1248    }
1249
1250    /// Returns the hash of the replaced transaction if it is a blob transaction.
1251    pub(crate) fn replaced_blob_transaction(&self) -> Option<B256> {
1252        self.replaced().filter(|tx| tx.transaction.is_eip4844()).map(|tx| *tx.transaction.hash())
1253    }
1254
1255    /// Returns the hash of the transaction
1256    pub fn hash(&self) -> &TxHash {
1257        match self {
1258            Self::Pending(tx) => tx.transaction.hash(),
1259            Self::Parked { transaction, .. } => transaction.hash(),
1260        }
1261    }
1262
1263    /// Converts this type into the event type for listeners
1264    pub fn into_new_transaction_event(self) -> NewTransactionEvent<T> {
1265        match self {
1266            Self::Pending(tx) => {
1267                NewTransactionEvent { subpool: SubPool::Pending, transaction: tx.transaction }
1268            }
1269            Self::Parked { transaction, subpool, .. } => {
1270                NewTransactionEvent { transaction, subpool }
1271            }
1272        }
1273    }
1274
1275    /// Returns the subpool this transaction was added to
1276    pub(crate) const fn subpool(&self) -> SubPool {
1277        match self {
1278            Self::Pending(_) => SubPool::Pending,
1279            Self::Parked { subpool, .. } => *subpool,
1280        }
1281    }
1282
1283    /// Returns the [`TransactionId`] of the added transaction
1284    #[cfg(test)]
1285    pub(crate) fn id(&self) -> &TransactionId {
1286        match self {
1287            Self::Pending(added) => added.transaction.id(),
1288            Self::Parked { transaction, .. } => transaction.id(),
1289        }
1290    }
1291
1292    /// Returns the queued reason if the transaction is parked with a queued reason.
1293    pub const fn queued_reason(&self) -> Option<&QueuedReason> {
1294        match self {
1295            Self::Pending(_) => None,
1296            Self::Parked { queued_reason, .. } => queued_reason.as_ref(),
1297        }
1298    }
1299
1300    /// Returns the transaction state based on the subpool and queued reason.
1301    pub fn transaction_state(&self) -> AddedTransactionState {
1302        match self.subpool() {
1303            SubPool::Pending => AddedTransactionState::Pending,
1304            _ => {
1305                // For non-pending transactions, use the queued reason directly from the
1306                // AddedTransaction
1307                if let Some(reason) = self.queued_reason() {
1308                    AddedTransactionState::Queued(reason.clone())
1309                } else {
1310                    // Fallback - this shouldn't happen with the new implementation
1311                    AddedTransactionState::Queued(QueuedReason::NonceGap)
1312                }
1313            }
1314        }
1315    }
1316}
1317
/// The specific reason why a transaction is queued (not ready for execution).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum QueuedReason {
    /// Transaction has a nonce gap - missing prior transactions.
    NonceGap,
    /// Transaction has parked ancestors - waiting for other transactions to be mined.
    ParkedAncestors,
    /// Sender has insufficient balance to cover the transaction cost.
    InsufficientBalance,
    /// Transaction exceeds the block gas limit.
    TooMuchGas,
    /// Transaction doesn't meet the base fee requirement.
    InsufficientBaseFee,
    /// Transaction doesn't meet the blob fee requirement (EIP-4844).
    InsufficientBlobFee,
}
1334
/// The state of a transaction when it was added to the pool.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AddedTransactionState {
    /// Ready for execution.
    Pending,
    /// Not ready for execution due to a specific condition.
    Queued(QueuedReason),
}
1343
1344impl AddedTransactionState {
1345    /// Returns whether the transaction was submitted as queued.
1346    pub const fn is_queued(&self) -> bool {
1347        matches!(self, Self::Queued(_))
1348    }
1349
1350    /// Returns whether the transaction was submitted as pending.
1351    pub const fn is_pending(&self) -> bool {
1352        matches!(self, Self::Pending)
1353    }
1354
1355    /// Returns the specific queued reason if the transaction is queued.
1356    pub const fn queued_reason(&self) -> Option<&QueuedReason> {
1357        match self {
1358            Self::Queued(reason) => Some(reason),
1359            Self::Pending => None,
1360        }
1361    }
1362}
1363
/// The outcome of a successful transaction addition.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AddedTransactionOutcome {
    /// The hash of the transaction.
    pub hash: TxHash,
    /// The state of the transaction (pending, or queued with a reason).
    pub state: AddedTransactionState,
}
1372
1373impl AddedTransactionOutcome {
1374    /// Returns whether the transaction was submitted as queued.
1375    pub const fn is_queued(&self) -> bool {
1376        self.state.is_queued()
1377    }
1378
1379    /// Returns whether the transaction was submitted as pending.
1380    pub const fn is_pending(&self) -> bool {
1381        self.state.is_pending()
1382    }
1383}
1384
/// Contains all state changes after a [`CanonicalStateUpdate`] was processed.
#[derive(Debug)]
pub(crate) struct OnNewCanonicalStateOutcome<T: PoolTransaction> {
    /// Hash of the block.
    pub(crate) block_hash: B256,
    /// Hashes of all mined transactions.
    pub(crate) mined: Vec<TxHash>,
    /// Transactions promoted to the pending pool.
    pub(crate) promoted: Vec<Arc<ValidPoolTransaction<T>>>,
    /// Transactions that were discarded during the update.
    pub(crate) discarded: Vec<Arc<ValidPoolTransaction<T>>>,
}
1397
1398impl<T: PoolTransaction> OnNewCanonicalStateOutcome<T> {
1399    /// Returns all transactions that were promoted to the pending pool and adhere to the given
1400    /// [`TransactionListenerKind`].
1401    ///
1402    /// If the kind is [`TransactionListenerKind::PropagateOnly`], then only transactions that
1403    /// are allowed to be propagated are returned.
1404    pub(crate) fn pending_transactions(
1405        &self,
1406        kind: TransactionListenerKind,
1407    ) -> impl Iterator<Item = B256> + '_ {
1408        let iter = self.promoted.iter();
1409        PendingTransactionIter { kind, iter }
1410    }
1411
1412    /// Returns all FULL transactions that were promoted to the pending pool and adhere to the given
1413    /// [`TransactionListenerKind`].
1414    ///
1415    /// If the kind is [`TransactionListenerKind::PropagateOnly`], then only transactions that
1416    /// are allowed to be propagated are returned.
1417    pub(crate) fn full_pending_transactions(
1418        &self,
1419        kind: TransactionListenerKind,
1420    ) -> impl Iterator<Item = NewTransactionEvent<T>> + '_ {
1421        let iter = self.promoted.iter();
1422        FullPendingTransactionIter { kind, iter }
1423    }
1424}
1425
#[cfg(test)]
mod tests {
    use crate::{
        blobstore::{BlobStore, InMemoryBlobStore},
        identifier::SenderId,
        test_utils::{MockTransaction, TestPoolBuilder},
        validate::ValidTransaction,
        BlockInfo, PoolConfig, SubPoolLimit, TransactionOrigin, TransactionValidationOutcome, U256,
    };
    use alloy_eips::{eip4844::BlobTransactionSidecar, eip7594::BlobTransactionSidecarVariant};
    use alloy_primitives::Address;
    use std::{fs, path::PathBuf};

    /// Adds more blob transactions than the blob sub-pool limit allows and checks that the
    /// pool's blob store ends up equal to a store holding only the first `max_txs` sidecars.
    #[test]
    fn test_discard_blobs_on_blob_tx_eviction() {
        let blobs = {
            // Read the contents of the JSON file into a string.
            let json_content = fs::read_to_string(
                PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("test_data/blob1.json"),
            )
            .expect("Failed to read the blob data file");

            // Parse the JSON contents into a serde_json::Value.
            let json_value: serde_json::Value =
                serde_json::from_str(&json_content).expect("Failed to deserialize JSON");

            // Extract blob data from JSON and convert it to Blob.
            vec![
                // Extract the "data" field from the JSON and parse it as a string.
                json_value
                    .get("data")
                    .unwrap()
                    .as_str()
                    .expect("Data is not a valid string")
                    .to_string(),
            ]
        };

        // Generate a BlobTransactionSidecar from the blobs.
        let sidecar = BlobTransactionSidecarVariant::Eip4844(
            BlobTransactionSidecar::try_from_blobs_hex(blobs).unwrap(),
        );

        // Define the maximum limit for blobs in the sub-pool.
        let blob_limit = SubPoolLimit::new(1000, usize::MAX);

        // Create a test pool with default configuration and the specified blob limit.
        let test_pool = &TestPoolBuilder::default()
            .with_config(PoolConfig { blob_limit, ..Default::default() })
            .pool;

        // Set the block info for the pool, including a pending blob fee.
        test_pool
            .set_block_info(BlockInfo { pending_blob_fee: Some(10_000_000), ..Default::default() });

        // Create an in-memory blob store that serves as the expected end state.
        let blob_store = InMemoryBlobStore::default();

        // Loop to add transactions to the pool and test blob eviction.
        for n in 0..blob_limit.max_txs + 10 {
            // Create a mock transaction with the generated blob sidecar.
            let mut tx = MockTransaction::eip4844_with_sidecar(sidecar.clone());

            // Set non zero size
            tx.set_size(1844674407370951);

            // Insert the sidecar into the blob store if the current index is within the blob limit.
            if n < blob_limit.max_txs {
                blob_store.insert(*tx.get_hash(), sidecar.clone()).unwrap();
            }

            // Add the transaction to the pool with external origin and valid outcome.
            test_pool.add_transactions(
                TransactionOrigin::External,
                [TransactionValidationOutcome::Valid {
                    balance: U256::from(1_000),
                    state_nonce: 0,
                    bytecode_hash: None,
                    transaction: ValidTransaction::ValidWithSidecar {
                        transaction: tx,
                        sidecar: sidecar.clone(),
                    },
                    propagate: true,
                    authorities: None,
                }],
            );
        }

        // Assert that the size of the pool's blob component is equal to the maximum blob limit.
        assert_eq!(test_pool.size().blob, blob_limit.max_txs);

        // Assert that the size of the pool's blob_size component matches the expected value
        // (1000 transactions of size 1844674407370951 each).
        assert_eq!(test_pool.size().blob_size, 1844674407370951000);

        // Assert that the pool's blob store matches the expected blob store.
        assert_eq!(*test_pool.blob_store(), blob_store);
    }

    /// Adds a valid transaction together with one authority and checks that the authority's
    /// address is present in the pool's sender identifiers.
    #[test]
    fn test_auths_stored_in_identifiers() {
        // Create a test pool with default configuration.
        let test_pool = &TestPoolBuilder::default().with_config(Default::default()).pool;

        let auth = Address::new([1; 20]);
        let tx = MockTransaction::eip7702();

        test_pool.add_transactions(
            TransactionOrigin::Local,
            [TransactionValidationOutcome::Valid {
                balance: U256::from(1_000),
                state_nonce: 0,
                bytecode_hash: None,
                transaction: ValidTransaction::Valid(tx),
                propagate: true,
                authorities: Some(vec![auth]),
            }],
        );

        // The authority is expected to have been assigned the sender id 1.
        let identifiers = test_pool.identifiers.read();
        assert_eq!(identifiers.sender_id(&auth), Some(SenderId::from(1)));
    }
}