Skip to main content

reth_transaction_pool/pool/
mod.rs

1//! Transaction Pool internals.
2//!
3//! Incoming transactions are validated before they enter the pool first. The validation outcome can
4//! have 3 states:
5//!
6//!  1. Transaction can _never_ be valid
7//!  2. Transaction is _currently_ valid
8//!  3. Transaction is _currently_ invalid, but could potentially become valid in the future
9//!
10//! However, (2.) and (3.) of a transaction can only be determined on the basis of the current
11//! state, whereas (1.) holds indefinitely. This means once the state changes (2.) and (3.) the
12//! state of a transaction needs to be reevaluated again.
13//!
14//! The transaction pool is responsible for storing new, valid transactions and providing the next
15//! best transactions sorted by their priority. Where priority is determined by the transaction's
16//! score ([`TransactionOrdering`]).
17//!
18//! Furthermore, the following characteristics fall under (3.):
19//!
20//!  a) Nonce of a transaction is higher than the expected nonce for the next transaction of its
21//! sender. A distinction is made here whether multiple transactions from the same sender have
22//! gapless nonce increments.
23//!
24//!  a)(1) If _no_ transaction is missing in a chain of multiple
25//! transactions from the same sender (all nonce in row), all of them can in principle be executed
26//! on the current state one after the other.
27//!
28//!  a)(2) If there's a nonce gap, then all
29//! transactions after the missing transaction are blocked until the missing transaction arrives.
30//!
31//!  b) Transaction does not meet the dynamic fee cap requirement introduced by EIP-1559: The
32//! fee cap of the transaction needs to be no less than the base fee of block.
33//!
34//!
35//! In essence the transaction pool is made of three separate sub-pools:
36//!
37//!  - Pending Pool: Contains all transactions that are valid on the current state and satisfy (3.
38//!    a)(1): _No_ nonce gaps. A _pending_ transaction is considered _ready_ when it has the lowest
39//!    nonce of all transactions from the same sender. Once a _ready_ transaction with nonce `n` has
40//!    been executed, the next highest transaction from the same sender `n + 1` becomes ready.
41//!
42//!  - Queued Pool: Contains all transactions that are currently blocked by missing transactions:
43//!    (3. a)(2): _With_ nonce gaps or due to lack of funds.
44//!
45//!  - Basefee Pool: To account for the dynamic base fee requirement (3. b) which could render an
46//!    EIP-1559 and all subsequent transactions of the sender currently invalid.
47//!
48//! The classification of transactions is always dependent on the current state that is changed as
49//! soon as a new block is mined. Once a new block is mined, the account changeset must be applied
50//! to the transaction pool.
51//!
52//!
53//! Depending on the use case, consumers of the [`TransactionPool`](crate::traits::TransactionPool)
54//! are interested in (2.) and/or (3.).
55
56//! A generic [`TransactionPool`](crate::traits::TransactionPool) that only handles transactions.
57//!
58//! This Pool maintains two separate sub-pools for (2.) and (3.)
59//!
60//! ## Terminology
61//!
62//!  - _Pending_: pending transactions are transactions that fall under (2.). These transactions can
63//!    currently be executed and are stored in the pending sub-pool
64//!  - _Queued_: queued transactions are transactions that fall under category (3.). Those
65//!    transactions are _currently_ waiting for state changes that eventually move them into
66//!    category (2.) and become pending.
67
68use crate::{
69    blobstore::BlobStore,
70    error::{PoolError, PoolErrorKind, PoolResult},
71    identifier::{SenderId, SenderIdentifiers, TransactionId},
72    metrics::BlobStoreMetrics,
73    pool::{
74        listener::{
75            BlobTransactionSidecarListener, PendingTransactionHashListener, PoolEventBroadcast,
76            TransactionListener,
77        },
78        state::SubPool,
79        txpool::{SenderInfo, TxPool},
80        update::UpdateOutcome,
81    },
82    traits::{
83        AllPoolTransactions, BestTransactionsAttributes, BlockInfo, GetPooledTransactionLimit,
84        NewBlobSidecar, PoolSize, PoolTransaction, PropagatedTransactions, TransactionOrigin,
85    },
86    validate::{TransactionValidationOutcome, ValidPoolTransaction, ValidTransaction},
87    CanonicalStateUpdate, EthPoolTransaction, PoolConfig, TransactionOrdering,
88    TransactionValidator,
89};
90
91use alloy_primitives::{
92    map::{AddressSet, HashSet},
93    Address, TxHash, B256,
94};
95use parking_lot::{Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard};
96use reth_eth_wire_types::HandleMempoolData;
97use reth_execution_types::ChangedAccount;
98
99use alloy_eips::{eip7594::BlobTransactionSidecarVariant, Typed2718};
100use reth_primitives_traits::Recovered;
101use rustc_hash::FxHashMap;
102use std::{
103    fmt,
104    sync::{
105        atomic::{AtomicBool, Ordering},
106        Arc,
107    },
108    time::Instant,
109};
110use tokio::sync::mpsc;
111use tracing::{debug, trace, warn};
112mod events;
113pub use best::{BestTransactionFilter, BestTransactionsWithPrioritizedSenders};
114pub use blob::{blob_tx_priority, fee_delta, BlobOrd, BlobTransactions};
115pub use events::{FullTransactionEvent, NewTransactionEvent, TransactionEvent};
116pub use listener::{AllTransactionsEvents, TransactionEvents, TransactionListenerKind};
117pub use parked::{BasefeeOrd, ParkedOrd, ParkedPool, QueuedOrd};
118pub use pending::PendingPool;
119
120mod best;
121pub use best::BestTransactions;
122
123mod blob;
124pub mod listener;
125mod parked;
126pub mod pending;
127pub mod size;
128pub(crate) mod state;
129pub mod txpool;
130mod update;
131
132/// Bound on number of pending transactions from `reth_network::TransactionsManager` to buffer.
133pub const PENDING_TX_LISTENER_BUFFER_SIZE: usize = 2048;
134/// Bound on number of new transactions from `reth_network::TransactionsManager` to buffer.
135pub const NEW_TX_LISTENER_BUFFER_SIZE: usize = 1024;
136
137const BLOB_SIDECAR_LISTENER_BUFFER_SIZE: usize = 512;
138
139/// Transaction pool internals.
140pub struct PoolInner<V, T, S>
141where
142    T: TransactionOrdering,
143{
144    /// Internal mapping of addresses to plain ints.
145    identifiers: RwLock<SenderIdentifiers>,
146    /// Transaction validator.
147    validator: V,
148    /// Storage for blob transactions
149    blob_store: S,
150    /// The internal pool that manages all transactions.
151    pool: RwLock<TxPool<T>>,
152    /// Pool settings.
153    config: PoolConfig,
154    /// Manages listeners for transaction state change events.
155    event_listener: RwLock<PoolEventBroadcast<T::Transaction>>,
156    /// Tracks whether any event listeners have ever been installed.
157    has_event_listeners: AtomicBool,
158    /// Listeners for new _full_ pending transactions.
159    pending_transaction_listener: RwLock<Vec<PendingTransactionHashListener>>,
160    /// Listeners for new transactions added to the pool.
161    transaction_listener: RwLock<Vec<TransactionListener<T::Transaction>>>,
162    /// Listener for new blob transaction sidecars added to the pool.
163    blob_transaction_sidecar_listener: Mutex<Vec<BlobTransactionSidecarListener>>,
164    /// Metrics for the blob store
165    blob_store_metrics: BlobStoreMetrics,
166}
167
168// === impl PoolInner ===
169
170impl<V, T, S> PoolInner<V, T, S>
171where
172    V: TransactionValidator,
173    T: TransactionOrdering<Transaction = <V as TransactionValidator>::Transaction>,
174    S: BlobStore,
175{
176    /// Create a new transaction pool instance.
177    pub fn new(validator: V, ordering: T, blob_store: S, config: PoolConfig) -> Self {
178        Self {
179            identifiers: Default::default(),
180            validator,
181            event_listener: Default::default(),
182            has_event_listeners: AtomicBool::new(false),
183            pool: RwLock::new(TxPool::new(ordering, config.clone())),
184            pending_transaction_listener: Default::default(),
185            transaction_listener: Default::default(),
186            blob_transaction_sidecar_listener: Default::default(),
187            config,
188            blob_store,
189            blob_store_metrics: Default::default(),
190        }
191    }
192
193    /// Returns the configured blob store.
194    pub const fn blob_store(&self) -> &S {
195        &self.blob_store
196    }
197
198    /// Returns stats about the size of the pool.
199    pub fn size(&self) -> PoolSize {
200        self.get_pool_data().size()
201    }
202
203    /// Returns the currently tracked block
204    pub fn block_info(&self) -> BlockInfo {
205        self.get_pool_data().block_info()
206    }
207    /// Sets the currently tracked block.
208    ///
209    /// This will also notify subscribers about any transactions that were promoted to the pending
210    /// pool due to fee changes.
211    pub fn set_block_info(&self, info: BlockInfo) {
212        let outcome = self.pool.write().set_block_info(info);
213
214        // Notify subscribers about promoted transactions due to fee changes
215        self.notify_on_transaction_updates(outcome.promoted, outcome.discarded);
216    }
217
218    /// Returns the internal [`SenderId`] for this address, allocating a new mapping when the
219    /// address is first observed.
220    ///
221    /// This must only be used on paths that intentionally begin tracking a sender, such as
222    /// transaction insertion. Read-only lookups should prefer [`Self::sender_id`] to avoid
223    /// growing the sender-id map for unknown addresses.
224    pub fn get_sender_id(&self, addr: Address) -> SenderId {
225        self.identifiers.write().sender_id_or_create(addr)
226    }
227
228    /// Returns the internal [`SenderId`] for this address if it is already tracked.
229    ///
230    /// Unlike [`Self::get_sender_id`], this never allocates a new sender mapping and is therefore
231    /// suitable for read-only queries or best-effort cleanup on unknown addresses.
232    pub fn sender_id(&self, addr: &Address) -> Option<SenderId> {
233        self.identifiers.read().sender_id(addr)
234    }
235
236    /// Returns the internal [`SenderId`]s for the given addresses.
237    pub fn get_sender_ids(&self, addrs: impl IntoIterator<Item = Address>) -> Vec<SenderId> {
238        self.identifiers.write().sender_ids_or_create(addrs)
239    }
240
241    /// Returns all senders in the pool
242    pub fn unique_senders(&self) -> AddressSet {
243        self.get_pool_data().unique_senders()
244    }
245
246    /// Converts the changed accounts to a map of sender ids to sender info (internal identifier
247    /// used for __tracked__ accounts)
248    fn changed_senders(
249        &self,
250        accs: impl Iterator<Item = ChangedAccount>,
251    ) -> FxHashMap<SenderId, SenderInfo> {
252        let identifiers = self.identifiers.read();
253        accs.into_iter()
254            .filter_map(|acc| {
255                let ChangedAccount { address, nonce, balance } = acc;
256                let sender_id = identifiers.sender_id(&address)?;
257                Some((sender_id, SenderInfo { state_nonce: nonce, balance }))
258            })
259            .collect()
260    }
261
262    /// Get the config the pool was configured with.
263    pub const fn config(&self) -> &PoolConfig {
264        &self.config
265    }
266
267    /// Get the validator reference.
268    pub const fn validator(&self) -> &V {
269        &self.validator
270    }
271
272    /// Adds a new transaction listener to the pool that gets notified about every new _pending_
273    /// transaction inserted into the pool
274    pub fn add_pending_listener(&self, kind: TransactionListenerKind) -> mpsc::Receiver<TxHash> {
275        let (sender, rx) = mpsc::channel(self.config.pending_tx_listener_buffer_size);
276        let listener = PendingTransactionHashListener { sender, kind };
277
278        let mut listeners = self.pending_transaction_listener.write();
279        // Clean up dead listeners before adding new one
280        listeners.retain(|l| !l.sender.is_closed());
281        listeners.push(listener);
282
283        rx
284    }
285
286    /// Adds a new transaction listener to the pool that gets notified about every new transaction.
287    pub fn add_new_transaction_listener(
288        &self,
289        kind: TransactionListenerKind,
290    ) -> mpsc::Receiver<NewTransactionEvent<T::Transaction>> {
291        let (sender, rx) = mpsc::channel(self.config.new_tx_listener_buffer_size);
292        let listener = TransactionListener { sender, kind };
293
294        let mut listeners = self.transaction_listener.write();
295        // Clean up dead listeners before adding new one
296        listeners.retain(|l| !l.sender.is_closed());
297        listeners.push(listener);
298
299        rx
300    }
301    /// Adds a new blob sidecar listener to the pool that gets notified about every new
302    /// eip4844 transaction's blob sidecar.
303    pub fn add_blob_sidecar_listener(&self) -> mpsc::Receiver<NewBlobSidecar> {
304        let (sender, rx) = mpsc::channel(BLOB_SIDECAR_LISTENER_BUFFER_SIZE);
305        let listener = BlobTransactionSidecarListener { sender };
306        self.blob_transaction_sidecar_listener.lock().push(listener);
307        rx
308    }
309
310    /// If the pool contains the transaction, this adds a new listener that gets notified about
311    /// transaction events.
312    pub fn add_transaction_event_listener(&self, tx_hash: TxHash) -> Option<TransactionEvents> {
313        if !self.get_pool_data().contains(&tx_hash) {
314            return None
315        }
316        let mut listener = self.event_listener.write();
317        let events = listener.subscribe(tx_hash);
318        self.mark_event_listener_installed();
319        Some(events)
320    }
321
322    /// Adds a listener for all transaction events.
323    pub fn add_all_transactions_event_listener(&self) -> AllTransactionsEvents<T::Transaction> {
324        let mut listener = self.event_listener.write();
325        let events = listener.subscribe_all();
326        self.mark_event_listener_installed();
327        events
328    }
329
330    #[inline]
331    fn has_event_listeners(&self) -> bool {
332        self.has_event_listeners.load(Ordering::Relaxed)
333    }
334
335    #[inline]
336    fn mark_event_listener_installed(&self) {
337        self.has_event_listeners.store(true, Ordering::Relaxed);
338    }
339
340    #[inline]
341    fn update_event_listener_state(&self, listener: &PoolEventBroadcast<T::Transaction>) {
342        if listener.is_empty() {
343            self.has_event_listeners.store(false, Ordering::Relaxed);
344        }
345    }
346
347    #[inline]
348    fn with_event_listener<F>(&self, emit: F)
349    where
350        F: FnOnce(&mut PoolEventBroadcast<T::Transaction>),
351    {
352        if !self.has_event_listeners() {
353            return
354        }
355        let mut listener = self.event_listener.write();
356        if !listener.is_empty() {
357            emit(&mut listener);
358        }
359        self.update_event_listener_state(&listener);
360    }
361
362    /// Returns a read lock to the pool's data.
363    pub fn get_pool_data(&self) -> RwLockReadGuard<'_, TxPool<T>> {
364        self.pool.read()
365    }
366
367    /// Returns transactions in the pool that can be propagated
368    pub fn pooled_transactions(&self) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
369        let mut out = Vec::new();
370        self.append_pooled_transactions(&mut out);
371        out
372    }
373
374    /// Returns hashes of transactions in the pool that can be propagated.
375    pub fn pooled_transactions_hashes(&self) -> Vec<TxHash> {
376        let mut out = Vec::new();
377        self.append_pooled_transactions_hashes(&mut out);
378        out
379    }
380
381    /// Returns only the first `max` transactions in the pool that can be propagated.
382    pub fn pooled_transactions_max(
383        &self,
384        max: usize,
385    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
386        if max == 0 {
387            return Vec::new()
388        }
389
390        let pool = self.get_pool_data();
391        let mut out = Vec::with_capacity(max.min(pool.all().len()));
392        out.extend(pool.all().transactions_iter().filter(|tx| tx.propagate).take(max).cloned());
393        out
394    }
395
396    /// Extends the given vector with all transactions in the pool that can be propagated.
397    pub fn append_pooled_transactions(
398        &self,
399        out: &mut Vec<Arc<ValidPoolTransaction<T::Transaction>>>,
400    ) {
401        out.extend(
402            self.get_pool_data().all().transactions_iter().filter(|tx| tx.propagate).cloned(),
403        );
404    }
405
406    /// Extends the given vector with pooled transactions for the given hashes that are allowed to
407    /// be propagated.
408    pub fn append_pooled_transaction_elements(
409        &self,
410        tx_hashes: &[TxHash],
411        limit: GetPooledTransactionLimit,
412        out: &mut Vec<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>,
413    ) where
414        <V as TransactionValidator>::Transaction: EthPoolTransaction,
415    {
416        let transactions = self.get_all_propagatable(tx_hashes);
417        let mut size = 0;
418        for transaction in transactions {
419            let encoded_len = transaction.encoded_length();
420            let Some(pooled) = self.to_pooled_transaction(transaction) else {
421                continue;
422            };
423
424            size += encoded_len;
425            out.push(pooled.into_inner());
426
427            if limit.exceeds(size) {
428                break
429            }
430        }
431    }
432
433    /// Extends the given vector with the hashes of all transactions in the pool that can be
434    /// propagated.
435    pub fn append_pooled_transactions_hashes(&self, out: &mut Vec<TxHash>) {
436        out.extend(
437            self.get_pool_data()
438                .all()
439                .transactions_iter()
440                .filter(|tx| tx.propagate)
441                .map(|tx| *tx.hash()),
442        );
443    }
444
445    /// Extends the given vector with only the first `max` transactions in the pool that can be
446    /// propagated.
447    pub fn append_pooled_transactions_max(
448        &self,
449        max: usize,
450        out: &mut Vec<Arc<ValidPoolTransaction<T::Transaction>>>,
451    ) {
452        out.extend(
453            self.get_pool_data()
454                .all()
455                .transactions_iter()
456                .filter(|tx| tx.propagate)
457                .take(max)
458                .cloned(),
459        );
460    }
461
462    /// Returns only the first `max` hashes of transactions in the pool that can be propagated.
463    pub fn pooled_transactions_hashes_max(&self, max: usize) -> Vec<TxHash> {
464        if max == 0 {
465            return Vec::new();
466        }
467
468        let pool = self.get_pool_data();
469        let mut out = Vec::with_capacity(max.min(pool.all().len()));
470        out.extend(
471            pool.all().transactions_iter().filter(|tx| tx.propagate).take(max).map(|tx| *tx.hash()),
472        );
473        out
474    }
475
476    /// Converts the internally tracked transaction to the pooled format.
477    ///
478    /// If the transaction is an EIP-4844 transaction, the blob sidecar is fetched from the blob
479    /// store and attached to the transaction.
480    fn to_pooled_transaction(
481        &self,
482        transaction: Arc<ValidPoolTransaction<T::Transaction>>,
483    ) -> Option<Recovered<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>>
484    where
485        <V as TransactionValidator>::Transaction: EthPoolTransaction,
486    {
487        if transaction.is_eip4844() {
488            let sidecar = self.blob_store.get(*transaction.hash()).ok()??;
489            transaction.transaction.clone().try_into_pooled_eip4844(sidecar)
490        } else {
491            transaction
492                .transaction
493                .clone_into_pooled()
494                .inspect_err(|err| {
495                    debug!(
496                        target: "txpool", %err,
497                        "failed to convert transaction to pooled element; skipping",
498                    );
499                })
500                .ok()
501        }
502    }
503
504    /// Returns pooled transactions for the given transaction hashes that are allowed to be
505    /// propagated.
506    pub fn get_pooled_transaction_elements(
507        &self,
508        tx_hashes: Vec<TxHash>,
509        limit: GetPooledTransactionLimit,
510    ) -> Vec<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>
511    where
512        <V as TransactionValidator>::Transaction: EthPoolTransaction,
513    {
514        let mut elements = Vec::new();
515        self.append_pooled_transaction_elements(&tx_hashes, limit, &mut elements);
516        elements.shrink_to_fit();
517        elements
518    }
519
520    /// Returns converted pooled transaction for the given transaction hash.
521    pub fn get_pooled_transaction_element(
522        &self,
523        tx_hash: TxHash,
524    ) -> Option<Recovered<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>>
525    where
526        <V as TransactionValidator>::Transaction: EthPoolTransaction,
527    {
528        self.get(&tx_hash).and_then(|tx| self.to_pooled_transaction(tx))
529    }
530
531    /// Updates the entire pool after a new block was executed.
532    pub fn on_canonical_state_change(&self, update: CanonicalStateUpdate<'_, V::Block>) {
533        trace!(target: "txpool", ?update, "updating pool on canonical state change");
534
535        let block_info = update.block_info();
536        let CanonicalStateUpdate {
537            new_tip, changed_accounts, mined_transactions, update_kind, ..
538        } = update;
539        self.validator.on_new_head_block(new_tip);
540
541        let changed_senders = self.changed_senders(changed_accounts.into_iter());
542
543        // update the pool
544        let outcome = self.pool.write().on_canonical_state_change(
545            block_info,
546            mined_transactions,
547            changed_senders,
548            update_kind,
549        );
550
551        // This will discard outdated transactions based on the account's nonce
552        self.delete_discarded_blobs(outcome.discarded.iter());
553
554        // notify listeners about updates
555        self.notify_on_new_state(outcome);
556    }
557
558    /// Performs account updates on the pool.
559    ///
560    /// This will either promote or discard transactions based on the new account state.
561    ///
562    /// This should be invoked when the pool drifted and accounts are updated manually
563    pub fn update_accounts(&self, accounts: Vec<ChangedAccount>) {
564        let changed_senders = self.changed_senders(accounts.into_iter());
565        let UpdateOutcome { promoted, discarded } =
566            self.pool.write().update_accounts(changed_senders);
567
568        self.notify_on_transaction_updates(promoted, discarded);
569    }
570
571    /// Add a single validated transaction into the pool.
572    ///
573    /// Returns the outcome and optionally metadata to be processed after the pool lock is
574    /// released.
575    ///
576    /// Note: this is only used internally by [`Self::add_transactions()`], all new transaction(s)
577    /// come in through that function, either as a batch or `std::iter::once`.
578    fn add_transaction(
579        &self,
580        pool: &mut RwLockWriteGuard<'_, TxPool<T>>,
581        origin: TransactionOrigin,
582        tx: TransactionValidationOutcome<T::Transaction>,
583    ) -> (PoolResult<AddedTransactionOutcome>, Option<AddedTransactionMeta<T::Transaction>>) {
584        match tx {
585            TransactionValidationOutcome::Valid {
586                balance,
587                state_nonce,
588                transaction,
589                propagate,
590                bytecode_hash,
591                authorities,
592            } => {
593                let sender_id = self.get_sender_id(transaction.sender());
594                let transaction_id = TransactionId::new(sender_id, transaction.nonce());
595
596                // split the valid transaction and the blob sidecar if it has any
597                let (transaction, blob_sidecar) = match transaction {
598                    ValidTransaction::Valid(tx) => (tx, None),
599                    ValidTransaction::ValidWithSidecar { transaction, sidecar } => {
600                        debug_assert!(
601                            transaction.is_eip4844(),
602                            "validator returned sidecar for non EIP-4844 transaction"
603                        );
604                        (transaction, Some(sidecar))
605                    }
606                };
607
608                let tx = ValidPoolTransaction {
609                    transaction,
610                    transaction_id,
611                    propagate,
612                    timestamp: Instant::now(),
613                    origin,
614                    authority_ids: authorities.map(|auths| self.get_sender_ids(auths)),
615                };
616
617                let added = match pool.add_transaction(tx, balance, state_nonce, bytecode_hash) {
618                    Ok(added) => added,
619                    Err(err) => return (Err(err), None),
620                };
621                let hash = *added.hash();
622                let state = added.transaction_state();
623
624                let meta = AddedTransactionMeta { added, blob_sidecar };
625
626                (Ok(AddedTransactionOutcome { hash, state }), Some(meta))
627            }
628            TransactionValidationOutcome::Invalid(tx, err) => {
629                self.with_event_listener(|listener| listener.invalid(tx.hash()));
630                (Err(PoolError::new(*tx.hash(), err)), None)
631            }
632            TransactionValidationOutcome::Error(tx_hash, err) => {
633                self.with_event_listener(|listener| listener.discarded(&tx_hash));
634                (Err(PoolError::other(tx_hash, err)), None)
635            }
636        }
637    }
638
639    /// Adds a transaction and returns the event stream.
640    pub fn add_transaction_and_subscribe(
641        &self,
642        origin: TransactionOrigin,
643        tx: TransactionValidationOutcome<T::Transaction>,
644    ) -> PoolResult<TransactionEvents> {
645        let listener = {
646            let mut listener = self.event_listener.write();
647            let events = listener.subscribe(tx.tx_hash());
648            self.mark_event_listener_installed();
649            events
650        };
651        let mut results = self.add_transactions(origin, std::iter::once(tx));
652        results.pop().expect("result length is the same as the input")?;
653        Ok(listener)
654    }
655
656    /// Adds all transactions in the iterator to the pool, returning a list of results.
657    ///
658    /// Convenience method that assigns the same origin to all transactions. Delegates to
659    /// [`Self::add_transactions_with_origins`].
660    pub fn add_transactions(
661        &self,
662        origin: TransactionOrigin,
663        transactions: impl IntoIterator<Item = TransactionValidationOutcome<T::Transaction>>,
664    ) -> Vec<PoolResult<AddedTransactionOutcome>> {
665        self.add_transactions_with_origins(transactions.into_iter().map(|tx| (origin, tx)))
666    }
667
668    /// Adds all transactions in the iterator to the pool, each with its own
669    /// [`TransactionOrigin`], returning a list of results.
670    pub fn add_transactions_with_origins(
671        &self,
672        transactions: impl IntoIterator<
673            Item = (TransactionOrigin, TransactionValidationOutcome<T::Transaction>),
674        >,
675    ) -> Vec<PoolResult<AddedTransactionOutcome>> {
676        // Collect results and metadata while holding the pool write lock
677        let (mut results, added_metas, discarded) = {
678            let mut pool = self.pool.write();
679            let mut added_metas = Vec::new();
680
681            let results = transactions
682                .into_iter()
683                .map(|(origin, tx)| {
684                    let (result, meta) = self.add_transaction(&mut pool, origin, tx);
685
686                    // Only collect metadata for successful insertions
687                    if result.is_ok() &&
688                        let Some(meta) = meta
689                    {
690                        added_metas.push(meta);
691                    }
692
693                    result
694                })
695                .collect::<Vec<_>>();
696
697            // Enforce the pool size limits if at least one transaction was added successfully
698            let discarded = if results.iter().any(Result::is_ok) {
699                pool.discard_worst()
700            } else {
701                Default::default()
702            };
703
704            (results, added_metas, discarded)
705        };
706
707        for meta in added_metas {
708            self.on_added_transaction(meta);
709        }
710
711        if !discarded.is_empty() {
712            // Delete any blobs associated with discarded blob transactions
713            self.delete_discarded_blobs(discarded.iter());
714            self.with_event_listener(|listener| listener.discarded_many(&discarded));
715
716            let discarded_hashes =
717                discarded.into_iter().map(|tx| *tx.hash()).collect::<HashSet<_>>();
718
719            // A newly added transaction may be immediately discarded, so we need to
720            // adjust the result here
721            for res in &mut results {
722                if let Ok(AddedTransactionOutcome { hash, .. }) = res &&
723                    discarded_hashes.contains(hash)
724                {
725                    *res = Err(PoolError::new(*hash, PoolErrorKind::DiscardedOnInsert))
726                }
727            }
728        };
729
730        results
731    }
732
733    /// Process a transaction that was added to the pool.
734    ///
735    /// Performs blob storage operations and sends all notifications. This should be called
736    /// after the pool write lock has been released to avoid blocking pool operations.
737    fn on_added_transaction(&self, meta: AddedTransactionMeta<T::Transaction>) {
738        // Handle blob sidecar storage and notifications for EIP-4844 transactions
739        if let Some(sidecar) = meta.blob_sidecar {
740            let hash = *meta.added.hash();
741            self.on_new_blob_sidecar(&hash, &sidecar);
742            self.insert_blob(hash, sidecar);
743        }
744
745        // Delete replaced blob sidecar if any
746        if let Some(replaced) = meta.added.replaced_blob_transaction() {
747            debug!(target: "txpool", "[{:?}] delete replaced blob sidecar", replaced);
748            self.delete_blob(replaced);
749        }
750
751        // Delete discarded blob sidecars if any, this doesnt do any IO.
752        if let Some(discarded) = meta.added.discarded_transactions() {
753            self.delete_discarded_blobs(discarded.iter());
754        }
755
756        // Notify pending transaction listeners
757        if let Some(pending) = meta.added.as_pending() {
758            self.on_new_pending_transaction(pending);
759        }
760
761        // Notify event listeners
762        self.notify_event_listeners(&meta.added);
763
764        // Notify new transaction listeners
765        self.on_new_transaction(meta.added.into_new_transaction_event());
766    }
767
768    /// Notify all listeners about a new pending transaction.
769    ///
770    /// See also [`Self::add_pending_listener`]
771    ///
772    /// CAUTION: This function is only intended to be used manually in order to use this type's
773    /// pending transaction receivers when manually implementing the
774    /// [`TransactionPool`](crate::TransactionPool) trait for a custom pool implementation
775    /// [`TransactionPool::pending_transactions_listener_for`](crate::TransactionPool).
776    pub fn on_new_pending_transaction(&self, pending: &AddedPendingTransaction<T::Transaction>) {
777        let mut needs_cleanup = false;
778
779        {
780            let listeners = self.pending_transaction_listener.read();
781            for listener in listeners.iter() {
782                if !listener.send_all(pending.pending_transactions(listener.kind)) {
783                    needs_cleanup = true;
784                }
785            }
786        }
787
788        // Clean up dead listeners if we detected any closed channels
789        if needs_cleanup {
790            self.pending_transaction_listener
791                .write()
792                .retain(|listener| !listener.sender.is_closed());
793        }
794    }
795
796    /// Notify all listeners about a newly inserted pending transaction.
797    ///
798    /// See also [`Self::add_new_transaction_listener`]
799    ///
800    /// CAUTION: This function is only intended to be used manually in order to use this type's
801    /// transaction receivers when manually implementing the
802    /// [`TransactionPool`](crate::TransactionPool) trait for a custom pool implementation
803    /// [`TransactionPool::new_transactions_listener_for`](crate::TransactionPool).
804    pub fn on_new_transaction(&self, event: NewTransactionEvent<T::Transaction>) {
805        let mut needs_cleanup = false;
806
807        {
808            let listeners = self.transaction_listener.read();
809            for listener in listeners.iter() {
810                if listener.kind.is_propagate_only() && !event.transaction.propagate {
811                    if listener.sender.is_closed() {
812                        needs_cleanup = true;
813                    }
814                    // Skip non-propagate transactions for propagate-only listeners
815                    continue
816                }
817
818                if !listener.send(event.clone()) {
819                    needs_cleanup = true;
820                }
821            }
822        }
823
824        // Clean up dead listeners if we detected any closed channels
825        if needs_cleanup {
826            self.transaction_listener.write().retain(|listener| !listener.sender.is_closed());
827        }
828    }
829
830    /// Notify all listeners about a blob sidecar for a newly inserted blob (eip4844) transaction.
831    fn on_new_blob_sidecar(&self, tx_hash: &TxHash, sidecar: &BlobTransactionSidecarVariant) {
832        let mut sidecar_listeners = self.blob_transaction_sidecar_listener.lock();
833        if sidecar_listeners.is_empty() {
834            return
835        }
836        let sidecar = Arc::new(sidecar.clone());
837        sidecar_listeners.retain_mut(|listener| {
838            let new_blob_event = NewBlobSidecar { tx_hash: *tx_hash, sidecar: sidecar.clone() };
839            match listener.sender.try_send(new_blob_event) {
840                Ok(()) => true,
841                Err(err) => {
842                    if matches!(err, mpsc::error::TrySendError::Full(_)) {
843                        debug!(
844                            target: "txpool",
845                            "[{:?}] failed to send blob sidecar; channel full",
846                            sidecar,
847                        );
848                        true
849                    } else {
850                        false
851                    }
852                }
853            }
854        })
855    }
856
857    /// Notifies transaction listeners about changes once a block was processed.
858    fn notify_on_new_state(&self, outcome: OnNewCanonicalStateOutcome<T::Transaction>) {
859        trace!(target: "txpool", promoted=outcome.promoted.len(), discarded= outcome.discarded.len() ,"notifying listeners on state change");
860
861        // notify about promoted pending transactions - emit hashes
862        let mut needs_pending_cleanup = false;
863        {
864            let listeners = self.pending_transaction_listener.read();
865            for listener in listeners.iter() {
866                if !listener.send_all(outcome.pending_transactions(listener.kind)) {
867                    needs_pending_cleanup = true;
868                }
869            }
870        }
871        if needs_pending_cleanup {
872            self.pending_transaction_listener.write().retain(|l| !l.sender.is_closed());
873        }
874
875        // emit full transactions
876        let mut needs_tx_cleanup = false;
877        {
878            let listeners = self.transaction_listener.read();
879            for listener in listeners.iter() {
880                if !listener.send_all(outcome.full_pending_transactions(listener.kind)) {
881                    needs_tx_cleanup = true;
882                }
883            }
884        }
885        if needs_tx_cleanup {
886            self.transaction_listener.write().retain(|l| !l.sender.is_closed());
887        }
888
889        let OnNewCanonicalStateOutcome { mined, promoted, discarded, block_hash } = outcome;
890
891        // broadcast specific transaction events
892        self.with_event_listener(|listener| {
893            for tx in &mined {
894                listener.mined(tx, block_hash);
895            }
896            for tx in &promoted {
897                listener.pending(tx.hash(), None);
898            }
899            for tx in &discarded {
900                listener.discarded(tx.hash());
901            }
902        })
903    }
904
905    /// Notifies all listeners about the transaction movements.
906    ///
907    /// This will emit events according to the provided changes.
908    ///
909    /// CAUTION: This function is only intended to be used manually in order to use this type's
910    /// [`TransactionEvents`] receivers when manually implementing the
911    /// [`TransactionPool`](crate::TransactionPool) trait for a custom pool implementation
912    /// [`TransactionPool::transaction_event_listener`](crate::TransactionPool).
913    pub fn notify_on_transaction_updates(
914        &self,
915        promoted: Vec<Arc<ValidPoolTransaction<T::Transaction>>>,
916        discarded: Vec<Arc<ValidPoolTransaction<T::Transaction>>>,
917    ) {
918        // Notify about promoted pending transactions (similar to notify_on_new_state)
919        if !promoted.is_empty() {
920            let mut needs_pending_cleanup = false;
921            {
922                let listeners = self.pending_transaction_listener.read();
923                for listener in listeners.iter() {
924                    let promoted_hashes = promoted.iter().filter_map(|tx| {
925                        if listener.kind.is_propagate_only() && !tx.propagate {
926                            None
927                        } else {
928                            Some(*tx.hash())
929                        }
930                    });
931                    if !listener.send_all(promoted_hashes) {
932                        needs_pending_cleanup = true;
933                    }
934                }
935            }
936            if needs_pending_cleanup {
937                self.pending_transaction_listener.write().retain(|l| !l.sender.is_closed());
938            }
939
940            // in this case we should also emit promoted transactions in full
941            let mut needs_tx_cleanup = false;
942            {
943                let listeners = self.transaction_listener.read();
944                for listener in listeners.iter() {
945                    let promoted_txs = promoted.iter().filter_map(|tx| {
946                        if listener.kind.is_propagate_only() && !tx.propagate {
947                            None
948                        } else {
949                            Some(NewTransactionEvent::pending(tx.clone()))
950                        }
951                    });
952                    if !listener.send_all(promoted_txs) {
953                        needs_tx_cleanup = true;
954                    }
955                }
956            }
957            if needs_tx_cleanup {
958                self.transaction_listener.write().retain(|l| !l.sender.is_closed());
959            }
960        }
961
962        self.with_event_listener(|listener| {
963            for tx in &promoted {
964                listener.pending(tx.hash(), None);
965            }
966            for tx in &discarded {
967                listener.discarded(tx.hash());
968            }
969        });
970
971        if !discarded.is_empty() {
972            // This deletes outdated blob txs from the blob store, based on the account's nonce.
973            // This is called during txpool maintenance when the pool drifted.
974            self.delete_discarded_blobs(discarded.iter());
975        }
976    }
977
978    /// Fire events for the newly added transaction if there are any.
979    ///
980    /// See also [`Self::add_transaction_event_listener`].
981    ///
982    /// CAUTION: This function is only intended to be used manually in order to use this type's
983    /// [`TransactionEvents`] receivers when manually implementing the
984    /// [`TransactionPool`](crate::TransactionPool) trait for a custom pool implementation
985    /// [`TransactionPool::transaction_event_listener`](crate::TransactionPool).
986    pub fn notify_event_listeners(&self, tx: &AddedTransaction<T::Transaction>) {
987        self.with_event_listener(|listener| match tx {
988            AddedTransaction::Pending(tx) => {
989                let AddedPendingTransaction { transaction, promoted, discarded, replaced } = tx;
990
991                listener.pending(transaction.hash(), replaced.clone());
992                for tx in promoted {
993                    listener.pending(tx.hash(), None);
994                }
995                for tx in discarded {
996                    listener.discarded(tx.hash());
997                }
998            }
999            AddedTransaction::Parked { transaction, replaced, queued_reason, .. } => {
1000                listener.queued(transaction.hash(), queued_reason.clone());
1001                if let Some(replaced) = replaced {
1002                    listener.replaced(replaced.clone(), *transaction.hash());
1003                }
1004            }
1005        });
1006    }
1007
1008    /// Returns an iterator that yields transactions that are ready to be included in the block.
1009    pub fn best_transactions(&self) -> BestTransactions<T> {
1010        self.get_pool_data().best_transactions()
1011    }
1012
1013    /// Returns an iterator that yields transactions that are ready to be included in the block with
1014    /// the given base fee and optional blob fee attributes.
1015    pub fn best_transactions_with_attributes(
1016        &self,
1017        best_transactions_attributes: BestTransactionsAttributes,
1018    ) -> Box<dyn crate::traits::BestTransactions<Item = Arc<ValidPoolTransaction<T::Transaction>>>>
1019    {
1020        self.get_pool_data().best_transactions_with_attributes(best_transactions_attributes)
1021    }
1022
1023    /// Returns only the first `max` transactions in the pending pool.
1024    pub fn pending_transactions_max(
1025        &self,
1026        max: usize,
1027    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1028        self.get_pool_data().pending_transactions_iter().take(max).collect()
1029    }
1030
1031    /// Returns all transactions from the pending sub-pool
1032    pub fn pending_transactions(&self) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1033        self.get_pool_data().pending_transactions()
1034    }
1035
1036    /// Returns all transactions from parked pools
1037    pub fn queued_transactions(&self) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1038        self.get_pool_data().queued_transactions()
1039    }
1040
1041    /// Returns all transactions in the pool
1042    pub fn all_transactions(&self) -> AllPoolTransactions<T::Transaction> {
1043        let pool = self.get_pool_data();
1044        AllPoolTransactions {
1045            pending: pool.pending_transactions(),
1046            queued: pool.queued_transactions(),
1047        }
1048    }
1049
1050    /// Returns _all_ transactions in the pool
1051    pub fn all_transaction_hashes(&self) -> Vec<TxHash> {
1052        self.get_pool_data().all().transactions_iter().map(|tx| *tx.hash()).collect()
1053    }
1054
1055    /// Removes and returns all matching transactions from the pool.
1056    ///
1057    /// This behaves as if the transactions got discarded (_not_ mined), effectively introducing a
1058    /// nonce gap for the given transactions.
1059    pub fn remove_transactions(
1060        &self,
1061        hashes: Vec<TxHash>,
1062    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1063        if hashes.is_empty() {
1064            return Vec::new()
1065        }
1066        let removed = self.pool.write().remove_transactions(hashes);
1067
1068        self.with_event_listener(|listener| listener.discarded_many(&removed));
1069
1070        removed
1071    }
1072
1073    /// Removes and returns all matching transactions and their dependent transactions from the
1074    /// pool.
1075    pub fn remove_transactions_and_descendants(
1076        &self,
1077        hashes: Vec<TxHash>,
1078    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1079        if hashes.is_empty() {
1080            return Vec::new()
1081        }
1082        let removed = self.pool.write().remove_transactions_and_descendants(hashes);
1083
1084        self.with_event_listener(|listener| {
1085            for tx in &removed {
1086                listener.discarded(tx.hash());
1087            }
1088        });
1089
1090        removed
1091    }
1092
1093    /// Removes and returns all transactions by the specified sender from the pool.
1094    pub fn remove_transactions_by_sender(
1095        &self,
1096        sender: Address,
1097    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1098        let Some(sender_id) = self.sender_id(&sender) else { return Vec::new() };
1099        let removed = self.pool.write().remove_transactions_by_sender(sender_id);
1100
1101        self.with_event_listener(|listener| listener.discarded_many(&removed));
1102
1103        removed
1104    }
1105
1106    /// Prunes and returns all matching transactions from the pool.
1107    ///
1108    /// This removes the transactions as if they were mined: descendant transactions are **not**
1109    /// parked and remain eligible for inclusion.
1110    pub fn prune_transactions(
1111        &self,
1112        hashes: Vec<TxHash>,
1113    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1114        if hashes.is_empty() {
1115            return Vec::new()
1116        }
1117
1118        self.pool.write().prune_transactions(hashes)
1119    }
1120
1121    /// Retains only transactions that are not present in the pool.
1122    pub fn retain_unknown<A>(&self, announcement: &mut A)
1123    where
1124        A: HandleMempoolData,
1125    {
1126        if announcement.is_empty() {
1127            return
1128        }
1129        let pool = self.get_pool_data();
1130        announcement.retain_by_hash(|tx| !pool.contains(tx))
1131    }
1132
1133    /// Retains only transactions that are present in the pool.
1134    pub fn retain_contains<A>(&self, announcement: &mut A)
1135    where
1136        A: HandleMempoolData,
1137    {
1138        if announcement.is_empty() {
1139            return
1140        }
1141        let pool = self.get_pool_data();
1142        announcement.retain_by_hash(|tx| pool.contains(tx))
1143    }
1144
1145    /// Returns the transaction by hash.
1146    pub fn get(&self, tx_hash: &TxHash) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
1147        self.get_pool_data().get(tx_hash)
1148    }
1149
1150    /// Returns all transactions of the address
1151    pub fn get_transactions_by_sender(
1152        &self,
1153        sender: Address,
1154    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1155        let Some(sender_id) = self.sender_id(&sender) else { return Vec::new() };
1156        self.get_pool_data().get_transactions_by_sender(sender_id)
1157    }
1158
1159    /// Returns a pending transaction sent by the given sender with the given nonce.
1160    pub fn get_pending_transaction_by_sender_and_nonce(
1161        &self,
1162        sender: Address,
1163        nonce: u64,
1164    ) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
1165        let sender_id = self.sender_id(&sender)?;
1166        self.get_pool_data().get_pending_transaction_by_sender_and_nonce(sender_id, nonce)
1167    }
1168
1169    /// Returns all queued transactions of the address by sender
1170    pub fn get_queued_transactions_by_sender(
1171        &self,
1172        sender: Address,
1173    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1174        let Some(sender_id) = self.sender_id(&sender) else { return Vec::new() };
1175        self.get_pool_data().queued_txs_by_sender(sender_id)
1176    }
1177
1178    /// Returns all pending transactions filtered by predicate
1179    pub fn pending_transactions_with_predicate(
1180        &self,
1181        predicate: impl FnMut(&ValidPoolTransaction<T::Transaction>) -> bool,
1182    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1183        self.get_pool_data().pending_transactions_with_predicate(predicate)
1184    }
1185
1186    /// Returns all pending transactions of the address by sender
1187    pub fn get_pending_transactions_by_sender(
1188        &self,
1189        sender: Address,
1190    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1191        let Some(sender_id) = self.sender_id(&sender) else { return Vec::new() };
1192        self.get_pool_data().pending_txs_by_sender(sender_id)
1193    }
1194
1195    /// Returns the highest transaction of the address
1196    pub fn get_highest_transaction_by_sender(
1197        &self,
1198        sender: Address,
1199    ) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
1200        let sender_id = self.sender_id(&sender)?;
1201        self.get_pool_data().get_highest_transaction_by_sender(sender_id)
1202    }
1203
1204    /// Returns the transaction with the highest nonce that is executable given the on chain nonce.
1205    pub fn get_highest_consecutive_transaction_by_sender(
1206        &self,
1207        sender: Address,
1208        on_chain_nonce: u64,
1209    ) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
1210        let sender_id = self.sender_id(&sender)?;
1211        self.get_pool_data().get_highest_consecutive_transaction_by_sender(
1212            sender_id.into_transaction_id(on_chain_nonce),
1213        )
1214    }
1215
1216    /// Returns the transaction given a [`TransactionId`]
1217    pub fn get_transaction_by_transaction_id(
1218        &self,
1219        transaction_id: &TransactionId,
1220    ) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
1221        self.get_pool_data().all().get(transaction_id).map(|tx| tx.transaction.clone())
1222    }
1223
1224    /// Returns all transactions that where submitted with the given [`TransactionOrigin`]
1225    pub fn get_transactions_by_origin(
1226        &self,
1227        origin: TransactionOrigin,
1228    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1229        self.get_pool_data()
1230            .all()
1231            .transactions_iter()
1232            .filter(|tx| tx.origin == origin)
1233            .cloned()
1234            .collect()
1235    }
1236
1237    /// Returns all pending transactions filtered by [`TransactionOrigin`]
1238    pub fn get_pending_transactions_by_origin(
1239        &self,
1240        origin: TransactionOrigin,
1241    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1242        self.get_pool_data().pending_transactions_iter().filter(|tx| tx.origin == origin).collect()
1243    }
1244
1245    /// Returns all the transactions belonging to the hashes.
1246    ///
1247    /// If no transaction exists, it is skipped.
1248    pub fn get_all(&self, txs: Vec<TxHash>) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1249        if txs.is_empty() {
1250            return Vec::new()
1251        }
1252        self.get_pool_data().get_all(txs).collect()
1253    }
1254
1255    /// Returns all the transactions belonging to the hashes that are propagatable.
1256    ///
1257    /// If no transaction exists, it is skipped.
1258    fn get_all_propagatable(
1259        &self,
1260        txs: &[TxHash],
1261    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1262        if txs.is_empty() {
1263            return Vec::new()
1264        }
1265        let pool = self.get_pool_data();
1266        txs.iter().filter_map(|tx| pool.get(tx).filter(|tx| tx.propagate)).collect()
1267    }
1268
1269    /// Notify about propagated transactions.
1270    pub fn on_propagated(&self, txs: PropagatedTransactions) {
1271        if txs.is_empty() {
1272            return
1273        }
1274        self.with_event_listener(|listener| {
1275            txs.into_iter().for_each(|(hash, peers)| listener.propagated(&hash, peers));
1276        });
1277    }
1278
1279    /// Number of transactions in the entire pool
1280    pub fn len(&self) -> usize {
1281        self.get_pool_data().len()
1282    }
1283
1284    /// Whether the pool is empty
1285    pub fn is_empty(&self) -> bool {
1286        self.get_pool_data().is_empty()
1287    }
1288
1289    /// Returns whether or not the pool is over its configured size and transaction count limits.
1290    pub fn is_exceeded(&self) -> bool {
1291        self.pool.read().is_exceeded()
1292    }
1293
1294    /// Inserts a blob transaction into the blob store
1295    fn insert_blob(&self, hash: TxHash, blob: BlobTransactionSidecarVariant) {
1296        debug!(target: "txpool", "[{:?}] storing blob sidecar", hash);
1297        if let Err(err) = self.blob_store.insert(hash, blob) {
1298            warn!(target: "txpool", %err, "[{:?}] failed to insert blob", hash);
1299            self.blob_store_metrics.blobstore_failed_inserts.increment(1);
1300        }
1301        self.update_blob_store_metrics();
1302    }
1303
1304    /// Delete a blob from the blob store
1305    pub fn delete_blob(&self, blob: TxHash) {
1306        let _ = self.blob_store.delete(blob);
1307    }
1308
1309    /// Delete all blobs from the blob store
1310    pub fn delete_blobs(&self, txs: Vec<TxHash>) {
1311        let _ = self.blob_store.delete_all(txs);
1312    }
1313
1314    /// Cleans up the blob store
1315    pub fn cleanup_blobs(&self) {
1316        let stat = self.blob_store.cleanup();
1317        self.blob_store_metrics.blobstore_failed_deletes.increment(stat.delete_failed as u64);
1318        self.update_blob_store_metrics();
1319    }
1320
1321    fn update_blob_store_metrics(&self) {
1322        if let Some(data_size) = self.blob_store.data_size_hint() {
1323            self.blob_store_metrics.blobstore_byte_size.set(data_size as f64);
1324        }
1325        self.blob_store_metrics.blobstore_entries.set(self.blob_store.blobs_len() as f64);
1326    }
1327
1328    /// Deletes all blob transactions that were discarded.
1329    fn delete_discarded_blobs<'a>(
1330        &'a self,
1331        transactions: impl IntoIterator<Item = &'a Arc<ValidPoolTransaction<T::Transaction>>>,
1332    ) {
1333        let blob_txs = transactions
1334            .into_iter()
1335            .filter(|tx| tx.transaction.is_eip4844())
1336            .map(|tx| *tx.hash())
1337            .collect();
1338        self.delete_blobs(blob_txs);
1339    }
1340}
1341
1342impl<V, T: TransactionOrdering, S> fmt::Debug for PoolInner<V, T, S> {
1343    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1344        f.debug_struct("PoolInner").field("config", &self.config).finish_non_exhaustive()
1345    }
1346}
1347
1348/// Metadata for a transaction that was added to the pool.
1349///
1350/// This holds all the data needed to complete post-insertion operations (notifications,
1351/// blob storage).
1352#[derive(Debug)]
1353struct AddedTransactionMeta<T: PoolTransaction> {
1354    /// The transaction that was added to the pool
1355    added: AddedTransaction<T>,
1356    /// Optional blob sidecar for EIP-4844 transactions
1357    blob_sidecar: Option<BlobTransactionSidecarVariant>,
1358}
1359
1360/// Tracks an added transaction and all graph changes caused by adding it.
1361#[derive(Debug, Clone)]
1362pub struct AddedPendingTransaction<T: PoolTransaction> {
1363    /// Inserted transaction.
1364    pub transaction: Arc<ValidPoolTransaction<T>>,
1365    /// Replaced transaction.
1366    pub replaced: Option<Arc<ValidPoolTransaction<T>>>,
1367    /// transactions promoted to the pending queue
1368    pub promoted: Vec<Arc<ValidPoolTransaction<T>>>,
1369    /// transactions that failed and became discarded
1370    pub discarded: Vec<Arc<ValidPoolTransaction<T>>>,
1371}
1372
1373impl<T: PoolTransaction> AddedPendingTransaction<T> {
1374    /// Returns all transactions that were promoted to the pending pool and adhere to the given
1375    /// [`TransactionListenerKind`].
1376    ///
1377    /// If the kind is [`TransactionListenerKind::PropagateOnly`], then only transactions that
1378    /// are allowed to be propagated are returned.
1379    pub(crate) fn pending_transactions(
1380        &self,
1381        kind: TransactionListenerKind,
1382    ) -> impl Iterator<Item = B256> + '_ {
1383        let iter = std::iter::once(&self.transaction).chain(self.promoted.iter());
1384        PendingTransactionIter { kind, iter }
1385    }
1386}
1387
1388pub(crate) struct PendingTransactionIter<Iter> {
1389    kind: TransactionListenerKind,
1390    iter: Iter,
1391}
1392
1393impl<'a, Iter, T> Iterator for PendingTransactionIter<Iter>
1394where
1395    Iter: Iterator<Item = &'a Arc<ValidPoolTransaction<T>>>,
1396    T: PoolTransaction + 'a,
1397{
1398    type Item = B256;
1399
1400    fn next(&mut self) -> Option<Self::Item> {
1401        loop {
1402            let next = self.iter.next()?;
1403            if self.kind.is_propagate_only() && !next.propagate {
1404                continue
1405            }
1406            return Some(*next.hash())
1407        }
1408    }
1409}
1410
1411/// An iterator over full pending transactions
1412pub(crate) struct FullPendingTransactionIter<Iter> {
1413    kind: TransactionListenerKind,
1414    iter: Iter,
1415}
1416
1417impl<'a, Iter, T> Iterator for FullPendingTransactionIter<Iter>
1418where
1419    Iter: Iterator<Item = &'a Arc<ValidPoolTransaction<T>>>,
1420    T: PoolTransaction + 'a,
1421{
1422    type Item = NewTransactionEvent<T>;
1423
1424    fn next(&mut self) -> Option<Self::Item> {
1425        loop {
1426            let next = self.iter.next()?;
1427            if self.kind.is_propagate_only() && !next.propagate {
1428                continue
1429            }
1430            return Some(NewTransactionEvent {
1431                subpool: SubPool::Pending,
1432                transaction: next.clone(),
1433            })
1434        }
1435    }
1436}
1437
1438/// Represents a transaction that was added into the pool and its state
1439#[derive(Debug, Clone)]
1440pub enum AddedTransaction<T: PoolTransaction> {
1441    /// Transaction was successfully added and moved to the pending pool.
1442    Pending(AddedPendingTransaction<T>),
1443    /// Transaction was successfully added but not yet ready for processing and moved to a
1444    /// parked pool instead.
1445    Parked {
1446        /// Inserted transaction.
1447        transaction: Arc<ValidPoolTransaction<T>>,
1448        /// Replaced transaction.
1449        replaced: Option<Arc<ValidPoolTransaction<T>>>,
1450        /// The subpool it was moved to.
1451        subpool: SubPool,
1452        /// The specific reason why the transaction is queued (if applicable).
1453        queued_reason: Option<QueuedReason>,
1454    },
1455}
1456
1457impl<T: PoolTransaction> AddedTransaction<T> {
1458    /// Returns whether the transaction has been added to the pending pool.
1459    pub const fn as_pending(&self) -> Option<&AddedPendingTransaction<T>> {
1460        match self {
1461            Self::Pending(tx) => Some(tx),
1462            _ => None,
1463        }
1464    }
1465
1466    /// Returns the replaced transaction if there was one
1467    pub const fn replaced(&self) -> Option<&Arc<ValidPoolTransaction<T>>> {
1468        match self {
1469            Self::Pending(tx) => tx.replaced.as_ref(),
1470            Self::Parked { replaced, .. } => replaced.as_ref(),
1471        }
1472    }
1473
1474    /// Returns the discarded transactions if there were any
1475    pub(crate) fn discarded_transactions(&self) -> Option<&[Arc<ValidPoolTransaction<T>>]> {
1476        match self {
1477            Self::Pending(tx) => Some(&tx.discarded),
1478            Self::Parked { .. } => None,
1479        }
1480    }
1481
1482    /// Returns the hash of the replaced transaction if it is a blob transaction.
1483    pub(crate) fn replaced_blob_transaction(&self) -> Option<B256> {
1484        self.replaced().filter(|tx| tx.transaction.is_eip4844()).map(|tx| *tx.transaction.hash())
1485    }
1486
1487    /// Returns the hash of the transaction
1488    pub fn hash(&self) -> &TxHash {
1489        match self {
1490            Self::Pending(tx) => tx.transaction.hash(),
1491            Self::Parked { transaction, .. } => transaction.hash(),
1492        }
1493    }
1494
1495    /// Converts this type into the event type for listeners
1496    pub fn into_new_transaction_event(self) -> NewTransactionEvent<T> {
1497        match self {
1498            Self::Pending(tx) => {
1499                NewTransactionEvent { subpool: SubPool::Pending, transaction: tx.transaction }
1500            }
1501            Self::Parked { transaction, subpool, .. } => {
1502                NewTransactionEvent { transaction, subpool }
1503            }
1504        }
1505    }
1506
1507    /// Returns the subpool this transaction was added to
1508    pub(crate) const fn subpool(&self) -> SubPool {
1509        match self {
1510            Self::Pending(_) => SubPool::Pending,
1511            Self::Parked { subpool, .. } => *subpool,
1512        }
1513    }
1514
1515    /// Returns the [`TransactionId`] of the added transaction
1516    #[cfg(test)]
1517    pub(crate) fn id(&self) -> &TransactionId {
1518        match self {
1519            Self::Pending(added) => added.transaction.id(),
1520            Self::Parked { transaction, .. } => transaction.id(),
1521        }
1522    }
1523
1524    /// Returns the queued reason if the transaction is parked with a queued reason.
1525    pub const fn queued_reason(&self) -> Option<&QueuedReason> {
1526        match self {
1527            Self::Pending(_) => None,
1528            Self::Parked { queued_reason, .. } => queued_reason.as_ref(),
1529        }
1530    }
1531
1532    /// Returns the transaction state based on the subpool and queued reason.
1533    pub fn transaction_state(&self) -> AddedTransactionState {
1534        match self.subpool() {
1535            SubPool::Pending => AddedTransactionState::Pending,
1536            _ => {
1537                // For non-pending transactions, use the queued reason directly from the
1538                // AddedTransaction
1539                if let Some(reason) = self.queued_reason() {
1540                    AddedTransactionState::Queued(reason.clone())
1541                } else {
1542                    // Fallback - this shouldn't happen with the new implementation
1543                    AddedTransactionState::Queued(QueuedReason::NonceGap)
1544                }
1545            }
1546        }
1547    }
1548}
1549
1550/// The specific reason why a transaction is queued (not ready for execution)
1551#[derive(Debug, Clone, PartialEq, Eq)]
1552pub enum QueuedReason {
1553    /// Transaction has a nonce gap - missing prior transactions
1554    NonceGap,
1555    /// Transaction has parked ancestors - waiting for other transactions to be mined
1556    ParkedAncestors,
1557    /// Sender has insufficient balance to cover the transaction cost
1558    InsufficientBalance,
1559    /// Transaction exceeds the block gas limit
1560    TooMuchGas,
1561    /// Transaction doesn't meet the base fee requirement
1562    InsufficientBaseFee,
1563    /// Transaction doesn't meet the blob fee requirement (EIP-4844)
1564    InsufficientBlobFee,
1565}
1566
1567/// The state of a transaction when is was added to the pool
1568#[derive(Debug, Clone, PartialEq, Eq)]
1569pub enum AddedTransactionState {
1570    /// Ready for execution
1571    Pending,
1572    /// Not ready for execution due to a specific condition
1573    Queued(QueuedReason),
1574}
1575
1576impl AddedTransactionState {
1577    /// Returns whether the transaction was submitted as queued.
1578    pub const fn is_queued(&self) -> bool {
1579        matches!(self, Self::Queued(_))
1580    }
1581
1582    /// Returns whether the transaction was submitted as pending.
1583    pub const fn is_pending(&self) -> bool {
1584        matches!(self, Self::Pending)
1585    }
1586
1587    /// Returns the specific queued reason if the transaction is queued.
1588    pub const fn queued_reason(&self) -> Option<&QueuedReason> {
1589        match self {
1590            Self::Queued(reason) => Some(reason),
1591            Self::Pending => None,
1592        }
1593    }
1594}
1595
1596/// The outcome of a successful transaction addition
1597#[derive(Debug, Clone, PartialEq, Eq)]
1598pub struct AddedTransactionOutcome {
1599    /// The hash of the transaction
1600    pub hash: TxHash,
1601    /// The state of the transaction
1602    pub state: AddedTransactionState,
1603}
1604
1605impl AddedTransactionOutcome {
1606    /// Returns whether the transaction was submitted as queued.
1607    pub const fn is_queued(&self) -> bool {
1608        self.state.is_queued()
1609    }
1610
1611    /// Returns whether the transaction was submitted as pending.
1612    pub const fn is_pending(&self) -> bool {
1613        self.state.is_pending()
1614    }
1615}
1616
1617/// Contains all state changes after a [`CanonicalStateUpdate`] was processed
1618#[derive(Debug)]
1619pub(crate) struct OnNewCanonicalStateOutcome<T: PoolTransaction> {
1620    /// Hash of the block.
1621    pub(crate) block_hash: B256,
1622    /// All mined transactions.
1623    pub(crate) mined: Vec<TxHash>,
1624    /// Transactions promoted to the pending pool.
1625    pub(crate) promoted: Vec<Arc<ValidPoolTransaction<T>>>,
1626    /// transaction that were discarded during the update
1627    pub(crate) discarded: Vec<Arc<ValidPoolTransaction<T>>>,
1628}
1629
1630impl<T: PoolTransaction> OnNewCanonicalStateOutcome<T> {
1631    /// Returns all transactions that were promoted to the pending pool and adhere to the given
1632    /// [`TransactionListenerKind`].
1633    ///
1634    /// If the kind is [`TransactionListenerKind::PropagateOnly`], then only transactions that
1635    /// are allowed to be propagated are returned.
1636    pub(crate) fn pending_transactions(
1637        &self,
1638        kind: TransactionListenerKind,
1639    ) -> impl Iterator<Item = B256> + '_ {
1640        let iter = self.promoted.iter();
1641        PendingTransactionIter { kind, iter }
1642    }
1643
1644    /// Returns all FULL transactions that were promoted to the pending pool and adhere to the given
1645    /// [`TransactionListenerKind`].
1646    ///
1647    /// If the kind is [`TransactionListenerKind::PropagateOnly`], then only transactions that
1648    /// are allowed to be propagated are returned.
1649    pub(crate) fn full_pending_transactions(
1650        &self,
1651        kind: TransactionListenerKind,
1652    ) -> impl Iterator<Item = NewTransactionEvent<T>> + '_ {
1653        let iter = self.promoted.iter();
1654        FullPendingTransactionIter { kind, iter }
1655    }
1656}
1657
1658#[cfg(test)]
1659mod tests {
1660    use crate::{
1661        blobstore::{BlobStore, InMemoryBlobStore},
1662        identifier::SenderId,
1663        test_utils::{MockTransaction, TestPoolBuilder},
1664        validate::ValidTransaction,
1665        BlockInfo, PoolConfig, SubPoolLimit, TransactionOrigin, TransactionValidationOutcome, U256,
1666    };
1667    use alloy_eips::{eip4844::BlobTransactionSidecar, eip7594::BlobTransactionSidecarVariant};
1668    use alloy_primitives::Address;
1669    use std::{fs, path::PathBuf};
1670
1671    #[test]
1672    fn test_discard_blobs_on_blob_tx_eviction() {
1673        let blobs = {
1674            // Read the contents of the JSON file into a string.
1675            let json_content = fs::read_to_string(
1676                PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("test_data/blob1.json"),
1677            )
1678            .expect("Failed to read the blob data file");
1679
1680            // Parse the JSON contents into a serde_json::Value.
1681            let json_value: serde_json::Value =
1682                serde_json::from_str(&json_content).expect("Failed to deserialize JSON");
1683
1684            // Extract blob data from JSON and convert it to Blob.
1685            vec![
1686                // Extract the "data" field from the JSON and parse it as a string.
1687                json_value
1688                    .get("data")
1689                    .unwrap()
1690                    .as_str()
1691                    .expect("Data is not a valid string")
1692                    .to_string(),
1693            ]
1694        };
1695
1696        // Generate a BlobTransactionSidecar from the blobs.
1697        let sidecar = BlobTransactionSidecarVariant::Eip4844(
1698            BlobTransactionSidecar::try_from_blobs_hex(blobs).unwrap(),
1699        );
1700
1701        // Define the maximum limit for blobs in the sub-pool.
1702        let blob_limit = SubPoolLimit::new(1000, usize::MAX);
1703
1704        // Create a test pool with default configuration and the specified blob limit.
1705        let test_pool = &TestPoolBuilder::default()
1706            .with_config(PoolConfig { blob_limit, ..Default::default() })
1707            .pool;
1708
1709        // Set the block info for the pool, including a pending blob fee.
1710        test_pool
1711            .set_block_info(BlockInfo { pending_blob_fee: Some(10_000_000), ..Default::default() });
1712
1713        // Create an in-memory blob store.
1714        let blob_store = InMemoryBlobStore::default();
1715
1716        // Loop to add transactions to the pool and test blob eviction.
1717        for n in 0..blob_limit.max_txs + 10 {
1718            // Create a mock transaction with the generated blob sidecar.
1719            let mut tx = MockTransaction::eip4844_with_sidecar(sidecar.clone());
1720
1721            // Set non zero size
1722            tx.set_size(1844674407370951);
1723
1724            // Insert the sidecar into the blob store if the current index is within the blob limit.
1725            if n < blob_limit.max_txs {
1726                blob_store.insert(*tx.get_hash(), sidecar.clone()).unwrap();
1727            }
1728
1729            // Add the transaction to the pool with external origin and valid outcome.
1730            test_pool.add_transactions(
1731                TransactionOrigin::External,
1732                [TransactionValidationOutcome::Valid {
1733                    balance: U256::from(1_000),
1734                    state_nonce: 0,
1735                    bytecode_hash: None,
1736                    transaction: ValidTransaction::ValidWithSidecar {
1737                        transaction: tx,
1738                        sidecar: sidecar.clone(),
1739                    },
1740                    propagate: true,
1741                    authorities: None,
1742                }],
1743            );
1744        }
1745
1746        // Assert that the size of the pool's blob component is equal to the maximum blob limit.
1747        assert_eq!(test_pool.size().blob, blob_limit.max_txs);
1748
1749        // Assert that the size of the pool's blob_size component matches the expected value.
1750        assert_eq!(test_pool.size().blob_size, 1844674407370951000);
1751
1752        // Assert that the pool's blob store matches the expected blob store.
1753        assert_eq!(*test_pool.blob_store(), blob_store);
1754    }
1755
1756    #[test]
1757    fn test_auths_stored_in_identifiers() {
1758        // Create a test pool with default configuration.
1759        let test_pool = &TestPoolBuilder::default().with_config(Default::default()).pool;
1760
1761        let auth = Address::new([1; 20]);
1762        let tx = MockTransaction::eip7702();
1763
1764        test_pool.add_transactions(
1765            TransactionOrigin::Local,
1766            [TransactionValidationOutcome::Valid {
1767                balance: U256::from(1_000),
1768                state_nonce: 0,
1769                bytecode_hash: None,
1770                transaction: ValidTransaction::Valid(tx),
1771                propagate: true,
1772                authorities: Some(vec![auth]),
1773            }],
1774        );
1775
1776        let identifiers = test_pool.identifiers.read();
1777        assert_eq!(identifiers.sender_id(&auth), Some(SenderId::from(1)));
1778    }
1779
1780    #[test]
1781    fn sender_queries_do_not_allocate_ids_for_unknown_addresses() {
1782        let test_pool = &TestPoolBuilder::default().with_config(Default::default()).pool;
1783        let sender = Address::new([9; 20]);
1784
1785        assert_eq!(test_pool.sender_id(&sender), None);
1786        assert!(test_pool.get_transactions_by_sender(sender).is_empty());
1787        assert!(test_pool.get_pending_transaction_by_sender_and_nonce(sender, 0).is_none());
1788        assert!(test_pool.get_queued_transactions_by_sender(sender).is_empty());
1789        assert!(test_pool.get_pending_transactions_by_sender(sender).is_empty());
1790        assert!(test_pool.get_highest_transaction_by_sender(sender).is_none());
1791        assert!(test_pool.get_highest_consecutive_transaction_by_sender(sender, 0).is_none());
1792        assert!(test_pool.remove_transactions_by_sender(sender).is_empty());
1793        assert_eq!(test_pool.sender_id(&sender), None);
1794    }
1795}