Skip to main content

reth_transaction_pool/pool/
mod.rs

1//! Transaction Pool internals.
2//!
3//! Incoming transactions are validated before they enter the pool first. The validation outcome can
4//! have 3 states:
5//!
6//!  1. Transaction can _never_ be valid
7//!  2. Transaction is _currently_ valid
8//!  3. Transaction is _currently_ invalid, but could potentially become valid in the future
9//!
//! However, (2.) and (3.) can only be determined on the basis of the current state, whereas (1.)
//! holds indefinitely. This means that once the state changes, transactions in (2.) and (3.) need
//! to be reevaluated.
13//!
14//! The transaction pool is responsible for storing new, valid transactions and providing the next
15//! best transactions sorted by their priority. Where priority is determined by the transaction's
16//! score ([`TransactionOrdering`]).
17//!
18//! Furthermore, the following characteristics fall under (3.):
19//!
20//!  a) Nonce of a transaction is higher than the expected nonce for the next transaction of its
21//! sender. A distinction is made here whether multiple transactions from the same sender have
22//! gapless nonce increments.
23//!
//!  a)(1) If _no_ transaction is missing in a chain of multiple
//! transactions from the same sender (all nonces in a row), all of them can in principle be
//! executed on the current state one after the other.
27//!
28//!  a)(2) If there's a nonce gap, then all
29//! transactions after the missing transaction are blocked until the missing transaction arrives.
30//!
//!  b) Transaction does not meet the dynamic fee cap requirement introduced by EIP-1559: The
//! fee cap of the transaction needs to be no less than the base fee of the block.
33//!
34//!
35//! In essence the transaction pool is made of three separate sub-pools:
36//!
37//!  - Pending Pool: Contains all transactions that are valid on the current state and satisfy (3.
38//!    a)(1): _No_ nonce gaps. A _pending_ transaction is considered _ready_ when it has the lowest
39//!    nonce of all transactions from the same sender. Once a _ready_ transaction with nonce `n` has
40//!    been executed, the next highest transaction from the same sender `n + 1` becomes ready.
41//!
42//!  - Queued Pool: Contains all transactions that are currently blocked by missing transactions:
43//!    (3. a)(2): _With_ nonce gaps or due to lack of funds.
44//!
//!  - Basefee Pool: To account for the dynamic base fee requirement (3. b) which could render an
//!    EIP-1559 transaction and all subsequent transactions of the sender currently invalid.
47//!
48//! The classification of transactions is always dependent on the current state that is changed as
49//! soon as a new block is mined. Once a new block is mined, the account changeset must be applied
50//! to the transaction pool.
51//!
52//!
53//! Depending on the use case, consumers of the [`TransactionPool`](crate::traits::TransactionPool)
54//! are interested in (2.) and/or (3.).
55
56//! A generic [`TransactionPool`](crate::traits::TransactionPool) that only handles transactions.
57//!
58//! This Pool maintains two separate sub-pools for (2.) and (3.)
59//!
60//! ## Terminology
61//!
62//!  - _Pending_: pending transactions are transactions that fall under (2.). These transactions can
63//!    currently be executed and are stored in the pending sub-pool
64//!  - _Queued_: queued transactions are transactions that fall under category (3.). Those
65//!    transactions are _currently_ waiting for state changes that eventually move them into
66//!    category (2.) and become pending.
67
68use crate::{
69    blobstore::BlobStore,
70    error::{PoolError, PoolErrorKind, PoolResult},
71    identifier::{SenderId, SenderIdentifiers, TransactionId},
72    metrics::BlobStoreMetrics,
73    pool::{
74        listener::{
75            BlobTransactionSidecarListener, PendingTransactionHashListener, PoolEventBroadcast,
76            TransactionListener,
77        },
78        state::SubPool,
79        txpool::{SenderInfo, TxPool},
80        update::UpdateOutcome,
81    },
82    traits::{
83        AllPoolTransactions, BestTransactionsAttributes, BlockInfo, GetPooledTransactionLimit,
84        NewBlobSidecar, PoolSize, PoolTransaction, PropagatedTransactions, TransactionOrigin,
85    },
86    validate::{TransactionValidationOutcome, ValidPoolTransaction, ValidTransaction},
87    CanonicalStateUpdate, EthPoolTransaction, PoolConfig, TransactionOrdering,
88    TransactionValidator,
89};
90
91use alloy_primitives::{
92    map::{AddressSet, HashSet},
93    Address, TxHash, B256,
94};
95use parking_lot::{Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard};
96use reth_eth_wire_types::HandleMempoolData;
97use reth_execution_types::ChangedAccount;
98
99use alloy_eips::{eip7594::BlobTransactionSidecarVariant, Typed2718};
100use reth_primitives_traits::Recovered;
101use rustc_hash::FxHashMap;
102use std::{
103    fmt,
104    sync::{
105        atomic::{AtomicBool, Ordering},
106        Arc,
107    },
108    time::Instant,
109};
110use tokio::sync::mpsc;
111use tracing::{debug, trace, warn};
112mod events;
113pub use best::{BestTransactionFilter, BestTransactionsWithPrioritizedSenders};
114pub use blob::{blob_tx_priority, fee_delta, BlobOrd, BlobTransactions};
115pub use events::{FullTransactionEvent, NewTransactionEvent, TransactionEvent};
116pub use listener::{AllTransactionsEvents, TransactionEvents, TransactionListenerKind};
117pub use parked::{BasefeeOrd, ParkedOrd, ParkedPool, QueuedOrd};
118pub use pending::PendingPool;
119
120mod best;
121pub use best::BestTransactions;
122
123mod blob;
124pub mod listener;
125mod parked;
126pub mod pending;
127pub mod size;
128pub(crate) mod state;
129pub mod txpool;
130mod update;
131
/// Bound on number of pending transactions from `reth_network::TransactionsManager` to buffer.
pub const PENDING_TX_LISTENER_BUFFER_SIZE: usize = 2048;
/// Bound on number of new transactions from `reth_network::TransactionsManager` to buffer.
pub const NEW_TX_LISTENER_BUFFER_SIZE: usize = 1024;

