Skip to main content

reth_transaction_pool/pool/
mod.rs

1//! Transaction Pool internals.
2//!
3//! Incoming transactions are validated before they enter the pool first. The validation outcome can
4//! have 3 states:
5//!
6//!  1. Transaction can _never_ be valid
7//!  2. Transaction is _currently_ valid
8//!  3. Transaction is _currently_ invalid, but could potentially become valid in the future
9//!
//! However, (2.) and (3.) can only be determined on the basis of the current state, whereas (1.)
//! holds indefinitely. This means that once the state changes, transactions that fall under (2.)
//! or (3.) need to be reevaluated.
13//!
14//! The transaction pool is responsible for storing new, valid transactions and providing the next
15//! best transactions sorted by their priority. Where priority is determined by the transaction's
16//! score ([`TransactionOrdering`]).
17//!
18//! Furthermore, the following characteristics fall under (3.):
19//!
20//!  a) Nonce of a transaction is higher than the expected nonce for the next transaction of its
21//! sender. A distinction is made here whether multiple transactions from the same sender have
22//! gapless nonce increments.
23//!
24//!  a)(1) If _no_ transaction is missing in a chain of multiple
25//! transactions from the same sender (all nonce in row), all of them can in principle be executed
26//! on the current state one after the other.
27//!
28//!  a)(2) If there's a nonce gap, then all
29//! transactions after the missing transaction are blocked until the missing transaction arrives.
30//!
31//!  b) Transaction does not meet the dynamic fee cap requirement introduced by EIP-1559: The
32//! fee cap of the transaction needs to be no less than the base fee of block.
33//!
34//!
35//! In essence the transaction pool is made of three separate sub-pools:
36//!
37//!  - Pending Pool: Contains all transactions that are valid on the current state and satisfy (3.
38//!    a)(1): _No_ nonce gaps. A _pending_ transaction is considered _ready_ when it has the lowest
39//!    nonce of all transactions from the same sender. Once a _ready_ transaction with nonce `n` has
40//!    been executed, the next highest transaction from the same sender `n + 1` becomes ready.
41//!
42//!  - Queued Pool: Contains all transactions that are currently blocked by missing transactions:
43//!    (3. a)(2): _With_ nonce gaps or due to lack of funds.
44//!
45//!  - Basefee Pool: To account for the dynamic base fee requirement (3. b) which could render an
46//!    EIP-1559 and all subsequent transactions of the sender currently invalid.
47//!
48//! The classification of transactions is always dependent on the current state that is changed as
49//! soon as a new block is mined. Once a new block is mined, the account changeset must be applied
50//! to the transaction pool.
51//!
52//!
53//! Depending on the use case, consumers of the [`TransactionPool`](crate::traits::TransactionPool)
54//! are interested in (2.) and/or (3.).
55
56//! A generic [`TransactionPool`](crate::traits::TransactionPool) that only handles transactions.
57//!
58//! This Pool maintains two separate sub-pools for (2.) and (3.)
59//!
60//! ## Terminology
61//!
62//!  - _Pending_: pending transactions are transactions that fall under (2.). These transactions can
63//!    currently be executed and are stored in the pending sub-pool
64//!  - _Queued_: queued transactions are transactions that fall under category (3.). Those
65//!    transactions are _currently_ waiting for state changes that eventually move them into
66//!    category (2.) and become pending.
67
68use crate::{
69    blobstore::BlobStore,
70    error::{PoolError, PoolErrorKind, PoolResult},
71    identifier::{SenderId, SenderIdentifiers, TransactionId},
72    metrics::BlobStoreMetrics,
73    pool::{
74        listener::{
75            BlobTransactionSidecarListener, PendingTransactionHashListener, PoolEventBroadcast,
76            TransactionListener,
77        },
78        state::SubPool,
79        txpool::{SenderInfo, TxPool},
80        update::UpdateOutcome,
81    },
82    traits::{
83        AllPoolTransactions, BestTransactionsAttributes, BlockInfo, GetPooledTransactionLimit,
84        NewBlobSidecar, PoolSize, PoolTransaction, PropagatedTransactions, TransactionOrigin,
85    },
86    validate::{TransactionValidationOutcome, ValidPoolTransaction, ValidTransaction},
87    CanonicalStateUpdate, EthPoolTransaction, PoolConfig, TransactionOrdering,
88    TransactionValidator,
89};
90
91use alloy_primitives::{
92    map::{AddressSet, HashSet},
93    Address, TxHash, B256,
94};
95use parking_lot::{Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard};
96use reth_eth_wire_types::HandleMempoolData;
97use reth_execution_types::ChangedAccount;
98
99use alloy_eips::{eip7594::BlobTransactionSidecarVariant, Typed2718};
100use reth_primitives_traits::Recovered;
101use rustc_hash::FxHashMap;
102use std::{
103    fmt,
104    sync::{
105        atomic::{AtomicBool, Ordering},
106        Arc,
107    },
108    time::Instant,
109};
110use tokio::sync::mpsc;
111use tracing::{debug, trace, warn};
112mod events;
113pub use best::{BestTransactionFilter, BestTransactionsWithPrioritizedSenders};
114pub use blob::{blob_tx_priority, fee_delta, BlobOrd, BlobTransactions};
115pub use events::{FullTransactionEvent, NewTransactionEvent, TransactionEvent};
116pub use listener::{AllTransactionsEvents, TransactionEvents, TransactionListenerKind};
117pub use parked::{BasefeeOrd, ParkedOrd, ParkedPool, QueuedOrd};
118pub use pending::PendingPool;
119
120mod best;
121pub use best::BestTransactions;
122
123mod blob;
124pub mod listener;
125mod parked;
126pub mod pending;
127pub mod size;
128pub(crate) mod state;
129pub mod txpool;
130mod update;
131
/// Bound on number of pending transactions from `reth_network::TransactionsManager` to buffer.
pub const PENDING_TX_LISTENER_BUFFER_SIZE: usize = 2048;
/// Bound on number of new transactions from `reth_network::TransactionsManager` to buffer.
pub const NEW_TX_LISTENER_BUFFER_SIZE: usize = 1024;