/// Capacity of the channel created for every blob sidecar listener registered through
/// `PoolInner::add_blob_sidecar_listener`.
const BLOB_SIDECAR_LISTENER_BUFFER_SIZE: usize = 512;
138
/// Transaction pool internals.
///
/// Bundles the validator, the blob store, the lock-protected [`TxPool`] and the listener
/// registries that are notified about changes to the pool.
pub struct PoolInner<V, T, S>
where
    T: TransactionOrdering,
{
    /// Internal mapping of addresses to plain ints.
    identifiers: RwLock<SenderIdentifiers>,
    /// Transaction validator.
    validator: V,
    /// Storage for blob transactions
    blob_store: S,
    /// The internal pool that manages all transactions.
    pool: RwLock<TxPool<T>>,
    /// Pool settings.
    config: PoolConfig,
    /// Manages listeners for transaction state change events.
    event_listener: RwLock<PoolEventBroadcast<T::Transaction>>,
    /// Cached flag that lets event emission skip taking the `event_listener` lock.
    ///
    /// It is set whenever a listener is subscribed and cleared again once the broadcast reports
    /// that it is empty, so it does not stay `true` forever after the first subscription.
    has_event_listeners: AtomicBool,
    /// Listeners for new _full_ pending transactions.
    pending_transaction_listener: RwLock<Vec<PendingTransactionHashListener>>,
    /// Listeners for new transactions added to the pool.
    transaction_listener: RwLock<Vec<TransactionListener<T::Transaction>>>,
    /// Listener for new blob transaction sidecars added to the pool.
    blob_transaction_sidecar_listener: Mutex<Vec<BlobTransactionSidecarListener>>,
    /// Metrics for the blob store
    blob_store_metrics: BlobStoreMetrics,
}
167
168// === impl PoolInner ===
169
170impl<V, T, S> PoolInner<V, T, S>
171where
172    V: TransactionValidator,
173    T: TransactionOrdering<Transaction = <V as TransactionValidator>::Transaction>,
174    S: BlobStore,
175{
176    /// Create a new transaction pool instance.
177    pub fn new(validator: V, ordering: T, blob_store: S, config: PoolConfig) -> Self {
178        Self {
179            identifiers: Default::default(),
180            validator,
181            event_listener: Default::default(),
182            has_event_listeners: AtomicBool::new(false),
183            pool: RwLock::new(TxPool::new(ordering, config.clone())),
184            pending_transaction_listener: Default::default(),
185            transaction_listener: Default::default(),
186            blob_transaction_sidecar_listener: Default::default(),
187            config,
188            blob_store,
189            blob_store_metrics: Default::default(),
190        }
191    }
192
193    /// Returns the configured blob store.
194    pub const fn blob_store(&self) -> &S {
195        &self.blob_store
196    }
197
198    /// Returns stats about the size of the pool.
199    pub fn size(&self) -> PoolSize {
200        self.get_pool_data().size()
201    }
202
203    /// Returns the currently tracked block
204    pub fn block_info(&self) -> BlockInfo {
205        self.get_pool_data().block_info()
206    }
207    /// Sets the currently tracked block.
208    ///
209    /// This will also notify subscribers about any transactions that were promoted to the pending
210    /// pool due to fee changes.
211    pub fn set_block_info(&self, info: BlockInfo) {
212        let outcome = self.pool.write().set_block_info(info);
213
214        // Notify subscribers about promoted transactions due to fee changes
215        self.notify_on_transaction_updates(outcome.promoted, outcome.discarded);
216    }
217
218    /// Returns the internal [`SenderId`] for this address, allocating a new mapping when the
219    /// address is first observed.
220    ///
221    /// This must only be used on paths that intentionally begin tracking a sender, such as
222    /// transaction insertion. Read-only lookups should prefer [`Self::sender_id`] to avoid
223    /// growing the sender-id map for unknown addresses.
224    pub fn get_sender_id(&self, addr: Address) -> SenderId {
225        self.identifiers.write().sender_id_or_create(addr)
226    }
227
228    /// Returns the internal [`SenderId`] for this address if it is already tracked.
229    ///
230    /// Unlike [`Self::get_sender_id`], this never allocates a new sender mapping and is therefore
231    /// suitable for read-only queries or best-effort cleanup on unknown addresses.
232    pub fn sender_id(&self, addr: &Address) -> Option<SenderId> {
233        self.identifiers.read().sender_id(addr)
234    }
235
236    /// Returns the internal [`SenderId`]s for the given addresses.
237    pub fn get_sender_ids(&self, addrs: impl IntoIterator<Item = Address>) -> Vec<SenderId> {
238        self.identifiers.write().sender_ids_or_create(addrs)
239    }
240
241    /// Returns all senders in the pool
242    pub fn unique_senders(&self) -> AddressSet {
243        self.get_pool_data().unique_senders()
244    }
245
246    /// Converts the changed accounts to a map of sender ids to sender info (internal identifier
247    /// used for __tracked__ accounts)
248    fn changed_senders(
249        &self,
250        accs: impl Iterator<Item = ChangedAccount>,
251    ) -> FxHashMap<SenderId, SenderInfo> {
252        let identifiers = self.identifiers.read();
253        accs.into_iter()
254            .filter_map(|acc| {
255                let ChangedAccount { address, nonce, balance } = acc;
256                let sender_id = identifiers.sender_id(&address)?;
257                Some((sender_id, SenderInfo { state_nonce: nonce, balance }))
258            })
259            .collect()
260    }
261
262    /// Get the config the pool was configured with.
263    pub const fn config(&self) -> &PoolConfig {
264        &self.config
265    }
266
267    /// Get the validator reference.
268    pub const fn validator(&self) -> &V {
269        &self.validator
270    }
271
272    /// Adds a new transaction listener to the pool that gets notified about every new _pending_
273    /// transaction inserted into the pool
274    pub fn add_pending_listener(&self, kind: TransactionListenerKind) -> mpsc::Receiver<TxHash> {
275        let (sender, rx) = mpsc::channel(self.config.pending_tx_listener_buffer_size);
276        let listener = PendingTransactionHashListener { sender, kind };
277
278        let mut listeners = self.pending_transaction_listener.write();
279        // Clean up dead listeners before adding new one
280        listeners.retain(|l| !l.sender.is_closed());
281        listeners.push(listener);
282
283        rx
284    }
285
286    /// Adds a new transaction listener to the pool that gets notified about every new transaction.
287    pub fn add_new_transaction_listener(
288        &self,
289        kind: TransactionListenerKind,
290    ) -> mpsc::Receiver<NewTransactionEvent<T::Transaction>> {
291        let (sender, rx) = mpsc::channel(self.config.new_tx_listener_buffer_size);
292        let listener = TransactionListener { sender, kind };
293
294        let mut listeners = self.transaction_listener.write();
295        // Clean up dead listeners before adding new one
296        listeners.retain(|l| !l.sender.is_closed());
297        listeners.push(listener);
298
299        rx
300    }
301    /// Adds a new blob sidecar listener to the pool that gets notified about every new
302    /// eip4844 transaction's blob sidecar.
303    pub fn add_blob_sidecar_listener(&self) -> mpsc::Receiver<NewBlobSidecar> {
304        let (sender, rx) = mpsc::channel(BLOB_SIDECAR_LISTENER_BUFFER_SIZE);
305        let listener = BlobTransactionSidecarListener { sender };
306        self.blob_transaction_sidecar_listener.lock().push(listener);
307        rx
308    }
309
310    /// If the pool contains the transaction, this adds a new listener that gets notified about
311    /// transaction events.
312    pub fn add_transaction_event_listener(&self, tx_hash: TxHash) -> Option<TransactionEvents> {
313        if !self.get_pool_data().contains(&tx_hash) {
314            return None
315        }
316        let mut listener = self.event_listener.write();
317        let events = listener.subscribe(tx_hash);
318        self.mark_event_listener_installed();
319        Some(events)
320    }
321
322    /// Adds a listener for all transaction events.
323    pub fn add_all_transactions_event_listener(&self) -> AllTransactionsEvents<T::Transaction> {
324        let mut listener = self.event_listener.write();
325        let events = listener.subscribe_all();
326        self.mark_event_listener_installed();
327        events
328    }
329
330    #[inline]
331    fn has_event_listeners(&self) -> bool {
332        self.has_event_listeners.load(Ordering::Relaxed)
333    }
334
335    #[inline]
336    fn mark_event_listener_installed(&self) {
337        self.has_event_listeners.store(true, Ordering::Relaxed);
338    }
339
340    #[inline]
341    fn update_event_listener_state(&self, listener: &PoolEventBroadcast<T::Transaction>) {
342        if listener.is_empty() {
343            self.has_event_listeners.store(false, Ordering::Relaxed);
344        }
345    }
346
347    #[inline]
348    fn with_event_listener<F>(&self, emit: F)
349    where
350        F: FnOnce(&mut PoolEventBroadcast<T::Transaction>),
351    {
352        if !self.has_event_listeners() {
353            return
354        }
355        let mut listener = self.event_listener.write();
356        if !listener.is_empty() {
357            emit(&mut listener);
358        }
359        self.update_event_listener_state(&listener);
360    }
361
362    /// Returns a read lock to the pool's data.
363    pub fn get_pool_data(&self) -> RwLockReadGuard<'_, TxPool<T>> {
364        self.pool.read()
365    }
366
367    /// Returns transactions in the pool that can be propagated
368    pub fn pooled_transactions(&self) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
369        let mut out = Vec::new();
370        self.append_pooled_transactions(&mut out);
371        out
372    }
373
374    /// Returns hashes of transactions in the pool that can be propagated.
375    pub fn pooled_transactions_hashes(&self) -> Vec<TxHash> {
376        let mut out = Vec::new();
377        self.append_pooled_transactions_hashes(&mut out);
378        out
379    }
380
381    /// Returns only the first `max` transactions in the pool that can be propagated.
382    pub fn pooled_transactions_max(
383        &self,
384        max: usize,
385    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
386        let mut out = Vec::new();
387        self.append_pooled_transactions_max(max, &mut out);
388        out
389    }
390
391    /// Extends the given vector with all transactions in the pool that can be propagated.
392    pub fn append_pooled_transactions(
393        &self,
394        out: &mut Vec<Arc<ValidPoolTransaction<T::Transaction>>>,
395    ) {
396        out.extend(
397            self.get_pool_data().all().transactions_iter().filter(|tx| tx.propagate).cloned(),
398        );
399    }
400
401    /// Extends the given vector with pooled transactions for the given hashes that are allowed to
402    /// be propagated.
403    pub fn append_pooled_transaction_elements(
404        &self,
405        tx_hashes: &[TxHash],
406        limit: GetPooledTransactionLimit,
407        out: &mut Vec<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>,
408    ) where
409        <V as TransactionValidator>::Transaction: EthPoolTransaction,
410    {
411        let transactions = self.get_all_propagatable(tx_hashes);
412        let mut size = 0;
413        for transaction in transactions {
414            let encoded_len = transaction.encoded_length();
415            let Some(pooled) = self.to_pooled_transaction(transaction) else {
416                continue;
417            };
418
419            size += encoded_len;
420            out.push(pooled.into_inner());
421
422            if limit.exceeds(size) {
423                break
424            }
425        }
426    }
427
428    /// Extends the given vector with the hashes of all transactions in the pool that can be
429    /// propagated.
430    pub fn append_pooled_transactions_hashes(&self, out: &mut Vec<TxHash>) {
431        out.extend(
432            self.get_pool_data()
433                .all()
434                .transactions_iter()
435                .filter(|tx| tx.propagate)
436                .map(|tx| *tx.hash()),
437        );
438    }
439
440    /// Extends the given vector with only the first `max` transactions in the pool that can be
441    /// propagated.
442    pub fn append_pooled_transactions_max(
443        &self,
444        max: usize,
445        out: &mut Vec<Arc<ValidPoolTransaction<T::Transaction>>>,
446    ) {
447        out.extend(
448            self.get_pool_data()
449                .all()
450                .transactions_iter()
451                .filter(|tx| tx.propagate)
452                .take(max)
453                .cloned(),
454        );
455    }
456
457    /// Returns only the first `max` hashes of transactions in the pool that can be propagated.
458    pub fn pooled_transactions_hashes_max(&self, max: usize) -> Vec<TxHash> {
459        if max == 0 {
460            return Vec::new();
461        }
462        self.get_pool_data()
463            .all()
464            .transactions_iter()
465            .filter(|tx| tx.propagate)
466            .take(max)
467            .map(|tx| *tx.hash())
468            .collect()
469    }
470
471    /// Converts the internally tracked transaction to the pooled format.
472    ///
473    /// If the transaction is an EIP-4844 transaction, the blob sidecar is fetched from the blob
474    /// store and attached to the transaction.
475    fn to_pooled_transaction(
476        &self,
477        transaction: Arc<ValidPoolTransaction<T::Transaction>>,
478    ) -> Option<Recovered<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>>
479    where
480        <V as TransactionValidator>::Transaction: EthPoolTransaction,
481    {
482        if transaction.is_eip4844() {
483            let sidecar = self.blob_store.get(*transaction.hash()).ok()??;
484            transaction.transaction.clone().try_into_pooled_eip4844(sidecar)
485        } else {
486            transaction
487                .transaction
488                .clone_into_pooled()
489                .inspect_err(|err| {
490                    debug!(
491                        target: "txpool", %err,
492                        "failed to convert transaction to pooled element; skipping",
493                    );
494                })
495                .ok()
496        }
497    }
498
499    /// Returns pooled transactions for the given transaction hashes that are allowed to be
500    /// propagated.
501    pub fn get_pooled_transaction_elements(
502        &self,
503        tx_hashes: Vec<TxHash>,
504        limit: GetPooledTransactionLimit,
505    ) -> Vec<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>
506    where
507        <V as TransactionValidator>::Transaction: EthPoolTransaction,
508    {
509        let mut elements = Vec::new();
510        self.append_pooled_transaction_elements(&tx_hashes, limit, &mut elements);
511        elements.shrink_to_fit();
512        elements
513    }
514
515    /// Returns converted pooled transaction for the given transaction hash.
516    pub fn get_pooled_transaction_element(
517        &self,
518        tx_hash: TxHash,
519    ) -> Option<Recovered<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>>
520    where
521        <V as TransactionValidator>::Transaction: EthPoolTransaction,
522    {
523        self.get(&tx_hash).and_then(|tx| self.to_pooled_transaction(tx))
524    }
525
526    /// Updates the entire pool after a new block was executed.
527    pub fn on_canonical_state_change(&self, update: CanonicalStateUpdate<'_, V::Block>) {
528        trace!(target: "txpool", ?update, "updating pool on canonical state change");
529
530        let block_info = update.block_info();
531        let CanonicalStateUpdate {
532            new_tip, changed_accounts, mined_transactions, update_kind, ..
533        } = update;
534        self.validator.on_new_head_block(new_tip);
535
536        let changed_senders = self.changed_senders(changed_accounts.into_iter());
537
538        // update the pool
539        let outcome = self.pool.write().on_canonical_state_change(
540            block_info,
541            mined_transactions,
542            changed_senders,
543            update_kind,
544        );
545
546        // This will discard outdated transactions based on the account's nonce
547        self.delete_discarded_blobs(outcome.discarded.iter());
548
549        // notify listeners about updates
550        self.notify_on_new_state(outcome);
551    }
552
553    /// Performs account updates on the pool.
554    ///
555    /// This will either promote or discard transactions based on the new account state.
556    ///
557    /// This should be invoked when the pool drifted and accounts are updated manually
558    pub fn update_accounts(&self, accounts: Vec<ChangedAccount>) {
559        let changed_senders = self.changed_senders(accounts.into_iter());
560        let UpdateOutcome { promoted, discarded } =
561            self.pool.write().update_accounts(changed_senders);
562
563        self.notify_on_transaction_updates(promoted, discarded);
564    }
565
566    /// Add a single validated transaction into the pool.
567    ///
568    /// Returns the outcome and optionally metadata to be processed after the pool lock is
569    /// released.
570    ///
571    /// Note: this is only used internally by [`Self::add_transactions()`], all new transaction(s)
572    /// come in through that function, either as a batch or `std::iter::once`.
573    fn add_transaction(
574        &self,
575        pool: &mut RwLockWriteGuard<'_, TxPool<T>>,
576        origin: TransactionOrigin,
577        tx: TransactionValidationOutcome<T::Transaction>,
578    ) -> (PoolResult<AddedTransactionOutcome>, Option<AddedTransactionMeta<T::Transaction>>) {
579        match tx {
580            TransactionValidationOutcome::Valid {
581                balance,
582                state_nonce,
583                transaction,
584                propagate,
585                bytecode_hash,
586                authorities,
587            } => {
588                let sender_id = self.get_sender_id(transaction.sender());
589                let transaction_id = TransactionId::new(sender_id, transaction.nonce());
590
591                // split the valid transaction and the blob sidecar if it has any
592                let (transaction, blob_sidecar) = match transaction {
593                    ValidTransaction::Valid(tx) => (tx, None),
594                    ValidTransaction::ValidWithSidecar { transaction, sidecar } => {
595                        debug_assert!(
596                            transaction.is_eip4844(),
597                            "validator returned sidecar for non EIP-4844 transaction"
598                        );
599                        (transaction, Some(sidecar))
600                    }
601                };
602
603                let tx = ValidPoolTransaction {
604                    transaction,
605                    transaction_id,
606                    propagate,
607                    timestamp: Instant::now(),
608                    origin,
609                    authority_ids: authorities.map(|auths| self.get_sender_ids(auths)),
610                };
611
612                let added = match pool.add_transaction(tx, balance, state_nonce, bytecode_hash) {
613                    Ok(added) => added,
614                    Err(err) => return (Err(err), None),
615                };
616                let hash = *added.hash();
617                let state = added.transaction_state();
618
619                let meta = AddedTransactionMeta { added, blob_sidecar };
620
621                (Ok(AddedTransactionOutcome { hash, state }), Some(meta))
622            }
623            TransactionValidationOutcome::Invalid(tx, err) => {
624                self.with_event_listener(|listener| listener.invalid(tx.hash()));
625                (Err(PoolError::new(*tx.hash(), err)), None)
626            }
627            TransactionValidationOutcome::Error(tx_hash, err) => {
628                self.with_event_listener(|listener| listener.discarded(&tx_hash));
629                (Err(PoolError::other(tx_hash, err)), None)
630            }
631        }
632    }
633
634    /// Adds a transaction and returns the event stream.
635    pub fn add_transaction_and_subscribe(
636        &self,
637        origin: TransactionOrigin,
638        tx: TransactionValidationOutcome<T::Transaction>,
639    ) -> PoolResult<TransactionEvents> {
640        let listener = {
641            let mut listener = self.event_listener.write();
642            let events = listener.subscribe(tx.tx_hash());
643            self.mark_event_listener_installed();
644            events
645        };
646        let mut results = self.add_transactions(origin, std::iter::once(tx));
647        results.pop().expect("result length is the same as the input")?;
648        Ok(listener)
649    }
650
651    /// Adds all transactions in the iterator to the pool, returning a list of results.
652    ///
653    /// Convenience method that assigns the same origin to all transactions. Delegates to
654    /// [`Self::add_transactions_with_origins`].
655    pub fn add_transactions(
656        &self,
657        origin: TransactionOrigin,
658        transactions: impl IntoIterator<Item = TransactionValidationOutcome<T::Transaction>>,
659    ) -> Vec<PoolResult<AddedTransactionOutcome>> {
660        self.add_transactions_with_origins(transactions.into_iter().map(|tx| (origin, tx)))
661    }
662
663    /// Adds all transactions in the iterator to the pool, each with its own
664    /// [`TransactionOrigin`], returning a list of results.
665    pub fn add_transactions_with_origins(
666        &self,
667        transactions: impl IntoIterator<
668            Item = (TransactionOrigin, TransactionValidationOutcome<T::Transaction>),
669        >,
670    ) -> Vec<PoolResult<AddedTransactionOutcome>> {
671        // Collect results and metadata while holding the pool write lock
672        let (mut results, added_metas, discarded) = {
673            let mut pool = self.pool.write();
674            let mut added_metas = Vec::new();
675
676            let results = transactions
677                .into_iter()
678                .map(|(origin, tx)| {
679                    let (result, meta) = self.add_transaction(&mut pool, origin, tx);
680
681                    // Only collect metadata for successful insertions
682                    if result.is_ok() &&
683                        let Some(meta) = meta
684                    {
685                        added_metas.push(meta);
686                    }
687
688                    result
689                })
690                .collect::<Vec<_>>();
691
692            // Enforce the pool size limits if at least one transaction was added successfully
693            let discarded = if results.iter().any(Result::is_ok) {
694                pool.discard_worst()
695            } else {
696                Default::default()
697            };
698
699            (results, added_metas, discarded)
700        };
701
702        for meta in added_metas {
703            self.on_added_transaction(meta);
704        }
705
706        if !discarded.is_empty() {
707            // Delete any blobs associated with discarded blob transactions
708            self.delete_discarded_blobs(discarded.iter());
709            self.with_event_listener(|listener| listener.discarded_many(&discarded));
710
711            let discarded_hashes =
712                discarded.into_iter().map(|tx| *tx.hash()).collect::<HashSet<_>>();
713
714            // A newly added transaction may be immediately discarded, so we need to
715            // adjust the result here
716            for res in &mut results {
717                if let Ok(AddedTransactionOutcome { hash, .. }) = res &&
718                    discarded_hashes.contains(hash)
719                {
720                    *res = Err(PoolError::new(*hash, PoolErrorKind::DiscardedOnInsert))
721                }
722            }
723        };
724
725        results
726    }
727
728    /// Process a transaction that was added to the pool.
729    ///
730    /// Performs blob storage operations and sends all notifications. This should be called
731    /// after the pool write lock has been released to avoid blocking pool operations.
732    fn on_added_transaction(&self, meta: AddedTransactionMeta<T::Transaction>) {
733        // Handle blob sidecar storage and notifications for EIP-4844 transactions
734        if let Some(sidecar) = meta.blob_sidecar {
735            let hash = *meta.added.hash();
736            self.on_new_blob_sidecar(&hash, &sidecar);
737            self.insert_blob(hash, sidecar);
738        }
739
740        // Delete replaced blob sidecar if any
741        if let Some(replaced) = meta.added.replaced_blob_transaction() {
742            debug!(target: "txpool", "[{:?}] delete replaced blob sidecar", replaced);
743            self.delete_blob(replaced);
744        }
745
746        // Delete discarded blob sidecars if any, this doesnt do any IO.
747        if let Some(discarded) = meta.added.discarded_transactions() {
748            self.delete_discarded_blobs(discarded.iter());
749        }
750
751        // Notify pending transaction listeners
752        if let Some(pending) = meta.added.as_pending() {
753            self.on_new_pending_transaction(pending);
754        }
755
756        // Notify event listeners
757        self.notify_event_listeners(&meta.added);
758
759        // Notify new transaction listeners
760        self.on_new_transaction(meta.added.into_new_transaction_event());
761    }
762
763    /// Notify all listeners about a new pending transaction.
764    ///
765    /// See also [`Self::add_pending_listener`]
766    ///
767    /// CAUTION: This function is only intended to be used manually in order to use this type's
768    /// pending transaction receivers when manually implementing the
769    /// [`TransactionPool`](crate::TransactionPool) trait for a custom pool implementation
770    /// [`TransactionPool::pending_transactions_listener_for`](crate::TransactionPool).
771    pub fn on_new_pending_transaction(&self, pending: &AddedPendingTransaction<T::Transaction>) {
772        let mut needs_cleanup = false;
773
774        {
775            let listeners = self.pending_transaction_listener.read();
776            for listener in listeners.iter() {
777                if !listener.send_all(pending.pending_transactions(listener.kind)) {
778                    needs_cleanup = true;
779                }
780            }
781        }
782
783        // Clean up dead listeners if we detected any closed channels
784        if needs_cleanup {
785            self.pending_transaction_listener
786                .write()
787                .retain(|listener| !listener.sender.is_closed());
788        }
789    }
790
791    /// Notify all listeners about a newly inserted pending transaction.
792    ///
793    /// See also [`Self::add_new_transaction_listener`]
794    ///
795    /// CAUTION: This function is only intended to be used manually in order to use this type's
796    /// transaction receivers when manually implementing the
797    /// [`TransactionPool`](crate::TransactionPool) trait for a custom pool implementation
798    /// [`TransactionPool::new_transactions_listener_for`](crate::TransactionPool).
799    pub fn on_new_transaction(&self, event: NewTransactionEvent<T::Transaction>) {
800        let mut needs_cleanup = false;
801
802        {
803            let listeners = self.transaction_listener.read();
804            for listener in listeners.iter() {
805                if listener.kind.is_propagate_only() && !event.transaction.propagate {
806                    if listener.sender.is_closed() {
807                        needs_cleanup = true;
808                    }
809                    // Skip non-propagate transactions for propagate-only listeners
810                    continue
811                }
812
813                if !listener.send(event.clone()) {
814                    needs_cleanup = true;
815                }
816            }
817        }
818
819        // Clean up dead listeners if we detected any closed channels
820        if needs_cleanup {
821            self.transaction_listener.write().retain(|listener| !listener.sender.is_closed());
822        }
823    }
824
825    /// Notify all listeners about a blob sidecar for a newly inserted blob (eip4844) transaction.
826    fn on_new_blob_sidecar(&self, tx_hash: &TxHash, sidecar: &BlobTransactionSidecarVariant) {
827        let mut sidecar_listeners = self.blob_transaction_sidecar_listener.lock();
828        if sidecar_listeners.is_empty() {
829            return
830        }
831        let sidecar = Arc::new(sidecar.clone());
832        sidecar_listeners.retain_mut(|listener| {
833            let new_blob_event = NewBlobSidecar { tx_hash: *tx_hash, sidecar: sidecar.clone() };
834            match listener.sender.try_send(new_blob_event) {
835                Ok(()) => true,
836                Err(err) => {
837                    if matches!(err, mpsc::error::TrySendError::Full(_)) {
838                        debug!(
839                            target: "txpool",
840                            "[{:?}] failed to send blob sidecar; channel full",
841                            sidecar,
842                        );
843                        true
844                    } else {
845                        false
846                    }
847                }
848            }
849        })
850    }
851
852    /// Notifies transaction listeners about changes once a block was processed.
853    fn notify_on_new_state(&self, outcome: OnNewCanonicalStateOutcome<T::Transaction>) {
854        trace!(target: "txpool", promoted=outcome.promoted.len(), discarded= outcome.discarded.len() ,"notifying listeners on state change");
855
856        // notify about promoted pending transactions - emit hashes
857        let mut needs_pending_cleanup = false;
858        {
859            let listeners = self.pending_transaction_listener.read();
860            for listener in listeners.iter() {
861                if !listener.send_all(outcome.pending_transactions(listener.kind)) {
862                    needs_pending_cleanup = true;
863                }
864            }
865        }
866        if needs_pending_cleanup {
867            self.pending_transaction_listener.write().retain(|l| !l.sender.is_closed());
868        }
869
870        // emit full transactions
871        let mut needs_tx_cleanup = false;
872        {
873            let listeners = self.transaction_listener.read();
874            for listener in listeners.iter() {
875                if !listener.send_all(outcome.full_pending_transactions(listener.kind)) {
876                    needs_tx_cleanup = true;
877                }
878            }
879        }
880        if needs_tx_cleanup {
881            self.transaction_listener.write().retain(|l| !l.sender.is_closed());
882        }
883
884        let OnNewCanonicalStateOutcome { mined, promoted, discarded, block_hash } = outcome;
885
886        // broadcast specific transaction events
887        self.with_event_listener(|listener| {
888            for tx in &mined {
889                listener.mined(tx, block_hash);
890            }
891            for tx in &promoted {
892                listener.pending(tx.hash(), None);
893            }
894            for tx in &discarded {
895                listener.discarded(tx.hash());
896            }
897        })
898    }
899
900    /// Notifies all listeners about the transaction movements.
901    ///
902    /// This will emit events according to the provided changes.
903    ///
904    /// CAUTION: This function is only intended to be used manually in order to use this type's
905    /// [`TransactionEvents`] receivers when manually implementing the
906    /// [`TransactionPool`](crate::TransactionPool) trait for a custom pool implementation
907    /// [`TransactionPool::transaction_event_listener`](crate::TransactionPool).
908    pub fn notify_on_transaction_updates(
909        &self,
910        promoted: Vec<Arc<ValidPoolTransaction<T::Transaction>>>,
911        discarded: Vec<Arc<ValidPoolTransaction<T::Transaction>>>,
912    ) {
913        // Notify about promoted pending transactions (similar to notify_on_new_state)
914        if !promoted.is_empty() {
915            let mut needs_pending_cleanup = false;
916            {
917                let listeners = self.pending_transaction_listener.read();
918                for listener in listeners.iter() {
919                    let promoted_hashes = promoted.iter().filter_map(|tx| {
920                        if listener.kind.is_propagate_only() && !tx.propagate {
921                            None
922                        } else {
923                            Some(*tx.hash())
924                        }
925                    });
926                    if !listener.send_all(promoted_hashes) {
927                        needs_pending_cleanup = true;
928                    }
929                }
930            }
931            if needs_pending_cleanup {
932                self.pending_transaction_listener.write().retain(|l| !l.sender.is_closed());
933            }
934
935            // in this case we should also emit promoted transactions in full
936            let mut needs_tx_cleanup = false;
937            {
938                let listeners = self.transaction_listener.read();
939                for listener in listeners.iter() {
940                    let promoted_txs = promoted.iter().filter_map(|tx| {
941                        if listener.kind.is_propagate_only() && !tx.propagate {
942                            None
943                        } else {
944                            Some(NewTransactionEvent::pending(tx.clone()))
945                        }
946                    });
947                    if !listener.send_all(promoted_txs) {
948                        needs_tx_cleanup = true;
949                    }
950                }
951            }
952            if needs_tx_cleanup {
953                self.transaction_listener.write().retain(|l| !l.sender.is_closed());
954            }
955        }
956
957        self.with_event_listener(|listener| {
958            for tx in &promoted {
959                listener.pending(tx.hash(), None);
960            }
961            for tx in &discarded {
962                listener.discarded(tx.hash());
963            }
964        });
965
966        if !discarded.is_empty() {
967            // This deletes outdated blob txs from the blob store, based on the account's nonce.
968            // This is called during txpool maintenance when the pool drifted.
969            self.delete_discarded_blobs(discarded.iter());
970        }
971    }
972
973    /// Fire events for the newly added transaction if there are any.
974    ///
975    /// See also [`Self::add_transaction_event_listener`].
976    ///
977    /// CAUTION: This function is only intended to be used manually in order to use this type's
978    /// [`TransactionEvents`] receivers when manually implementing the
979    /// [`TransactionPool`](crate::TransactionPool) trait for a custom pool implementation
980    /// [`TransactionPool::transaction_event_listener`](crate::TransactionPool).
981    pub fn notify_event_listeners(&self, tx: &AddedTransaction<T::Transaction>) {
982        self.with_event_listener(|listener| match tx {
983            AddedTransaction::Pending(tx) => {
984                let AddedPendingTransaction { transaction, promoted, discarded, replaced } = tx;
985
986                listener.pending(transaction.hash(), replaced.clone());
987                for tx in promoted {
988                    listener.pending(tx.hash(), None);
989                }
990                for tx in discarded {
991                    listener.discarded(tx.hash());
992                }
993            }
994            AddedTransaction::Parked { transaction, replaced, queued_reason, .. } => {
995                listener.queued(transaction.hash(), queued_reason.clone());
996                if let Some(replaced) = replaced {
997                    listener.replaced(replaced.clone(), *transaction.hash());
998                }
999            }
1000        });
1001    }
1002
1003    /// Returns an iterator that yields transactions that are ready to be included in the block.
1004    pub fn best_transactions(&self) -> BestTransactions<T> {
1005        self.get_pool_data().best_transactions()
1006    }
1007
1008    /// Returns an iterator that yields transactions that are ready to be included in the block with
1009    /// the given base fee and optional blob fee attributes.
1010    pub fn best_transactions_with_attributes(
1011        &self,
1012        best_transactions_attributes: BestTransactionsAttributes,
1013    ) -> Box<dyn crate::traits::BestTransactions<Item = Arc<ValidPoolTransaction<T::Transaction>>>>
1014    {
1015        self.get_pool_data().best_transactions_with_attributes(best_transactions_attributes)
1016    }
1017
1018    /// Returns only the first `max` transactions in the pending pool.
1019    pub fn pending_transactions_max(
1020        &self,
1021        max: usize,
1022    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1023        self.get_pool_data().pending_transactions_iter().take(max).collect()
1024    }
1025
1026    /// Returns all transactions from the pending sub-pool
1027    pub fn pending_transactions(&self) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1028        self.get_pool_data().pending_transactions()
1029    }
1030
1031    /// Returns all transactions from parked pools
1032    pub fn queued_transactions(&self) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1033        self.get_pool_data().queued_transactions()
1034    }
1035
1036    /// Returns all transactions in the pool
1037    pub fn all_transactions(&self) -> AllPoolTransactions<T::Transaction> {
1038        let pool = self.get_pool_data();
1039        AllPoolTransactions {
1040            pending: pool.pending_transactions(),
1041            queued: pool.queued_transactions(),
1042        }
1043    }
1044
1045    /// Returns _all_ transactions in the pool
1046    pub fn all_transaction_hashes(&self) -> Vec<TxHash> {
1047        self.get_pool_data().all().transactions_iter().map(|tx| *tx.hash()).collect()
1048    }
1049
1050    /// Removes and returns all matching transactions from the pool.
1051    ///
1052    /// This behaves as if the transactions got discarded (_not_ mined), effectively introducing a
1053    /// nonce gap for the given transactions.
1054    pub fn remove_transactions(
1055        &self,
1056        hashes: Vec<TxHash>,
1057    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1058        if hashes.is_empty() {
1059            return Vec::new()
1060        }
1061        let removed = self.pool.write().remove_transactions(hashes);
1062
1063        self.with_event_listener(|listener| listener.discarded_many(&removed));
1064
1065        removed
1066    }
1067
1068    /// Removes and returns all matching transactions and their dependent transactions from the
1069    /// pool.
1070    pub fn remove_transactions_and_descendants(
1071        &self,
1072        hashes: Vec<TxHash>,
1073    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1074        if hashes.is_empty() {
1075            return Vec::new()
1076        }
1077        let removed = self.pool.write().remove_transactions_and_descendants(hashes);
1078
1079        self.with_event_listener(|listener| {
1080            for tx in &removed {
1081                listener.discarded(tx.hash());
1082            }
1083        });
1084
1085        removed
1086    }
1087
1088    /// Removes and returns all transactions by the specified sender from the pool.
1089    pub fn remove_transactions_by_sender(
1090        &self,
1091        sender: Address,
1092    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1093        let Some(sender_id) = self.sender_id(&sender) else { return Vec::new() };
1094        let removed = self.pool.write().remove_transactions_by_sender(sender_id);
1095
1096        self.with_event_listener(|listener| listener.discarded_many(&removed));
1097
1098        removed
1099    }
1100
1101    /// Prunes and returns all matching transactions from the pool.
1102    ///
1103    /// This removes the transactions as if they were mined: descendant transactions are **not**
1104    /// parked and remain eligible for inclusion.
1105    pub fn prune_transactions(
1106        &self,
1107        hashes: Vec<TxHash>,
1108    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1109        if hashes.is_empty() {
1110            return Vec::new()
1111        }
1112
1113        self.pool.write().prune_transactions(hashes)
1114    }
1115
1116    /// Removes and returns all transactions that are present in the pool.
1117    pub fn retain_unknown<A>(&self, announcement: &mut A)
1118    where
1119        A: HandleMempoolData,
1120    {
1121        if announcement.is_empty() {
1122            return
1123        }
1124        let pool = self.get_pool_data();
1125        announcement.retain_by_hash(|tx| !pool.contains(tx))
1126    }
1127
1128    /// Returns the transaction by hash.
1129    pub fn get(&self, tx_hash: &TxHash) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
1130        self.get_pool_data().get(tx_hash)
1131    }
1132
1133    /// Returns all transactions of the address
1134    pub fn get_transactions_by_sender(
1135        &self,
1136        sender: Address,
1137    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1138        let Some(sender_id) = self.sender_id(&sender) else { return Vec::new() };
1139        self.get_pool_data().get_transactions_by_sender(sender_id)
1140    }
1141
1142    /// Returns a pending transaction sent by the given sender with the given nonce.
1143    pub fn get_pending_transaction_by_sender_and_nonce(
1144        &self,
1145        sender: Address,
1146        nonce: u64,
1147    ) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
1148        let sender_id = self.sender_id(&sender)?;
1149        self.get_pool_data().get_pending_transaction_by_sender_and_nonce(sender_id, nonce)
1150    }
1151
1152    /// Returns all queued transactions of the address by sender
1153    pub fn get_queued_transactions_by_sender(
1154        &self,
1155        sender: Address,
1156    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1157        let Some(sender_id) = self.sender_id(&sender) else { return Vec::new() };
1158        self.get_pool_data().queued_txs_by_sender(sender_id)
1159    }
1160
1161    /// Returns all pending transactions filtered by predicate
1162    pub fn pending_transactions_with_predicate(
1163        &self,
1164        predicate: impl FnMut(&ValidPoolTransaction<T::Transaction>) -> bool,
1165    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1166        self.get_pool_data().pending_transactions_with_predicate(predicate)
1167    }
1168
1169    /// Returns all pending transactions of the address by sender
1170    pub fn get_pending_transactions_by_sender(
1171        &self,
1172        sender: Address,
1173    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1174        let Some(sender_id) = self.sender_id(&sender) else { return Vec::new() };
1175        self.get_pool_data().pending_txs_by_sender(sender_id)
1176    }
1177
1178    /// Returns the highest transaction of the address
1179    pub fn get_highest_transaction_by_sender(
1180        &self,
1181        sender: Address,
1182    ) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
1183        let sender_id = self.sender_id(&sender)?;
1184        self.get_pool_data().get_highest_transaction_by_sender(sender_id)
1185    }
1186
1187    /// Returns the transaction with the highest nonce that is executable given the on chain nonce.
1188    pub fn get_highest_consecutive_transaction_by_sender(
1189        &self,
1190        sender: Address,
1191        on_chain_nonce: u64,
1192    ) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
1193        let sender_id = self.sender_id(&sender)?;
1194        self.get_pool_data().get_highest_consecutive_transaction_by_sender(
1195            sender_id.into_transaction_id(on_chain_nonce),
1196        )
1197    }
1198
1199    /// Returns the transaction given a [`TransactionId`]
1200    pub fn get_transaction_by_transaction_id(
1201        &self,
1202        transaction_id: &TransactionId,
1203    ) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
1204        self.get_pool_data().all().get(transaction_id).map(|tx| tx.transaction.clone())
1205    }
1206
1207    /// Returns all transactions that where submitted with the given [`TransactionOrigin`]
1208    pub fn get_transactions_by_origin(
1209        &self,
1210        origin: TransactionOrigin,
1211    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1212        self.get_pool_data()
1213            .all()
1214            .transactions_iter()
1215            .filter(|tx| tx.origin == origin)
1216            .cloned()
1217            .collect()
1218    }
1219
1220    /// Returns all pending transactions filtered by [`TransactionOrigin`]
1221    pub fn get_pending_transactions_by_origin(
1222        &self,
1223        origin: TransactionOrigin,
1224    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1225        self.get_pool_data().pending_transactions_iter().filter(|tx| tx.origin == origin).collect()
1226    }
1227
1228    /// Returns all the transactions belonging to the hashes.
1229    ///
1230    /// If no transaction exists, it is skipped.
1231    pub fn get_all(&self, txs: Vec<TxHash>) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1232        if txs.is_empty() {
1233            return Vec::new()
1234        }
1235        self.get_pool_data().get_all(txs).collect()
1236    }
1237
1238    /// Returns all the transactions belonging to the hashes that are propagatable.
1239    ///
1240    /// If no transaction exists, it is skipped.
1241    fn get_all_propagatable(
1242        &self,
1243        txs: &[TxHash],
1244    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1245        if txs.is_empty() {
1246            return Vec::new()
1247        }
1248        let pool = self.get_pool_data();
1249        txs.iter().filter_map(|tx| pool.get(tx).filter(|tx| tx.propagate)).collect()
1250    }
1251
1252    /// Notify about propagated transactions.
1253    pub fn on_propagated(&self, txs: PropagatedTransactions) {
1254        if txs.is_empty() {
1255            return
1256        }
1257        self.with_event_listener(|listener| {
1258            txs.into_iter().for_each(|(hash, peers)| listener.propagated(&hash, peers));
1259        });
1260    }
1261
1262    /// Number of transactions in the entire pool
1263    pub fn len(&self) -> usize {
1264        self.get_pool_data().len()
1265    }
1266
1267    /// Whether the pool is empty
1268    pub fn is_empty(&self) -> bool {
1269        self.get_pool_data().is_empty()
1270    }
1271
1272    /// Returns whether or not the pool is over its configured size and transaction count limits.
1273    pub fn is_exceeded(&self) -> bool {
1274        self.pool.read().is_exceeded()
1275    }
1276
1277    /// Inserts a blob transaction into the blob store
1278    fn insert_blob(&self, hash: TxHash, blob: BlobTransactionSidecarVariant) {
1279        debug!(target: "txpool", "[{:?}] storing blob sidecar", hash);
1280        if let Err(err) = self.blob_store.insert(hash, blob) {
1281            warn!(target: "txpool", %err, "[{:?}] failed to insert blob", hash);
1282            self.blob_store_metrics.blobstore_failed_inserts.increment(1);
1283        }
1284        self.update_blob_store_metrics();
1285    }
1286
1287    /// Delete a blob from the blob store
1288    pub fn delete_blob(&self, blob: TxHash) {
1289        let _ = self.blob_store.delete(blob);
1290    }
1291
1292    /// Delete all blobs from the blob store
1293    pub fn delete_blobs(&self, txs: Vec<TxHash>) {
1294        let _ = self.blob_store.delete_all(txs);
1295    }
1296
1297    /// Cleans up the blob store
1298    pub fn cleanup_blobs(&self) {
1299        let stat = self.blob_store.cleanup();
1300        self.blob_store_metrics.blobstore_failed_deletes.increment(stat.delete_failed as u64);
1301        self.update_blob_store_metrics();
1302    }
1303
1304    fn update_blob_store_metrics(&self) {
1305        if let Some(data_size) = self.blob_store.data_size_hint() {
1306            self.blob_store_metrics.blobstore_byte_size.set(data_size as f64);
1307        }
1308        self.blob_store_metrics.blobstore_entries.set(self.blob_store.blobs_len() as f64);
1309    }
1310
1311    /// Deletes all blob transactions that were discarded.
1312    fn delete_discarded_blobs<'a>(
1313        &'a self,
1314        transactions: impl IntoIterator<Item = &'a Arc<ValidPoolTransaction<T::Transaction>>>,
1315    ) {
1316        let blob_txs = transactions
1317            .into_iter()
1318            .filter(|tx| tx.transaction.is_eip4844())
1319            .map(|tx| *tx.hash())
1320            .collect();
1321        self.delete_blobs(blob_txs);
1322    }
1323}
1324
1325impl<V, T: TransactionOrdering, S> fmt::Debug for PoolInner<V, T, S> {
1326    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1327        f.debug_struct("PoolInner").field("config", &self.config).finish_non_exhaustive()
1328    }
1329}
1330
1331/// Metadata for a transaction that was added to the pool.
1332///
1333/// This holds all the data needed to complete post-insertion operations (notifications,
1334/// blob storage).
1335#[derive(Debug)]
1336struct AddedTransactionMeta<T: PoolTransaction> {
1337    /// The transaction that was added to the pool
1338    added: AddedTransaction<T>,
1339    /// Optional blob sidecar for EIP-4844 transactions
1340    blob_sidecar: Option<BlobTransactionSidecarVariant>,
1341}
1342
1343/// Tracks an added transaction and all graph changes caused by adding it.
1344#[derive(Debug, Clone)]
1345pub struct AddedPendingTransaction<T: PoolTransaction> {
1346    /// Inserted transaction.
1347    pub transaction: Arc<ValidPoolTransaction<T>>,
1348    /// Replaced transaction.
1349    pub replaced: Option<Arc<ValidPoolTransaction<T>>>,
1350    /// transactions promoted to the pending queue
1351    pub promoted: Vec<Arc<ValidPoolTransaction<T>>>,
1352    /// transactions that failed and became discarded
1353    pub discarded: Vec<Arc<ValidPoolTransaction<T>>>,
1354}
1355
1356impl<T: PoolTransaction> AddedPendingTransaction<T> {
1357    /// Returns all transactions that were promoted to the pending pool and adhere to the given
1358    /// [`TransactionListenerKind`].
1359    ///
1360    /// If the kind is [`TransactionListenerKind::PropagateOnly`], then only transactions that
1361    /// are allowed to be propagated are returned.
1362    pub(crate) fn pending_transactions(
1363        &self,
1364        kind: TransactionListenerKind,
1365    ) -> impl Iterator<Item = B256> + '_ {
1366        let iter = std::iter::once(&self.transaction).chain(self.promoted.iter());
1367        PendingTransactionIter { kind, iter }
1368    }
1369}
1370
1371pub(crate) struct PendingTransactionIter<Iter> {
1372    kind: TransactionListenerKind,
1373    iter: Iter,
1374}
1375
1376impl<'a, Iter, T> Iterator for PendingTransactionIter<Iter>
1377where
1378    Iter: Iterator<Item = &'a Arc<ValidPoolTransaction<T>>>,
1379    T: PoolTransaction + 'a,
1380{
1381    type Item = B256;
1382
1383    fn next(&mut self) -> Option<Self::Item> {
1384        loop {
1385            let next = self.iter.next()?;
1386            if self.kind.is_propagate_only() && !next.propagate {
1387                continue
1388            }
1389            return Some(*next.hash())
1390        }
1391    }
1392}
1393
1394/// An iterator over full pending transactions
1395pub(crate) struct FullPendingTransactionIter<Iter> {
1396    kind: TransactionListenerKind,
1397    iter: Iter,
1398}
1399
1400impl<'a, Iter, T> Iterator for FullPendingTransactionIter<Iter>
1401where
1402    Iter: Iterator<Item = &'a Arc<ValidPoolTransaction<T>>>,
1403    T: PoolTransaction + 'a,
1404{
1405    type Item = NewTransactionEvent<T>;
1406
1407    fn next(&mut self) -> Option<Self::Item> {
1408        loop {
1409            let next = self.iter.next()?;
1410            if self.kind.is_propagate_only() && !next.propagate {
1411                continue
1412            }
1413            return Some(NewTransactionEvent {
1414                subpool: SubPool::Pending,
1415                transaction: next.clone(),
1416            })
1417        }
1418    }
1419}
1420
1421/// Represents a transaction that was added into the pool and its state
1422#[derive(Debug, Clone)]
1423pub enum AddedTransaction<T: PoolTransaction> {
1424    /// Transaction was successfully added and moved to the pending pool.
1425    Pending(AddedPendingTransaction<T>),
1426    /// Transaction was successfully added but not yet ready for processing and moved to a
1427    /// parked pool instead.
1428    Parked {
1429        /// Inserted transaction.
1430        transaction: Arc<ValidPoolTransaction<T>>,
1431        /// Replaced transaction.
1432        replaced: Option<Arc<ValidPoolTransaction<T>>>,
1433        /// The subpool it was moved to.
1434        subpool: SubPool,
1435        /// The specific reason why the transaction is queued (if applicable).
1436        queued_reason: Option<QueuedReason>,
1437    },
1438}
1439
1440impl<T: PoolTransaction> AddedTransaction<T> {
1441    /// Returns whether the transaction has been added to the pending pool.
1442    pub const fn as_pending(&self) -> Option<&AddedPendingTransaction<T>> {
1443        match self {
1444            Self::Pending(tx) => Some(tx),
1445            _ => None,
1446        }
1447    }
1448
1449    /// Returns the replaced transaction if there was one
1450    pub const fn replaced(&self) -> Option<&Arc<ValidPoolTransaction<T>>> {
1451        match self {
1452            Self::Pending(tx) => tx.replaced.as_ref(),
1453            Self::Parked { replaced, .. } => replaced.as_ref(),
1454        }
1455    }
1456
1457    /// Returns the discarded transactions if there were any
1458    pub(crate) fn discarded_transactions(&self) -> Option<&[Arc<ValidPoolTransaction<T>>]> {
1459        match self {
1460            Self::Pending(tx) => Some(&tx.discarded),
1461            Self::Parked { .. } => None,
1462        }
1463    }
1464
1465    /// Returns the hash of the replaced transaction if it is a blob transaction.
1466    pub(crate) fn replaced_blob_transaction(&self) -> Option<B256> {
1467        self.replaced().filter(|tx| tx.transaction.is_eip4844()).map(|tx| *tx.transaction.hash())
1468    }
1469
1470    /// Returns the hash of the transaction
1471    pub fn hash(&self) -> &TxHash {
1472        match self {
1473            Self::Pending(tx) => tx.transaction.hash(),
1474            Self::Parked { transaction, .. } => transaction.hash(),
1475        }
1476    }
1477
1478    /// Converts this type into the event type for listeners
1479    pub fn into_new_transaction_event(self) -> NewTransactionEvent<T> {
1480        match self {
1481            Self::Pending(tx) => {
1482                NewTransactionEvent { subpool: SubPool::Pending, transaction: tx.transaction }
1483            }
1484            Self::Parked { transaction, subpool, .. } => {
1485                NewTransactionEvent { transaction, subpool }
1486            }
1487        }
1488    }
1489
1490    /// Returns the subpool this transaction was added to
1491    pub(crate) const fn subpool(&self) -> SubPool {
1492        match self {
1493            Self::Pending(_) => SubPool::Pending,
1494            Self::Parked { subpool, .. } => *subpool,
1495        }
1496    }
1497
1498    /// Returns the [`TransactionId`] of the added transaction
1499    #[cfg(test)]
1500    pub(crate) fn id(&self) -> &TransactionId {
1501        match self {
1502            Self::Pending(added) => added.transaction.id(),
1503            Self::Parked { transaction, .. } => transaction.id(),
1504        }
1505    }
1506
1507    /// Returns the queued reason if the transaction is parked with a queued reason.
1508    pub const fn queued_reason(&self) -> Option<&QueuedReason> {
1509        match self {
1510            Self::Pending(_) => None,
1511            Self::Parked { queued_reason, .. } => queued_reason.as_ref(),
1512        }
1513    }
1514
1515    /// Returns the transaction state based on the subpool and queued reason.
1516    pub fn transaction_state(&self) -> AddedTransactionState {
1517        match self.subpool() {
1518            SubPool::Pending => AddedTransactionState::Pending,
1519            _ => {
1520                // For non-pending transactions, use the queued reason directly from the
1521                // AddedTransaction
1522                if let Some(reason) = self.queued_reason() {
1523                    AddedTransactionState::Queued(reason.clone())
1524                } else {
1525                    // Fallback - this shouldn't happen with the new implementation
1526                    AddedTransactionState::Queued(QueuedReason::NonceGap)
1527                }
1528            }
1529        }
1530    }
1531}
1532
/// Why a transaction is queued, i.e. not ready for execution.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum QueuedReason {
    /// Transaction has a nonce gap - missing prior transactions
    NonceGap,
    /// Transaction has parked ancestors - waiting for other transactions to be mined
    ParkedAncestors,
    /// Sender has insufficient balance to cover the transaction cost
    InsufficientBalance,
    /// Transaction exceeds the block gas limit
    TooMuchGas,
    /// Transaction doesn't meet the base fee requirement
    InsufficientBaseFee,
    /// Transaction doesn't meet the blob fee requirement (EIP-4844)
    InsufficientBlobFee,
}

/// The state of a transaction at the moment it was added to the pool.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AddedTransactionState {
    /// Ready for execution
    Pending,
    /// Not ready for execution; carries the [`QueuedReason`]
    Queued(QueuedReason),
}

impl AddedTransactionState {
    /// Whether the transaction was submitted as queued.
    pub const fn is_queued(&self) -> bool {
        match self {
            Self::Queued(_) => true,
            Self::Pending => false,
        }
    }

    /// Whether the transaction was submitted as pending.
    pub const fn is_pending(&self) -> bool {
        match self {
            Self::Pending => true,
            Self::Queued(_) => false,
        }
    }

    /// The reason the transaction is queued, or `None` when it is pending.
    pub const fn queued_reason(&self) -> Option<&QueuedReason> {
        if let Self::Queued(reason) = self {
            Some(reason)
        } else {
            None
        }
    }
}
1578
1579/// The outcome of a successful transaction addition
1580#[derive(Debug, Clone, PartialEq, Eq)]
1581pub struct AddedTransactionOutcome {
1582    /// The hash of the transaction
1583    pub hash: TxHash,
1584    /// The state of the transaction
1585    pub state: AddedTransactionState,
1586}
1587
1588impl AddedTransactionOutcome {
1589    /// Returns whether the transaction was submitted as queued.
1590    pub const fn is_queued(&self) -> bool {
1591        self.state.is_queued()
1592    }
1593
1594    /// Returns whether the transaction was submitted as pending.
1595    pub const fn is_pending(&self) -> bool {
1596        self.state.is_pending()
1597    }
1598}
1599
1600/// Contains all state changes after a [`CanonicalStateUpdate`] was processed
1601#[derive(Debug)]
1602pub(crate) struct OnNewCanonicalStateOutcome<T: PoolTransaction> {
1603    /// Hash of the block.
1604    pub(crate) block_hash: B256,
1605    /// All mined transactions.
1606    pub(crate) mined: Vec<TxHash>,
1607    /// Transactions promoted to the pending pool.
1608    pub(crate) promoted: Vec<Arc<ValidPoolTransaction<T>>>,
1609    /// transaction that were discarded during the update
1610    pub(crate) discarded: Vec<Arc<ValidPoolTransaction<T>>>,
1611}
1612
1613impl<T: PoolTransaction> OnNewCanonicalStateOutcome<T> {
1614    /// Returns all transactions that were promoted to the pending pool and adhere to the given
1615    /// [`TransactionListenerKind`].
1616    ///
1617    /// If the kind is [`TransactionListenerKind::PropagateOnly`], then only transactions that
1618    /// are allowed to be propagated are returned.
1619    pub(crate) fn pending_transactions(
1620        &self,
1621        kind: TransactionListenerKind,
1622    ) -> impl Iterator<Item = B256> + '_ {
1623        let iter = self.promoted.iter();
1624        PendingTransactionIter { kind, iter }
1625    }
1626
1627    /// Returns all FULL transactions that were promoted to the pending pool and adhere to the given
1628    /// [`TransactionListenerKind`].
1629    ///
1630    /// If the kind is [`TransactionListenerKind::PropagateOnly`], then only transactions that
1631    /// are allowed to be propagated are returned.
1632    pub(crate) fn full_pending_transactions(
1633        &self,
1634        kind: TransactionListenerKind,
1635    ) -> impl Iterator<Item = NewTransactionEvent<T>> + '_ {
1636        let iter = self.promoted.iter();
1637        FullPendingTransactionIter { kind, iter }
1638    }
1639}
1640
#[cfg(test)]
mod tests {
    use crate::{
        blobstore::{BlobStore, InMemoryBlobStore},
        identifier::SenderId,
        test_utils::{MockTransaction, TestPoolBuilder},
        validate::ValidTransaction,
        BlockInfo, PoolConfig, SubPoolLimit, TransactionOrigin, TransactionValidationOutcome, U256,
    };
    use alloy_eips::{eip4844::BlobTransactionSidecar, eip7594::BlobTransactionSidecarVariant};
    use alloy_primitives::Address;
    use std::{fs, path::PathBuf};