/// Capacity of the channel that is created for every registered blob sidecar listener.
const BLOB_SIDECAR_LISTENER_BUFFER_SIZE: usize = 512;
138
/// Transaction pool internals.
///
/// Ties together the transaction validator (`V`), the [`TxPool`] parameterised by its ordering
/// (`T`) and the blob store (`S`), along with the listener registries that are notified about
/// pool changes.
pub struct PoolInner<V, T, S>
where
    T: TransactionOrdering,
{
    /// Internal mapping of addresses to plain ints.
    identifiers: RwLock<SenderIdentifiers>,
    /// Transaction validator.
    validator: V,
    /// Storage for blob transactions
    blob_store: S,
    /// The internal pool that manages all transactions.
    pool: RwLock<TxPool<T>>,
    /// Pool settings.
    config: PoolConfig,
    /// Manages listeners for transaction state change events.
    event_listener: RwLock<PoolEventBroadcast<T::Transaction>>,
    /// Fast-path flag for event emission.
    ///
    /// Set whenever an event listener is installed and cleared again once the broadcaster
    /// reports that it no longer has any listeners. While unset, event emission returns early
    /// without locking `event_listener`.
    has_event_listeners: AtomicBool,
    /// Listeners for new _full_ pending transactions.
    pending_transaction_listener: RwLock<Vec<PendingTransactionHashListener>>,
    /// Listeners for new transactions added to the pool.
    transaction_listener: RwLock<Vec<TransactionListener<T::Transaction>>>,
    /// Listener for new blob transaction sidecars added to the pool.
    blob_transaction_sidecar_listener: Mutex<Vec<BlobTransactionSidecarListener>>,
    /// Metrics for the blob store
    blob_store_metrics: BlobStoreMetrics,
}
167
168// === impl PoolInner ===
169
170impl<V, T, S> PoolInner<V, T, S>
171where
172    V: TransactionValidator,
173    T: TransactionOrdering<Transaction = <V as TransactionValidator>::Transaction>,
174    S: BlobStore,
175{
176    /// Create a new transaction pool instance.
177    pub fn new(validator: V, ordering: T, blob_store: S, config: PoolConfig) -> Self {
178        Self {
179            identifiers: Default::default(),
180            validator,
181            event_listener: Default::default(),
182            has_event_listeners: AtomicBool::new(false),
183            pool: RwLock::new(TxPool::new(ordering, config.clone())),
184            pending_transaction_listener: Default::default(),
185            transaction_listener: Default::default(),
186            blob_transaction_sidecar_listener: Default::default(),
187            config,
188            blob_store,
189            blob_store_metrics: Default::default(),
190        }
191    }
192
193    /// Returns the configured blob store.
194    pub const fn blob_store(&self) -> &S {
195        &self.blob_store
196    }
197
198    /// Returns stats about the size of the pool.
199    pub fn size(&self) -> PoolSize {
200        self.get_pool_data().size()
201    }
202
203    /// Returns the currently tracked block
204    pub fn block_info(&self) -> BlockInfo {
205        self.get_pool_data().block_info()
206    }
207    /// Sets the currently tracked block.
208    ///
209    /// This will also notify subscribers about any transactions that were promoted to the pending
210    /// pool due to fee changes.
211    pub fn set_block_info(&self, info: BlockInfo) {
212        let outcome = self.pool.write().set_block_info(info);
213
214        // Notify subscribers about promoted transactions due to fee changes
215        self.notify_on_transaction_updates(outcome.promoted, outcome.discarded);
216    }
217
218    /// Returns the internal [`SenderId`] for this address, allocating a new mapping when the
219    /// address is first observed.
220    ///
221    /// This must only be used on paths that intentionally begin tracking a sender, such as
222    /// transaction insertion. Read-only lookups should prefer [`Self::sender_id`] to avoid
223    /// growing the sender-id map for unknown addresses.
224    pub fn get_sender_id(&self, addr: Address) -> SenderId {
225        self.identifiers.write().sender_id_or_create(addr)
226    }
227
228    /// Returns the internal [`SenderId`] for this address if it is already tracked.
229    ///
230    /// Unlike [`Self::get_sender_id`], this never allocates a new sender mapping and is therefore
231    /// suitable for read-only queries or best-effort cleanup on unknown addresses.
232    pub fn sender_id(&self, addr: &Address) -> Option<SenderId> {
233        self.identifiers.read().sender_id(addr)
234    }
235
236    /// Returns the internal [`SenderId`]s for the given addresses.
237    pub fn get_sender_ids(&self, addrs: impl IntoIterator<Item = Address>) -> Vec<SenderId> {
238        self.identifiers.write().sender_ids_or_create(addrs)
239    }
240
241    /// Returns all senders in the pool
242    pub fn unique_senders(&self) -> AddressSet {
243        self.get_pool_data().unique_senders()
244    }
245
246    /// Converts the changed accounts to a map of sender ids to sender info (internal identifier
247    /// used for __tracked__ accounts)
248    fn changed_senders(
249        &self,
250        accs: impl Iterator<Item = ChangedAccount>,
251    ) -> FxHashMap<SenderId, SenderInfo> {
252        let identifiers = self.identifiers.read();
253        accs.into_iter()
254            .filter_map(|acc| {
255                let ChangedAccount { address, nonce, balance } = acc;
256                let sender_id = identifiers.sender_id(&address)?;
257                Some((sender_id, SenderInfo { state_nonce: nonce, balance }))
258            })
259            .collect()
260    }
261
262    /// Get the config the pool was configured with.
263    pub const fn config(&self) -> &PoolConfig {
264        &self.config
265    }
266
267    /// Get the validator reference.
268    pub const fn validator(&self) -> &V {
269        &self.validator
270    }
271
272    /// Adds a new transaction listener to the pool that gets notified about every new _pending_
273    /// transaction inserted into the pool
274    pub fn add_pending_listener(&self, kind: TransactionListenerKind) -> mpsc::Receiver<TxHash> {
275        let (sender, rx) = mpsc::channel(self.config.pending_tx_listener_buffer_size);
276        let listener = PendingTransactionHashListener { sender, kind };
277
278        let mut listeners = self.pending_transaction_listener.write();
279        // Clean up dead listeners before adding new one
280        listeners.retain(|l| !l.sender.is_closed());
281        listeners.push(listener);
282
283        rx
284    }
285
286    /// Adds a new transaction listener to the pool that gets notified about every new transaction.
287    pub fn add_new_transaction_listener(
288        &self,
289        kind: TransactionListenerKind,
290    ) -> mpsc::Receiver<NewTransactionEvent<T::Transaction>> {
291        let (sender, rx) = mpsc::channel(self.config.new_tx_listener_buffer_size);
292        let listener = TransactionListener { sender, kind };
293
294        let mut listeners = self.transaction_listener.write();
295        // Clean up dead listeners before adding new one
296        listeners.retain(|l| !l.sender.is_closed());
297        listeners.push(listener);
298
299        rx
300    }
301    /// Adds a new blob sidecar listener to the pool that gets notified about every new
302    /// eip4844 transaction's blob sidecar.
303    pub fn add_blob_sidecar_listener(&self) -> mpsc::Receiver<NewBlobSidecar> {
304        let (sender, rx) = mpsc::channel(BLOB_SIDECAR_LISTENER_BUFFER_SIZE);
305        let listener = BlobTransactionSidecarListener { sender };
306        self.blob_transaction_sidecar_listener.lock().push(listener);
307        rx
308    }
309
310    /// If the pool contains the transaction, this adds a new listener that gets notified about
311    /// transaction events.
312    pub fn add_transaction_event_listener(&self, tx_hash: TxHash) -> Option<TransactionEvents> {
313        if !self.get_pool_data().contains(&tx_hash) {
314            return None
315        }
316        let mut listener = self.event_listener.write();
317        let events = listener.subscribe(tx_hash);
318        self.mark_event_listener_installed();
319        Some(events)
320    }
321
322    /// Adds a listener for all transaction events.
323    pub fn add_all_transactions_event_listener(&self) -> AllTransactionsEvents<T::Transaction> {
324        let mut listener = self.event_listener.write();
325        let events = listener.subscribe_all();
326        self.mark_event_listener_installed();
327        events
328    }
329
330    #[inline]
331    fn has_event_listeners(&self) -> bool {
332        self.has_event_listeners.load(Ordering::Relaxed)
333    }
334
335    #[inline]
336    fn mark_event_listener_installed(&self) {
337        self.has_event_listeners.store(true, Ordering::Relaxed);
338    }
339
340    #[inline]
341    fn update_event_listener_state(&self, listener: &PoolEventBroadcast<T::Transaction>) {
342        if listener.is_empty() {
343            self.has_event_listeners.store(false, Ordering::Relaxed);
344        }
345    }
346
347    #[inline]
348    fn with_event_listener<F>(&self, emit: F)
349    where
350        F: FnOnce(&mut PoolEventBroadcast<T::Transaction>),
351    {
352        if !self.has_event_listeners() {
353            return
354        }
355        let mut listener = self.event_listener.write();
356        if !listener.is_empty() {
357            emit(&mut listener);
358        }
359        self.update_event_listener_state(&listener);
360    }
361
362    /// Returns a read lock to the pool's data.
363    pub fn get_pool_data(&self) -> RwLockReadGuard<'_, TxPool<T>> {
364        self.pool.read()
365    }
366
367    /// Returns transactions in the pool that can be propagated
368    pub fn pooled_transactions(&self) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
369        let mut out = Vec::new();
370        self.append_pooled_transactions(&mut out);
371        out
372    }
373
374    /// Returns hashes of transactions in the pool that can be propagated.
375    pub fn pooled_transactions_hashes(&self) -> Vec<TxHash> {
376        let mut out = Vec::new();
377        self.append_pooled_transactions_hashes(&mut out);
378        out
379    }
380
381    /// Returns only the first `max` transactions in the pool that can be propagated.
382    pub fn pooled_transactions_max(
383        &self,
384        max: usize,
385    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
386        if max == 0 {
387            return Vec::new()
388        }
389
390        let pool = self.get_pool_data();
391        let mut out = Vec::with_capacity(max.min(pool.all().len()));
392        out.extend(pool.all().transactions_iter().filter(|tx| tx.propagate).take(max).cloned());
393        out
394    }
395
396    /// Extends the given vector with all transactions in the pool that can be propagated.
397    pub fn append_pooled_transactions(
398        &self,
399        out: &mut Vec<Arc<ValidPoolTransaction<T::Transaction>>>,
400    ) {
401        out.extend(
402            self.get_pool_data().all().transactions_iter().filter(|tx| tx.propagate).cloned(),
403        );
404    }
405
406    /// Extends the given vector with pooled transactions for the given hashes that are allowed to
407    /// be propagated.
408    pub fn append_pooled_transaction_elements(
409        &self,
410        tx_hashes: &[TxHash],
411        limit: GetPooledTransactionLimit,
412        out: &mut Vec<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>,
413    ) where
414        <V as TransactionValidator>::Transaction: EthPoolTransaction,
415    {
416        let transactions = self.get_all_propagatable(tx_hashes);
417        let mut size = 0;
418        for transaction in transactions {
419            let encoded_len = transaction.encoded_length();
420            let Some(pooled) = self.to_pooled_transaction(transaction) else {
421                continue;
422            };
423
424            size += encoded_len;
425            out.push(pooled.into_inner());
426
427            if limit.exceeds(size) {
428                break
429            }
430        }
431    }
432
433    /// Extends the given vector with the hashes of all transactions in the pool that can be
434    /// propagated.
435    pub fn append_pooled_transactions_hashes(&self, out: &mut Vec<TxHash>) {
436        out.extend(
437            self.get_pool_data()
438                .all()
439                .transactions_iter()
440                .filter(|tx| tx.propagate)
441                .map(|tx| *tx.hash()),
442        );
443    }
444
445    /// Extends the given vector with only the first `max` transactions in the pool that can be
446    /// propagated.
447    pub fn append_pooled_transactions_max(
448        &self,
449        max: usize,
450        out: &mut Vec<Arc<ValidPoolTransaction<T::Transaction>>>,
451    ) {
452        out.extend(
453            self.get_pool_data()
454                .all()
455                .transactions_iter()
456                .filter(|tx| tx.propagate)
457                .take(max)
458                .cloned(),
459        );
460    }
461
462    /// Returns only the first `max` hashes of transactions in the pool that can be propagated.
463    pub fn pooled_transactions_hashes_max(&self, max: usize) -> Vec<TxHash> {
464        if max == 0 {
465            return Vec::new();
466        }
467
468        let pool = self.get_pool_data();
469        let mut out = Vec::with_capacity(max.min(pool.all().len()));
470        out.extend(
471            pool.all().transactions_iter().filter(|tx| tx.propagate).take(max).map(|tx| *tx.hash()),
472        );
473        out
474    }
475
476    /// Converts the internally tracked transaction to the pooled format.
477    ///
478    /// If the transaction is an EIP-4844 transaction, the blob sidecar is fetched from the blob
479    /// store and attached to the transaction.
480    fn to_pooled_transaction(
481        &self,
482        transaction: Arc<ValidPoolTransaction<T::Transaction>>,
483    ) -> Option<Recovered<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>>
484    where
485        <V as TransactionValidator>::Transaction: EthPoolTransaction,
486    {
487        if transaction.is_eip4844() {
488            let sidecar = self.blob_store.get(*transaction.hash()).ok()??;
489            transaction.transaction.clone().try_into_pooled_eip4844(sidecar)
490        } else {
491            transaction
492                .transaction
493                .clone_into_pooled()
494                .inspect_err(|err| {
495                    debug!(
496                        target: "txpool", %err,
497                        "failed to convert transaction to pooled element; skipping",
498                    );
499                })
500                .ok()
501        }
502    }
503
504    /// Returns pooled transactions for the given transaction hashes that are allowed to be
505    /// propagated.
506    pub fn get_pooled_transaction_elements(
507        &self,
508        tx_hashes: Vec<TxHash>,
509        limit: GetPooledTransactionLimit,
510    ) -> Vec<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>
511    where
512        <V as TransactionValidator>::Transaction: EthPoolTransaction,
513    {
514        let mut elements = Vec::new();
515        self.append_pooled_transaction_elements(&tx_hashes, limit, &mut elements);
516        elements.shrink_to_fit();
517        elements
518    }
519
520    /// Returns converted pooled transaction for the given transaction hash.
521    pub fn get_pooled_transaction_element(
522        &self,
523        tx_hash: TxHash,
524    ) -> Option<Recovered<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>>
525    where
526        <V as TransactionValidator>::Transaction: EthPoolTransaction,
527    {
528        self.get(&tx_hash).and_then(|tx| self.to_pooled_transaction(tx))
529    }
530
531    /// Updates the entire pool after a new block was executed.
532    pub fn on_canonical_state_change(&self, update: CanonicalStateUpdate<'_, V::Block>) {
533        trace!(target: "txpool", ?update, "updating pool on canonical state change");
534
535        let block_info = update.block_info();
536        let CanonicalStateUpdate {
537            new_tip, changed_accounts, mined_transactions, update_kind, ..
538        } = update;
539        self.validator.on_new_head_block(new_tip);
540
541        let changed_senders = self.changed_senders(changed_accounts.into_iter());
542
543        // update the pool
544        let outcome = self.pool.write().on_canonical_state_change(
545            block_info,
546            mined_transactions,
547            changed_senders,
548            update_kind,
549        );
550
551        // This will discard outdated transactions based on the account's nonce
552        self.delete_discarded_blobs(outcome.discarded.iter());
553
554        // notify listeners about updates
555        self.notify_on_new_state(outcome);
556    }
557
558    /// Performs account updates on the pool.
559    ///
560    /// This will either promote or discard transactions based on the new account state.
561    ///
562    /// This should be invoked when the pool drifted and accounts are updated manually
563    pub fn update_accounts(&self, accounts: Vec<ChangedAccount>) {
564        let changed_senders = self.changed_senders(accounts.into_iter());
565        let UpdateOutcome { promoted, discarded } =
566            self.pool.write().update_accounts(changed_senders);
567
568        self.notify_on_transaction_updates(promoted, discarded);
569    }
570
571    /// Add a single validated transaction into the pool.
572    ///
573    /// Returns the outcome and optionally metadata to be processed after the pool lock is
574    /// released.
575    ///
576    /// Note: this is only used internally by [`Self::add_transactions()`], all new transaction(s)
577    /// come in through that function, either as a batch or `std::iter::once`.
578    fn add_transaction(
579        &self,
580        pool: &mut RwLockWriteGuard<'_, TxPool<T>>,
581        origin: TransactionOrigin,
582        tx: TransactionValidationOutcome<T::Transaction>,
583    ) -> (PoolResult<AddedTransactionOutcome>, Option<AddedTransactionMeta<T::Transaction>>) {
584        match tx {
585            TransactionValidationOutcome::Valid {
586                balance,
587                state_nonce,
588                transaction,
589                propagate,
590                bytecode_hash,
591                authorities,
592            } => {
593                let sender_id = self.get_sender_id(transaction.sender());
594                let transaction_id = TransactionId::new(sender_id, transaction.nonce());
595
596                // split the valid transaction and the blob sidecar if it has any
597                let (transaction, blob_sidecar) = match transaction {
598                    ValidTransaction::Valid(tx) => (tx, None),
599                    ValidTransaction::ValidWithSidecar { transaction, sidecar } => {
600                        debug_assert!(
601                            transaction.is_eip4844(),
602                            "validator returned sidecar for non EIP-4844 transaction"
603                        );
604                        (transaction, Some(sidecar))
605                    }
606                };
607
608                let tx = ValidPoolTransaction {
609                    transaction,
610                    transaction_id,
611                    propagate,
612                    timestamp: Instant::now(),
613                    origin,
614                    authority_ids: authorities.map(|auths| self.get_sender_ids(auths)),
615                };
616
617                let added = match pool.add_transaction(tx, balance, state_nonce, bytecode_hash) {
618                    Ok(added) => added,
619                    Err(err) => return (Err(err), None),
620                };
621                let hash = *added.hash();
622                let state = added.transaction_state();
623
624                let meta = AddedTransactionMeta { added, blob_sidecar };
625
626                (Ok(AddedTransactionOutcome { hash, state }), Some(meta))
627            }
628            TransactionValidationOutcome::Invalid(tx, err) => {
629                self.with_event_listener(|listener| listener.invalid(tx.hash()));
630                (Err(PoolError::new(*tx.hash(), err)), None)
631            }
632            TransactionValidationOutcome::Error(tx_hash, err) => {
633                self.with_event_listener(|listener| listener.discarded(&tx_hash));
634                (Err(PoolError::other(tx_hash, err)), None)
635            }
636        }
637    }
638
639    /// Adds a transaction and returns the event stream.
640    pub fn add_transaction_and_subscribe(
641        &self,
642        origin: TransactionOrigin,
643        tx: TransactionValidationOutcome<T::Transaction>,
644    ) -> PoolResult<TransactionEvents> {
645        let listener = {
646            let mut listener = self.event_listener.write();
647            let events = listener.subscribe(tx.tx_hash());
648            self.mark_event_listener_installed();
649            events
650        };
651        let mut results = self.add_transactions(origin, std::iter::once(tx));
652        results.pop().expect("result length is the same as the input")?;
653        Ok(listener)
654    }
655
656    /// Adds all transactions in the iterator to the pool, returning a list of results.
657    ///
658    /// Convenience method that assigns the same origin to all transactions. Delegates to
659    /// [`Self::add_transactions_with_origins`].
660    pub fn add_transactions(
661        &self,
662        origin: TransactionOrigin,
663        transactions: impl IntoIterator<Item = TransactionValidationOutcome<T::Transaction>>,
664    ) -> Vec<PoolResult<AddedTransactionOutcome>> {
665        self.add_transactions_with_origins(transactions.into_iter().map(|tx| (origin, tx)))
666    }
667
668    /// Adds all transactions in the iterator to the pool, each with its own
669    /// [`TransactionOrigin`], returning a list of results.
670    pub fn add_transactions_with_origins(
671        &self,
672        transactions: impl IntoIterator<
673            Item = (TransactionOrigin, TransactionValidationOutcome<T::Transaction>),
674        >,
675    ) -> Vec<PoolResult<AddedTransactionOutcome>> {
676        // Collect results and metadata while holding the pool write lock
677        let (mut results, added_metas, discarded) = {
678            let mut pool = self.pool.write();
679            let mut added_metas = Vec::new();
680
681            let results = transactions
682                .into_iter()
683                .map(|(origin, tx)| {
684                    let (result, meta) = self.add_transaction(&mut pool, origin, tx);
685
686                    // Only collect metadata for successful insertions
687                    if result.is_ok() &&
688                        let Some(meta) = meta
689                    {
690                        added_metas.push(meta);
691                    }
692
693                    result
694                })
695                .collect::<Vec<_>>();
696
697            // Enforce the pool size limits if at least one transaction was added successfully
698            let discarded = if results.iter().any(Result::is_ok) {
699                let discarded = pool.discard_worst();
700                pool.update_size_metrics();
701                discarded
702            } else {
703                Default::default()
704            };
705
706            (results, added_metas, discarded)
707        };
708
709        for meta in added_metas {
710            self.on_added_transaction(meta);
711        }
712
713        if !discarded.is_empty() {
714            // Delete any blobs associated with discarded blob transactions
715            self.delete_discarded_blobs(discarded.iter());
716            self.with_event_listener(|listener| listener.discarded_many(&discarded));
717
718            let discarded_hashes =
719                discarded.into_iter().map(|tx| *tx.hash()).collect::<HashSet<_>>();
720
721            // A newly added transaction may be immediately discarded, so we need to
722            // adjust the result here
723            for res in &mut results {
724                if let Ok(AddedTransactionOutcome { hash, .. }) = res &&
725                    discarded_hashes.contains(hash)
726                {
727                    *res = Err(PoolError::new(*hash, PoolErrorKind::DiscardedOnInsert))
728                }
729            }
730        };
731
732        results
733    }
734
735    /// Process a transaction that was added to the pool.
736    ///
737    /// Performs blob storage operations and sends all notifications. This should be called
738    /// after the pool write lock has been released to avoid blocking pool operations.
739    fn on_added_transaction(&self, meta: AddedTransactionMeta<T::Transaction>) {
740        // Handle blob sidecar storage and notifications for EIP-4844 transactions
741        if let Some(sidecar) = meta.blob_sidecar {
742            let hash = *meta.added.hash();
743            self.on_new_blob_sidecar(&hash, &sidecar);
744            self.insert_blob(hash, sidecar);
745        }
746
747        // Delete replaced blob sidecar if any
748        if let Some(replaced) = meta.added.replaced_blob_transaction() {
749            debug!(target: "txpool", "[{:?}] delete replaced blob sidecar", replaced);
750            self.delete_blob(replaced);
751        }
752
753        // Delete discarded blob sidecars if any, this doesnt do any IO.
754        if let Some(discarded) = meta.added.discarded_transactions() {
755            self.delete_discarded_blobs(discarded.iter());
756        }
757
758        // Notify pending transaction listeners
759        if let Some(pending) = meta.added.as_pending() {
760            self.on_new_pending_transaction(pending);
761        }
762
763        // Notify event listeners
764        self.notify_event_listeners(&meta.added);
765
766        // Notify new transaction listeners
767        self.on_new_transaction(meta.added.into_new_transaction_event());
768    }
769
770    /// Notify all listeners about a new pending transaction.
771    ///
772    /// See also [`Self::add_pending_listener`]
773    ///
774    /// CAUTION: This function is only intended to be used manually in order to use this type's
775    /// pending transaction receivers when manually implementing the
776    /// [`TransactionPool`](crate::TransactionPool) trait for a custom pool implementation
777    /// [`TransactionPool::pending_transactions_listener_for`](crate::TransactionPool).
778    pub fn on_new_pending_transaction(&self, pending: &AddedPendingTransaction<T::Transaction>) {
779        let mut needs_cleanup = false;
780
781        {
782            let listeners = self.pending_transaction_listener.read();
783            for listener in listeners.iter() {
784                if !listener.send_all(pending.pending_transactions(listener.kind)) {
785                    needs_cleanup = true;
786                }
787            }
788        }
789
790        // Clean up dead listeners if we detected any closed channels
791        if needs_cleanup {
792            self.pending_transaction_listener
793                .write()
794                .retain(|listener| !listener.sender.is_closed());
795        }
796    }
797
798    /// Notify all listeners about a newly inserted pending transaction.
799    ///
800    /// See also [`Self::add_new_transaction_listener`]
801    ///
802    /// CAUTION: This function is only intended to be used manually in order to use this type's
803    /// transaction receivers when manually implementing the
804    /// [`TransactionPool`](crate::TransactionPool) trait for a custom pool implementation
805    /// [`TransactionPool::new_transactions_listener_for`](crate::TransactionPool).
806    pub fn on_new_transaction(&self, event: NewTransactionEvent<T::Transaction>) {
807        let mut needs_cleanup = false;
808
809        {
810            let listeners = self.transaction_listener.read();
811            for listener in listeners.iter() {
812                if listener.kind.is_propagate_only() && !event.transaction.propagate {
813                    if listener.sender.is_closed() {
814                        needs_cleanup = true;
815                    }
816                    // Skip non-propagate transactions for propagate-only listeners
817                    continue
818                }
819
820                if !listener.send(event.clone()) {
821                    needs_cleanup = true;
822                }
823            }
824        }
825
826        // Clean up dead listeners if we detected any closed channels
827        if needs_cleanup {
828            self.transaction_listener.write().retain(|listener| !listener.sender.is_closed());
829        }
830    }
831
832    /// Notify all listeners about a blob sidecar for a newly inserted blob (eip4844) transaction.
833    fn on_new_blob_sidecar(&self, tx_hash: &TxHash, sidecar: &BlobTransactionSidecarVariant) {
834        let mut sidecar_listeners = self.blob_transaction_sidecar_listener.lock();
835        if sidecar_listeners.is_empty() {
836            return
837        }
838        let sidecar = Arc::new(sidecar.clone());
839        sidecar_listeners.retain_mut(|listener| {
840            let new_blob_event = NewBlobSidecar { tx_hash: *tx_hash, sidecar: sidecar.clone() };
841            match listener.sender.try_send(new_blob_event) {
842                Ok(()) => true,
843                Err(err) => {
844                    if matches!(err, mpsc::error::TrySendError::Full(_)) {
845                        debug!(
846                            target: "txpool",
847                            "[{:?}] failed to send blob sidecar; channel full",
848                            sidecar,
849                        );
850                        true
851                    } else {
852                        false
853                    }
854                }
855            }
856        })
857    }
858
859    /// Notifies transaction listeners about changes once a block was processed.
860    fn notify_on_new_state(&self, outcome: OnNewCanonicalStateOutcome<T::Transaction>) {
861        trace!(target: "txpool", promoted=outcome.promoted.len(), discarded= outcome.discarded.len() ,"notifying listeners on state change");
862
863        // notify about promoted pending transactions - emit hashes
864        let mut needs_pending_cleanup = false;
865        {
866            let listeners = self.pending_transaction_listener.read();
867            for listener in listeners.iter() {
868                if !listener.send_all(outcome.pending_transactions(listener.kind)) {
869                    needs_pending_cleanup = true;
870                }
871            }
872        }
873        if needs_pending_cleanup {
874            self.pending_transaction_listener.write().retain(|l| !l.sender.is_closed());
875        }
876
877        // emit full transactions
878        let mut needs_tx_cleanup = false;
879        {
880            let listeners = self.transaction_listener.read();
881            for listener in listeners.iter() {
882                if !listener.send_all(outcome.full_pending_transactions(listener.kind)) {
883                    needs_tx_cleanup = true;
884                }
885            }
886        }
887        if needs_tx_cleanup {
888            self.transaction_listener.write().retain(|l| !l.sender.is_closed());
889        }
890
891        let OnNewCanonicalStateOutcome { mined, promoted, discarded, block_hash } = outcome;
892
893        // broadcast specific transaction events
894        self.with_event_listener(|listener| {
895            for tx in &mined {
896                listener.mined(tx, block_hash);
897            }
898            for tx in &promoted {
899                listener.pending(tx.hash(), None);
900            }
901            for tx in &discarded {
902                listener.discarded(tx.hash());
903            }
904        })
905    }
906
907    /// Notifies all listeners about the transaction movements.
908    ///
909    /// This will emit events according to the provided changes.
910    ///
911    /// CAUTION: This function is only intended to be used manually in order to use this type's
912    /// [`TransactionEvents`] receivers when manually implementing the
913    /// [`TransactionPool`](crate::TransactionPool) trait for a custom pool implementation
914    /// [`TransactionPool::transaction_event_listener`](crate::TransactionPool).
915    pub fn notify_on_transaction_updates(
916        &self,
917        promoted: Vec<Arc<ValidPoolTransaction<T::Transaction>>>,
918        discarded: Vec<Arc<ValidPoolTransaction<T::Transaction>>>,
919    ) {
920        // Notify about promoted pending transactions (similar to notify_on_new_state)
921        if !promoted.is_empty() {
922            let mut needs_pending_cleanup = false;
923            {
924                let listeners = self.pending_transaction_listener.read();
925                for listener in listeners.iter() {
926                    let promoted_hashes = promoted.iter().filter_map(|tx| {
927                        if listener.kind.is_propagate_only() && !tx.propagate {
928                            None
929                        } else {
930                            Some(*tx.hash())
931                        }
932                    });
933                    if !listener.send_all(promoted_hashes) {
934                        needs_pending_cleanup = true;
935                    }
936                }
937            }
938            if needs_pending_cleanup {
939                self.pending_transaction_listener.write().retain(|l| !l.sender.is_closed());
940            }
941
942            // in this case we should also emit promoted transactions in full
943            let mut needs_tx_cleanup = false;
944            {
945                let listeners = self.transaction_listener.read();
946                for listener in listeners.iter() {
947                    let promoted_txs = promoted.iter().filter_map(|tx| {
948                        if listener.kind.is_propagate_only() && !tx.propagate {
949                            None
950                        } else {
951                            Some(NewTransactionEvent::pending(tx.clone()))
952                        }
953                    });
954                    if !listener.send_all(promoted_txs) {
955                        needs_tx_cleanup = true;
956                    }
957                }
958            }
959            if needs_tx_cleanup {
960                self.transaction_listener.write().retain(|l| !l.sender.is_closed());
961            }
962        }
963
964        self.with_event_listener(|listener| {
965            for tx in &promoted {
966                listener.pending(tx.hash(), None);
967            }
968            for tx in &discarded {
969                listener.discarded(tx.hash());
970            }
971        });
972
973        if !discarded.is_empty() {
974            // This deletes outdated blob txs from the blob store, based on the account's nonce.
975            // This is called during txpool maintenance when the pool drifted.
976            self.delete_discarded_blobs(discarded.iter());
977        }
978    }
979
980    /// Fire events for the newly added transaction if there are any.
981    ///
982    /// See also [`Self::add_transaction_event_listener`].
983    ///
984    /// CAUTION: This function is only intended to be used manually in order to use this type's
985    /// [`TransactionEvents`] receivers when manually implementing the
986    /// [`TransactionPool`](crate::TransactionPool) trait for a custom pool implementation
987    /// [`TransactionPool::transaction_event_listener`](crate::TransactionPool).
988    pub fn notify_event_listeners(&self, tx: &AddedTransaction<T::Transaction>) {
989        self.with_event_listener(|listener| match tx {
990            AddedTransaction::Pending(tx) => {
991                let AddedPendingTransaction { transaction, promoted, discarded, replaced } = tx;
992
993                listener.pending(transaction.hash(), replaced.clone());
994                for tx in promoted {
995                    listener.pending(tx.hash(), None);
996                }
997                for tx in discarded {
998                    listener.discarded(tx.hash());
999                }
1000            }
1001            AddedTransaction::Parked { transaction, replaced, queued_reason, .. } => {
1002                listener.queued(transaction.hash(), queued_reason.clone());
1003                if let Some(replaced) = replaced {
1004                    listener.replaced(replaced.clone(), *transaction.hash());
1005                }
1006            }
1007        });
1008    }
1009
1010    /// Returns an iterator that yields transactions that are ready to be included in the block.
1011    pub fn best_transactions(&self) -> BestTransactions<T> {
1012        self.get_pool_data().best_transactions()
1013    }
1014
1015    /// Returns an iterator that yields transactions that are ready to be included in the block with
1016    /// the given base fee and optional blob fee attributes.
1017    pub fn best_transactions_with_attributes(
1018        &self,
1019        best_transactions_attributes: BestTransactionsAttributes,
1020    ) -> Box<dyn crate::traits::BestTransactions<Item = Arc<ValidPoolTransaction<T::Transaction>>>>
1021    {
1022        self.get_pool_data().best_transactions_with_attributes(best_transactions_attributes)
1023    }
1024
1025    /// Returns only the first `max` transactions in the pending pool.
1026    pub fn pending_transactions_max(
1027        &self,
1028        max: usize,
1029    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1030        self.get_pool_data().pending_transactions_iter().take(max).collect()
1031    }
1032
1033    /// Returns all transactions from the pending sub-pool
1034    pub fn pending_transactions(&self) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1035        self.get_pool_data().pending_transactions()
1036    }
1037
1038    /// Returns all transactions from parked pools
1039    pub fn queued_transactions(&self) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1040        self.get_pool_data().queued_transactions()
1041    }
1042
1043    /// Returns all transactions in the pool
1044    pub fn all_transactions(&self) -> AllPoolTransactions<T::Transaction> {
1045        let pool = self.get_pool_data();
1046        AllPoolTransactions {
1047            pending: pool.pending_transactions(),
1048            queued: pool.queued_transactions(),
1049        }
1050    }
1051
1052    /// Returns _all_ transactions in the pool
1053    pub fn all_transaction_hashes(&self) -> Vec<TxHash> {
1054        self.get_pool_data().all().transactions_iter().map(|tx| *tx.hash()).collect()
1055    }
1056
1057    /// Removes and returns all matching transactions from the pool.
1058    ///
1059    /// This behaves as if the transactions got discarded (_not_ mined), effectively introducing a
1060    /// nonce gap for the given transactions.
1061    pub fn remove_transactions(
1062        &self,
1063        hashes: Vec<TxHash>,
1064    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1065        if hashes.is_empty() {
1066            return Vec::new()
1067        }
1068        let removed = self.pool.write().remove_transactions(hashes);
1069
1070        self.with_event_listener(|listener| listener.discarded_many(&removed));
1071
1072        removed
1073    }
1074
1075    /// Removes and returns all matching transactions and their dependent transactions from the
1076    /// pool.
1077    pub fn remove_transactions_and_descendants(
1078        &self,
1079        hashes: Vec<TxHash>,
1080    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1081        if hashes.is_empty() {
1082            return Vec::new()
1083        }
1084        let removed = self.pool.write().remove_transactions_and_descendants(hashes);
1085
1086        self.with_event_listener(|listener| {
1087            for tx in &removed {
1088                listener.discarded(tx.hash());
1089            }
1090        });
1091
1092        removed
1093    }
1094
1095    /// Removes and returns all transactions by the specified sender from the pool.
1096    pub fn remove_transactions_by_sender(
1097        &self,
1098        sender: Address,
1099    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1100        let Some(sender_id) = self.sender_id(&sender) else { return Vec::new() };
1101        let removed = self.pool.write().remove_transactions_by_sender(sender_id);
1102
1103        self.with_event_listener(|listener| listener.discarded_many(&removed));
1104
1105        removed
1106    }
1107
1108    /// Prunes and returns all matching transactions from the pool.
1109    ///
1110    /// This removes the transactions as if they were mined: descendant transactions are **not**
1111    /// parked and remain eligible for inclusion.
1112    pub fn prune_transactions(
1113        &self,
1114        hashes: Vec<TxHash>,
1115    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1116        if hashes.is_empty() {
1117            return Vec::new()
1118        }
1119
1120        self.pool.write().prune_transactions(hashes)
1121    }
1122
1123    /// Retains only transactions that are not present in the pool.
1124    pub fn retain_unknown<A>(&self, announcement: &mut A)
1125    where
1126        A: HandleMempoolData,
1127    {
1128        if announcement.is_empty() {
1129            return
1130        }
1131        let pool = self.get_pool_data();
1132        announcement.retain_by_hash(|tx| !pool.contains(tx))
1133    }
1134
1135    /// Retains only transactions that are present in the pool.
1136    pub fn retain_contains<A>(&self, announcement: &mut A)
1137    where
1138        A: HandleMempoolData,
1139    {
1140        if announcement.is_empty() {
1141            return
1142        }
1143        let pool = self.get_pool_data();
1144        announcement.retain_by_hash(|tx| pool.contains(tx))
1145    }
1146
1147    /// Returns the transaction by hash.
1148    pub fn get(&self, tx_hash: &TxHash) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
1149        self.get_pool_data().get(tx_hash)
1150    }
1151
1152    /// Returns all transactions of the address
1153    pub fn get_transactions_by_sender(
1154        &self,
1155        sender: Address,
1156    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1157        let Some(sender_id) = self.sender_id(&sender) else { return Vec::new() };
1158        self.get_pool_data().get_transactions_by_sender(sender_id)
1159    }
1160
1161    /// Returns a pending transaction sent by the given sender with the given nonce.
1162    pub fn get_pending_transaction_by_sender_and_nonce(
1163        &self,
1164        sender: Address,
1165        nonce: u64,
1166    ) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
1167        let sender_id = self.sender_id(&sender)?;
1168        self.get_pool_data().get_pending_transaction_by_sender_and_nonce(sender_id, nonce)
1169    }
1170
1171    /// Returns all queued transactions of the address by sender
1172    pub fn get_queued_transactions_by_sender(
1173        &self,
1174        sender: Address,
1175    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1176        let Some(sender_id) = self.sender_id(&sender) else { return Vec::new() };
1177        self.get_pool_data().queued_txs_by_sender(sender_id)
1178    }
1179
1180    /// Returns all pending transactions filtered by predicate
1181    pub fn pending_transactions_with_predicate(
1182        &self,
1183        predicate: impl FnMut(&ValidPoolTransaction<T::Transaction>) -> bool,
1184    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1185        self.get_pool_data().pending_transactions_with_predicate(predicate)
1186    }
1187
1188    /// Returns all pending transactions of the address by sender
1189    pub fn get_pending_transactions_by_sender(
1190        &self,
1191        sender: Address,
1192    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1193        let Some(sender_id) = self.sender_id(&sender) else { return Vec::new() };
1194        self.get_pool_data().pending_txs_by_sender(sender_id)
1195    }
1196
1197    /// Returns the highest transaction of the address
1198    pub fn get_highest_transaction_by_sender(
1199        &self,
1200        sender: Address,
1201    ) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
1202        let sender_id = self.sender_id(&sender)?;
1203        self.get_pool_data().get_highest_transaction_by_sender(sender_id)
1204    }
1205
1206    /// Returns the transaction with the highest nonce that is executable given the on chain nonce.
1207    pub fn get_highest_consecutive_transaction_by_sender(
1208        &self,
1209        sender: Address,
1210        on_chain_nonce: u64,
1211    ) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
1212        let sender_id = self.sender_id(&sender)?;
1213        self.get_pool_data().get_highest_consecutive_transaction_by_sender(
1214            sender_id.into_transaction_id(on_chain_nonce),
1215        )
1216    }
1217
1218    /// Returns the transaction given a [`TransactionId`]
1219    pub fn get_transaction_by_transaction_id(
1220        &self,
1221        transaction_id: &TransactionId,
1222    ) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
1223        self.get_pool_data().all().get(transaction_id).map(|tx| tx.transaction.clone())
1224    }
1225
1226    /// Returns all transactions that where submitted with the given [`TransactionOrigin`]
1227    pub fn get_transactions_by_origin(
1228        &self,
1229        origin: TransactionOrigin,
1230    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1231        self.get_pool_data()
1232            .all()
1233            .transactions_iter()
1234            .filter(|tx| tx.origin == origin)
1235            .cloned()
1236            .collect()
1237    }
1238
1239    /// Returns all pending transactions filtered by [`TransactionOrigin`]
1240    pub fn get_pending_transactions_by_origin(
1241        &self,
1242        origin: TransactionOrigin,
1243    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1244        self.get_pool_data().pending_transactions_iter().filter(|tx| tx.origin == origin).collect()
1245    }
1246
1247    /// Returns all the transactions belonging to the hashes.
1248    ///
1249    /// If no transaction exists, it is skipped.
1250    pub fn get_all(&self, txs: Vec<TxHash>) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1251        if txs.is_empty() {
1252            return Vec::new()
1253        }
1254        self.get_pool_data().get_all(txs).collect()
1255    }
1256
1257    /// Returns all the transactions belonging to the hashes that are propagatable.
1258    ///
1259    /// If no transaction exists, it is skipped.
1260    fn get_all_propagatable(
1261        &self,
1262        txs: &[TxHash],
1263    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1264        if txs.is_empty() {
1265            return Vec::new()
1266        }
1267        let pool = self.get_pool_data();
1268        txs.iter().filter_map(|tx| pool.get(tx).filter(|tx| tx.propagate)).collect()
1269    }
1270
1271    /// Notify about propagated transactions.
1272    pub fn on_propagated(&self, txs: PropagatedTransactions) {
1273        if txs.is_empty() {
1274            return
1275        }
1276        self.with_event_listener(|listener| {
1277            txs.into_iter().for_each(|(hash, peers)| listener.propagated(&hash, peers));
1278        });
1279    }
1280
1281    /// Number of transactions in the entire pool
1282    pub fn len(&self) -> usize {
1283        self.get_pool_data().len()
1284    }
1285
1286    /// Whether the pool is empty
1287    pub fn is_empty(&self) -> bool {
1288        self.get_pool_data().is_empty()
1289    }
1290
1291    /// Returns whether or not the pool is over its configured size and transaction count limits.
1292    pub fn is_exceeded(&self) -> bool {
1293        self.pool.read().is_exceeded()
1294    }
1295
1296    /// Inserts a blob transaction into the blob store
1297    fn insert_blob(&self, hash: TxHash, blob: BlobTransactionSidecarVariant) {
1298        debug!(target: "txpool", "[{:?}] storing blob sidecar", hash);
1299        if let Err(err) = self.blob_store.insert(hash, blob) {
1300            warn!(target: "txpool", %err, "[{:?}] failed to insert blob", hash);
1301            self.blob_store_metrics.blobstore_failed_inserts.increment(1);
1302        }
1303        self.update_blob_store_metrics();
1304    }
1305
1306    /// Delete a blob from the blob store
1307    pub fn delete_blob(&self, blob: TxHash) {
1308        let _ = self.blob_store.delete(blob);
1309    }
1310
1311    /// Delete all blobs from the blob store
1312    pub fn delete_blobs(&self, txs: Vec<TxHash>) {
1313        let _ = self.blob_store.delete_all(txs);
1314    }
1315
1316    /// Cleans up the blob store
1317    pub fn cleanup_blobs(&self) {
1318        let stat = self.blob_store.cleanup();
1319        self.blob_store_metrics.blobstore_failed_deletes.increment(stat.delete_failed as u64);
1320        self.update_blob_store_metrics();
1321    }
1322
1323    fn update_blob_store_metrics(&self) {
1324        if let Some(data_size) = self.blob_store.data_size_hint() {
1325            self.blob_store_metrics.blobstore_byte_size.set(data_size as f64);
1326        }
1327        self.blob_store_metrics.blobstore_entries.set(self.blob_store.blobs_len() as f64);
1328    }
1329
1330    /// Deletes all blob transactions that were discarded.
1331    fn delete_discarded_blobs<'a>(
1332        &'a self,
1333        transactions: impl IntoIterator<Item = &'a Arc<ValidPoolTransaction<T::Transaction>>>,
1334    ) {
1335        let blob_txs = transactions
1336            .into_iter()
1337            .filter(|tx| tx.transaction.is_eip4844())
1338            .map(|tx| *tx.hash())
1339            .collect();
1340        self.delete_blobs(blob_txs);
1341    }
1342}
1343
1344impl<V, T: TransactionOrdering, S> fmt::Debug for PoolInner<V, T, S> {
1345    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1346        f.debug_struct("PoolInner").field("config", &self.config).finish_non_exhaustive()
1347    }
1348}
1349
1350/// Metadata for a transaction that was added to the pool.
1351///
1352/// This holds all the data needed to complete post-insertion operations (notifications,
1353/// blob storage).
1354#[derive(Debug)]
1355struct AddedTransactionMeta<T: PoolTransaction> {
1356    /// The transaction that was added to the pool
1357    added: AddedTransaction<T>,
1358    /// Optional blob sidecar for EIP-4844 transactions
1359    blob_sidecar: Option<BlobTransactionSidecarVariant>,
1360}
1361
1362/// Tracks an added transaction and all graph changes caused by adding it.
1363#[derive(Debug, Clone)]
1364pub struct AddedPendingTransaction<T: PoolTransaction> {
1365    /// Inserted transaction.
1366    pub transaction: Arc<ValidPoolTransaction<T>>,
1367    /// Replaced transaction.
1368    pub replaced: Option<Arc<ValidPoolTransaction<T>>>,
1369    /// transactions promoted to the pending queue
1370    pub promoted: Vec<Arc<ValidPoolTransaction<T>>>,
1371    /// transactions that failed and became discarded
1372    pub discarded: Vec<Arc<ValidPoolTransaction<T>>>,
1373}
1374
1375impl<T: PoolTransaction> AddedPendingTransaction<T> {
1376    /// Returns all transactions that were promoted to the pending pool and adhere to the given
1377    /// [`TransactionListenerKind`].
1378    ///
1379    /// If the kind is [`TransactionListenerKind::PropagateOnly`], then only transactions that
1380    /// are allowed to be propagated are returned.
1381    pub(crate) fn pending_transactions(
1382        &self,
1383        kind: TransactionListenerKind,
1384    ) -> impl Iterator<Item = B256> + '_ {
1385        let iter = std::iter::once(&self.transaction).chain(self.promoted.iter());
1386        PendingTransactionIter { kind, iter }
1387    }
1388}
1389
1390pub(crate) struct PendingTransactionIter<Iter> {
1391    kind: TransactionListenerKind,
1392    iter: Iter,
1393}
1394
1395impl<'a, Iter, T> Iterator for PendingTransactionIter<Iter>
1396where
1397    Iter: Iterator<Item = &'a Arc<ValidPoolTransaction<T>>>,
1398    T: PoolTransaction + 'a,
1399{
1400    type Item = B256;
1401
1402    fn next(&mut self) -> Option<Self::Item> {
1403        loop {
1404            let next = self.iter.next()?;
1405            if self.kind.is_propagate_only() && !next.propagate {
1406                continue
1407            }
1408            return Some(*next.hash())
1409        }
1410    }
1411}
1412
1413/// An iterator over full pending transactions
1414pub(crate) struct FullPendingTransactionIter<Iter> {
1415    kind: TransactionListenerKind,
1416    iter: Iter,
1417}
1418
1419impl<'a, Iter, T> Iterator for FullPendingTransactionIter<Iter>
1420where
1421    Iter: Iterator<Item = &'a Arc<ValidPoolTransaction<T>>>,
1422    T: PoolTransaction + 'a,
1423{
1424    type Item = NewTransactionEvent<T>;
1425
1426    fn next(&mut self) -> Option<Self::Item> {
1427        loop {
1428            let next = self.iter.next()?;
1429            if self.kind.is_propagate_only() && !next.propagate {
1430                continue
1431            }
1432            return Some(NewTransactionEvent {
1433                subpool: SubPool::Pending,
1434                transaction: next.clone(),
1435            })
1436        }
1437    }
1438}
1439
1440/// Represents a transaction that was added into the pool and its state
1441#[derive(Debug, Clone)]
1442pub enum AddedTransaction<T: PoolTransaction> {
1443    /// Transaction was successfully added and moved to the pending pool.
1444    Pending(AddedPendingTransaction<T>),
1445    /// Transaction was successfully added but not yet ready for processing and moved to a
1446    /// parked pool instead.
1447    Parked {
1448        /// Inserted transaction.
1449        transaction: Arc<ValidPoolTransaction<T>>,
1450        /// Replaced transaction.
1451        replaced: Option<Arc<ValidPoolTransaction<T>>>,
1452        /// The subpool it was moved to.
1453        subpool: SubPool,
1454        /// The specific reason why the transaction is queued (if applicable).
1455        queued_reason: Option<QueuedReason>,
1456    },
1457}
1458
1459impl<T: PoolTransaction> AddedTransaction<T> {
1460    /// Returns whether the transaction has been added to the pending pool.
1461    pub const fn as_pending(&self) -> Option<&AddedPendingTransaction<T>> {
1462        match self {
1463            Self::Pending(tx) => Some(tx),
1464            _ => None,
1465        }
1466    }
1467
1468    /// Returns the replaced transaction if there was one
1469    pub const fn replaced(&self) -> Option<&Arc<ValidPoolTransaction<T>>> {
1470        match self {
1471            Self::Pending(tx) => tx.replaced.as_ref(),
1472            Self::Parked { replaced, .. } => replaced.as_ref(),
1473        }
1474    }
1475
1476    /// Returns the discarded transactions if there were any
1477    pub(crate) fn discarded_transactions(&self) -> Option<&[Arc<ValidPoolTransaction<T>>]> {
1478        match self {
1479            Self::Pending(tx) => Some(&tx.discarded),
1480            Self::Parked { .. } => None,
1481        }
1482    }
1483
1484    /// Returns the hash of the replaced transaction if it is a blob transaction.
1485    pub(crate) fn replaced_blob_transaction(&self) -> Option<B256> {
1486        self.replaced().filter(|tx| tx.transaction.is_eip4844()).map(|tx| *tx.transaction.hash())
1487    }
1488
1489    /// Returns the hash of the transaction
1490    pub fn hash(&self) -> &TxHash {
1491        match self {
1492            Self::Pending(tx) => tx.transaction.hash(),
1493            Self::Parked { transaction, .. } => transaction.hash(),
1494        }
1495    }
1496
1497    /// Converts this type into the event type for listeners
1498    pub fn into_new_transaction_event(self) -> NewTransactionEvent<T> {
1499        match self {
1500            Self::Pending(tx) => {
1501                NewTransactionEvent { subpool: SubPool::Pending, transaction: tx.transaction }
1502            }
1503            Self::Parked { transaction, subpool, .. } => {
1504                NewTransactionEvent { transaction, subpool }
1505            }
1506        }
1507    }
1508
1509    /// Returns the subpool this transaction was added to
1510    pub(crate) const fn subpool(&self) -> SubPool {
1511        match self {
1512            Self::Pending(_) => SubPool::Pending,
1513            Self::Parked { subpool, .. } => *subpool,
1514        }
1515    }
1516
1517    /// Returns the [`TransactionId`] of the added transaction
1518    #[cfg(test)]
1519    pub(crate) fn id(&self) -> &TransactionId {
1520        match self {
1521            Self::Pending(added) => added.transaction.id(),
1522            Self::Parked { transaction, .. } => transaction.id(),
1523        }
1524    }
1525
1526    /// Returns the queued reason if the transaction is parked with a queued reason.
1527    pub const fn queued_reason(&self) -> Option<&QueuedReason> {
1528        match self {
1529            Self::Pending(_) => None,
1530            Self::Parked { queued_reason, .. } => queued_reason.as_ref(),
1531        }
1532    }
1533
1534    /// Returns the transaction state based on the subpool and queued reason.
1535    pub fn transaction_state(&self) -> AddedTransactionState {
1536        match self.subpool() {
1537            SubPool::Pending => AddedTransactionState::Pending,
1538            _ => {
1539                // For non-pending transactions, use the queued reason directly from the
1540                // AddedTransaction
1541                if let Some(reason) = self.queued_reason() {
1542                    AddedTransactionState::Queued(reason.clone())
1543                } else {
1544                    // Fallback - this shouldn't happen with the new implementation
1545                    AddedTransactionState::Queued(QueuedReason::NonceGap)
1546                }
1547            }
1548        }
1549    }
1550}
1551
/// The specific reason why a transaction is queued (not ready for execution).
///
/// This is the payload of [`AddedTransactionState::Queued`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum QueuedReason {
    /// Transaction has a nonce gap: prior transactions are missing.
    NonceGap,
    /// Transaction has parked ancestors and is waiting for other transactions to be mined.
    ParkedAncestors,
    /// Sender has insufficient balance to cover the transaction cost.
    InsufficientBalance,
    /// Transaction exceeds the block gas limit.
    TooMuchGas,
    /// Transaction doesn't meet the base fee requirement.
    InsufficientBaseFee,
    /// Transaction doesn't meet the blob fee requirement (EIP-4844).
    InsufficientBlobFee,
}
1568
/// The state of a transaction when it was added to the pool.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AddedTransactionState {
    /// Ready for execution.
    Pending,
    /// Not ready for execution due to a specific condition, given by the wrapped
    /// [`QueuedReason`].
    Queued(QueuedReason),
}
1577
1578impl AddedTransactionState {
1579    /// Returns whether the transaction was submitted as queued.
1580    pub const fn is_queued(&self) -> bool {
1581        matches!(self, Self::Queued(_))
1582    }
1583
1584    /// Returns whether the transaction was submitted as pending.
1585    pub const fn is_pending(&self) -> bool {
1586        matches!(self, Self::Pending)
1587    }
1588
1589    /// Returns the specific queued reason if the transaction is queued.
1590    pub const fn queued_reason(&self) -> Option<&QueuedReason> {
1591        match self {
1592            Self::Queued(reason) => Some(reason),
1593            Self::Pending => None,
1594        }
1595    }
1596}
1597
/// The outcome of a successful transaction addition.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AddedTransactionOutcome {
    /// The hash of the transaction.
    pub hash: TxHash,
    /// The state of the transaction: [`AddedTransactionState::Pending`] or
    /// [`AddedTransactionState::Queued`].
    pub state: AddedTransactionState,
}
1606
1607impl AddedTransactionOutcome {
1608    /// Returns whether the transaction was submitted as queued.
1609    pub const fn is_queued(&self) -> bool {
1610        self.state.is_queued()
1611    }
1612
1613    /// Returns whether the transaction was submitted as pending.
1614    pub const fn is_pending(&self) -> bool {
1615        self.state.is_pending()
1616    }
1617}
1618
/// Contains all state changes after a [`CanonicalStateUpdate`] was processed.
#[derive(Debug)]
pub(crate) struct OnNewCanonicalStateOutcome<T: PoolTransaction> {
    /// Hash of the block.
    pub(crate) block_hash: B256,
    /// Hashes of all mined transactions.
    pub(crate) mined: Vec<TxHash>,
    /// Transactions promoted to the pending pool.
    pub(crate) promoted: Vec<Arc<ValidPoolTransaction<T>>>,
    /// Transactions that were discarded during the update.
    pub(crate) discarded: Vec<Arc<ValidPoolTransaction<T>>>,
}
1631
1632impl<T: PoolTransaction> OnNewCanonicalStateOutcome<T> {
1633    /// Returns all transactions that were promoted to the pending pool and adhere to the given
1634    /// [`TransactionListenerKind`].
1635    ///
1636    /// If the kind is [`TransactionListenerKind::PropagateOnly`], then only transactions that
1637    /// are allowed to be propagated are returned.
1638    pub(crate) fn pending_transactions(
1639        &self,
1640        kind: TransactionListenerKind,
1641    ) -> impl Iterator<Item = B256> + '_ {
1642        let iter = self.promoted.iter();
1643        PendingTransactionIter { kind, iter }
1644    }
1645
1646    /// Returns all FULL transactions that were promoted to the pending pool and adhere to the given
1647    /// [`TransactionListenerKind`].
1648    ///
1649    /// If the kind is [`TransactionListenerKind::PropagateOnly`], then only transactions that
1650    /// are allowed to be propagated are returned.
1651    pub(crate) fn full_pending_transactions(
1652        &self,
1653        kind: TransactionListenerKind,
1654    ) -> impl Iterator<Item = NewTransactionEvent<T>> + '_ {
1655        let iter = self.promoted.iter();
1656        FullPendingTransactionIter { kind, iter }
1657    }
1658}
1659
#[cfg(test)]
mod tests {
    use crate::{
        blobstore::{BlobStore, InMemoryBlobStore},
        identifier::SenderId,
        test_utils::{MockTransaction, TestPoolBuilder},
        validate::ValidTransaction,
        BlockInfo, PoolConfig, SubPoolLimit, TransactionOrigin, TransactionValidationOutcome, U256,
    };
    use alloy_eips::{eip4844::BlobTransactionSidecar, eip7594::BlobTransactionSidecarVariant};
    use alloy_primitives::Address;
    use std::{fs, path::PathBuf};