    /// Adds more blob transactions than the blob sub-pool limit allows and checks that the
    /// pool's blob store ends up equal to a store that only holds the sidecars of the first
    /// `max_txs` transactions.
    #[test]
    fn test_discard_blobs_on_blob_tx_eviction() {
        // Non-zero size assigned to every mock transaction.
        const TX_SIZE: usize = 1844674407370951;

        // Read the hex-encoded blob from the JSON fixture below the crate's manifest directory.
        let fixture = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("test_data/blob1.json");
        let raw_json = fs::read_to_string(fixture).expect("Failed to read the blob data file");
        let parsed: serde_json::Value =
            serde_json::from_str(&raw_json).expect("Failed to deserialize JSON");
        let blob_hex =
            parsed.get("data").unwrap().as_str().expect("Data is not a valid string").to_string();

        // Build the EIP-4844 sidecar from that single blob.
        let sidecar = BlobTransactionSidecarVariant::Eip4844(
            BlobTransactionSidecar::try_from_blobs_hex(vec![blob_hex]).unwrap(),
        );

        // Limit applied to the blob sub-pool.
        let blob_limit = SubPoolLimit::new(1000, usize::MAX);

        // Pool with the default configuration except for the blob limit.
        let test_pool = &TestPoolBuilder::default()
            .with_config(PoolConfig { blob_limit, ..Default::default() })
            .pool;

        // Give the pool block info that carries a pending blob fee.
        test_pool
            .set_block_info(BlockInfo { pending_blob_fee: Some(10_000_000), ..Default::default() });

        // Reference store the pool's blob store is compared against at the end.
        let expected_store = InMemoryBlobStore::default();

        // Submit ten transactions more than the blob limit permits.
        for n in 0..blob_limit.max_txs + 10 {
            let mut tx = MockTransaction::eip4844_with_sidecar(sidecar.clone());
            tx.set_size(TX_SIZE);

            // Only the transactions within the limit are mirrored into the reference store.
            if n < blob_limit.max_txs {
                expected_store.insert(*tx.get_hash(), sidecar.clone()).unwrap();
            }

            // Hand the transaction to the pool as an already validated, external transaction.
            let outcome = TransactionValidationOutcome::Valid {
                balance: U256::from(1_000),
                state_nonce: 0,
                bytecode_hash: None,
                transaction: ValidTransaction::ValidWithSidecar {
                    transaction: tx,
                    sidecar: sidecar.clone(),
                },
                propagate: true,
                authorities: None,
            };
            test_pool.add_transactions(TransactionOrigin::External, [outcome]);
        }

        let pool_size = test_pool.size();

        // The blob sub-pool holds exactly as many transactions as the limit allows.
        assert_eq!(pool_size.blob, blob_limit.max_txs);

        // Its reported size is the per-transaction size times the number of kept transactions.
        assert_eq!(pool_size.blob_size, TX_SIZE * blob_limit.max_txs);

        // The pool's blob store matches the reference store.
        assert_eq!(*test_pool.blob_store(), expected_store);
    }

    /// Submits an EIP-7702 transaction with one authority and checks that the authority's
    /// address is known to the pool's identifiers afterwards.
    #[test]
    fn test_auths_stored_in_identifiers() {
        // Pool with the default configuration.
        let test_pool = &TestPoolBuilder::default().with_config(Default::default()).pool;

        let auth = Address::new([1; 20]);
        let tx = MockTransaction::eip7702();

        let outcome = TransactionValidationOutcome::Valid {
            balance: U256::from(1_000),
            state_nonce: 0,
            bytecode_hash: None,
            transaction: ValidTransaction::Valid(tx),
            propagate: true,
            authorities: Some(vec![auth]),
        };
        test_pool.add_transactions(TransactionOrigin::Local, [outcome]);

        // NOTE(review): id 1 presumably follows the id handed to the transaction's sender.
        let identifiers = test_pool.identifiers.read();
        assert_eq!(identifiers.sender_id(&auth), Some(SenderId::from(1)));
    }

    /// Runs every sender-based query for an address the pool has never seen and checks that no
    /// sender id exists for it before or after.
    #[test]
    fn sender_queries_do_not_allocate_ids_for_unknown_addresses() {
        let test_pool = &TestPoolBuilder::default().with_config(Default::default()).pool;
        let unknown = Address::new([9; 20]);

        // Nothing has been registered for the address yet.
        assert_eq!(test_pool.sender_id(&unknown), None);

        // Every lookup and the removal come back empty.
        assert!(test_pool.get_transactions_by_sender(unknown).is_empty());
        assert!(test_pool.get_pending_transaction_by_sender_and_nonce(unknown, 0).is_none());
        assert!(test_pool.get_queued_transactions_by_sender(unknown).is_empty());
        assert!(test_pool.get_pending_transactions_by_sender(unknown).is_empty());
        assert!(test_pool.get_highest_transaction_by_sender(unknown).is_none());
        assert!(test_pool.get_highest_consecutive_transaction_by_sender(unknown, 0).is_none());
        assert!(test_pool.remove_transactions_by_sender(unknown).is_empty());

        // The queries above must not have created an id as a side effect.
        assert_eq!(test_pool.sender_id(&unknown), None);
    }
}