    /// Reads `test_data/blob1.json` from the crate directory and returns the string stored under
    /// its `data` key as a single-element list of hex blobs.
    fn read_blob_fixture() -> Vec<String> {
        let manifest_dir = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
        let raw = fs::read_to_string(manifest_dir.join("test_data/blob1.json"))
            .expect("Failed to read the blob data file");

        let parsed: serde_json::Value =
            serde_json::from_str(&raw).expect("Failed to deserialize JSON");

        let data = parsed.get("data").unwrap();
        let hex = data.as_str().expect("Data is not a valid string");
        vec![hex.to_string()]
    }

    #[test]
    fn test_discard_blobs_on_blob_tx_eviction() {
        // One EIP-4844 sidecar, built from the fixture blob, is reused by every transaction.
        let sidecar = BlobTransactionSidecarVariant::Eip4844(
            BlobTransactionSidecar::try_from_blobs_hex(read_blob_fixture()).unwrap(),
        );

        // Limits applied to the blob sub-pool.
        let blob_limit = SubPoolLimit::new(1000, usize::MAX);

        // Default pool configuration, except for the blob limit.
        let pool_config = PoolConfig { blob_limit, ..Default::default() };
        let test_pool = &TestPoolBuilder::default().with_config(pool_config).pool;

        // Default block info, except for a pending blob fee.
        let block_info = BlockInfo { pending_blob_fee: Some(10_000_000), ..Default::default() };
        test_pool.set_block_info(block_info);

        // Reference store, compared against the pool's own blob store at the end.
        let blob_store = InMemoryBlobStore::default();

        // Submit ten more transactions than the blob limit allows.
        let total_txs = blob_limit.max_txs + 10;
        for idx in 0..total_txs {
            let mut tx = MockTransaction::eip4844_with_sidecar(sidecar.clone());

            // Set non zero size
            tx.set_size(1844674407370951);

            // Only the first `max_txs` sidecars are recorded in the reference store.
            if idx < blob_limit.max_txs {
                blob_store.insert(*tx.get_hash(), sidecar.clone()).unwrap();
            }

            // Hand the transaction to the pool as an externally received, valid transaction.
            let outcome = TransactionValidationOutcome::Valid {
                balance: U256::from(1_000),
                state_nonce: 0,
                bytecode_hash: None,
                transaction: ValidTransaction::ValidWithSidecar {
                    transaction: tx,
                    sidecar: sidecar.clone(),
                },
                propagate: true,
                authorities: None,
            };
            test_pool.add_transactions(TransactionOrigin::External, [outcome]);
        }

        let pool_size = test_pool.size();

        // The blob sub-pool holds exactly `max_txs` transactions.
        assert_eq!(pool_size.blob, blob_limit.max_txs);

        // The reported blob size matches the expected value.
        assert_eq!(pool_size.blob_size, 1844674407370951000);

        // The pool's blob store equals the reference store.
        assert_eq!(*test_pool.blob_store(), blob_store);
    }

    #[test]
    fn test_auths_stored_in_identifiers() {
        // Pool with default configuration.
        let test_pool = &TestPoolBuilder::default().with_config(Default::default()).pool;

        let auth = Address::new([1; 20]);

        // Submit a local EIP-7702 transaction that lists `auth` as an authority.
        let outcome = TransactionValidationOutcome::Valid {
            balance: U256::from(1_000),
            state_nonce: 0,
            bytecode_hash: None,
            transaction: ValidTransaction::Valid(MockTransaction::eip7702()),
            propagate: true,
            authorities: Some(vec![auth]),
        };
        test_pool.add_transactions(TransactionOrigin::Local, [outcome]);

        // The authority address must now resolve to a sender id.
        assert_eq!(test_pool.identifiers.read().sender_id(&auth), Some(SenderId::from(1)));
    }

    #[test]
    fn sender_queries_do_not_allocate_ids_for_unknown_addresses() {
        let test_pool = &TestPoolBuilder::default().with_config(Default::default()).pool;
        let unknown = Address::new([9; 20]);

        // The address has no id before any query runs.
        assert_eq!(test_pool.sender_id(&unknown), None);

        // Every sender-based query comes back empty.
        assert!(test_pool.get_transactions_by_sender(unknown).is_empty());
        assert!(test_pool.get_pending_transaction_by_sender_and_nonce(unknown, 0).is_none());
        assert!(test_pool.get_queued_transactions_by_sender(unknown).is_empty());
        assert!(test_pool.get_pending_transactions_by_sender(unknown).is_empty());
        assert!(test_pool.get_highest_transaction_by_sender(unknown).is_none());
        assert!(test_pool.get_highest_consecutive_transaction_by_sender(unknown, 0).is_none());
        assert!(test_pool.remove_transactions_by_sender(unknown).is_empty());

        // None of the calls above allocated an id for the address.
        assert_eq!(test_pool.sender_id(&unknown), None);
    }
}