Skip to main content

reth_transaction_pool/pool/
mod.rs

1//! Transaction Pool internals.
2//!
3//! Incoming transactions are validated before they enter the pool first. The validation outcome can
4//! have 3 states:
5//!
6//!  1. Transaction can _never_ be valid
7//!  2. Transaction is _currently_ valid
8//!  3. Transaction is _currently_ invalid, but could potentially become valid in the future
9//!
10//! However, (2.) and (3.) of a transaction can only be determined on the basis of the current
11//! state, whereas (1.) holds indefinitely. This means once the state changes (2.) and (3.) the
12//! state of a transaction needs to be reevaluated again.
13//!
14//! The transaction pool is responsible for storing new, valid transactions and providing the next
15//! best transactions sorted by their priority. Where priority is determined by the transaction's
16//! score ([`TransactionOrdering`]).
17//!
18//! Furthermore, the following characteristics fall under (3.):
19//!
20//!  a) Nonce of a transaction is higher than the expected nonce for the next transaction of its
21//! sender. A distinction is made here whether multiple transactions from the same sender have
22//! gapless nonce increments.
23//!
24//!  a)(1) If _no_ transaction is missing in a chain of multiple
25//! transactions from the same sender (all nonce in row), all of them can in principle be executed
26//! on the current state one after the other.
27//!
28//!  a)(2) If there's a nonce gap, then all
29//! transactions after the missing transaction are blocked until the missing transaction arrives.
30//!
31//!  b) Transaction does not meet the dynamic fee cap requirement introduced by EIP-1559: The
32//! fee cap of the transaction needs to be no less than the base fee of block.
33//!
34//!
35//! In essence the transaction pool is made of three separate sub-pools:
36//!
37//!  - Pending Pool: Contains all transactions that are valid on the current state and satisfy (3.
38//!    a)(1): _No_ nonce gaps. A _pending_ transaction is considered _ready_ when it has the lowest
39//!    nonce of all transactions from the same sender. Once a _ready_ transaction with nonce `n` has
40//!    been executed, the next highest transaction from the same sender `n + 1` becomes ready.
41//!
42//!  - Queued Pool: Contains all transactions that are currently blocked by missing transactions:
43//!    (3. a)(2): _With_ nonce gaps or due to lack of funds.
44//!
45//!  - Basefee Pool: To account for the dynamic base fee requirement (3. b) which could render an
46//!    EIP-1559 and all subsequent transactions of the sender currently invalid.
47//!
48//! The classification of transactions is always dependent on the current state that is changed as
49//! soon as a new block is mined. Once a new block is mined, the account changeset must be applied
50//! to the transaction pool.
51//!
52//!
53//! Depending on the use case, consumers of the [`TransactionPool`](crate::traits::TransactionPool)
54//! are interested in (2.) and/or (3.).
55
56//! A generic [`TransactionPool`](crate::traits::TransactionPool) that only handles transactions.
57//!
58//! This Pool maintains two separate sub-pools for (2.) and (3.)
59//!
60//! ## Terminology
61//!
62//!  - _Pending_: pending transactions are transactions that fall under (2.). These transactions can
63//!    currently be executed and are stored in the pending sub-pool
64//!  - _Queued_: queued transactions are transactions that fall under category (3.). Those
65//!    transactions are _currently_ waiting for state changes that eventually move them into
66//!    category (2.) and become pending.
67
68use crate::{
69    blobstore::BlobStore,
70    error::{PoolError, PoolErrorKind, PoolResult},
71    identifier::{SenderId, SenderIdentifiers, TransactionId},
72    metrics::BlobStoreMetrics,
73    pool::{
74        listener::{
75            BlobTransactionSidecarListener, PendingTransactionHashListener, PoolEventBroadcast,
76            TransactionListener,
77        },
78        state::SubPool,
79        txpool::{SenderInfo, TxPool},
80        update::UpdateOutcome,
81    },
82    traits::{
83        AllPoolTransactions, BestTransactionsAttributes, BlockInfo, GetPooledTransactionLimit,
84        NewBlobSidecar, PoolSize, PoolTransaction, PropagatedTransactions, TransactionOrigin,
85    },
86    validate::{TransactionValidationOutcome, ValidPoolTransaction, ValidTransaction},
87    CanonicalStateUpdate, EthPoolTransaction, PoolConfig, TransactionOrdering,
88    TransactionValidator,
89};
90
91use alloy_primitives::{
92    map::{AddressSet, HashSet},
93    Address, TxHash, B256,
94};
95use parking_lot::{Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard};
96use reth_eth_wire_types::HandleMempoolData;
97use reth_execution_types::ChangedAccount;
98
99use alloy_eips::{eip7594::BlobTransactionSidecarVariant, Typed2718};
100use reth_primitives_traits::Recovered;
101use rustc_hash::FxHashMap;
102use std::{
103    fmt,
104    sync::{
105        atomic::{AtomicBool, Ordering},
106        Arc,
107    },
108    time::Instant,
109};
110use tokio::sync::mpsc;
111use tracing::{debug, trace, warn};
112mod events;
113pub use best::{BestTransactionFilter, BestTransactionsWithPrioritizedSenders};
114pub use blob::{blob_tx_priority, fee_delta, BlobOrd, BlobTransactions};
115pub use events::{FullTransactionEvent, NewTransactionEvent, TransactionEvent};
116pub use listener::{AllTransactionsEvents, TransactionEvents, TransactionListenerKind};
117pub use parked::{BasefeeOrd, ParkedOrd, ParkedPool, QueuedOrd};
118pub use pending::PendingPool;
119
120mod best;
121pub use best::BestTransactions;
122
123mod blob;
124pub mod listener;
125mod parked;
126pub mod pending;
127pub mod size;
128pub(crate) mod state;
129pub mod txpool;
130mod update;
131
132/// Bound on number of pending transactions from `reth_network::TransactionsManager` to buffer.
133pub const PENDING_TX_LISTENER_BUFFER_SIZE: usize = 2048;
134/// Bound on number of new transactions from `reth_network::TransactionsManager` to buffer.
135pub const NEW_TX_LISTENER_BUFFER_SIZE: usize = 1024;
136
137const BLOB_SIDECAR_LISTENER_BUFFER_SIZE: usize = 512;
138
139/// Transaction pool internals.
140pub struct PoolInner<V, T, S>
141where
142    T: TransactionOrdering,
143{
144    /// Internal mapping of addresses to plain ints.
145    identifiers: RwLock<SenderIdentifiers>,
146    /// Transaction validator.
147    validator: V,
148    /// Storage for blob transactions
149    blob_store: S,
150    /// The internal pool that manages all transactions.
151    pool: RwLock<TxPool<T>>,
152    /// Pool settings.
153    config: PoolConfig,
154    /// Manages listeners for transaction state change events.
155    event_listener: RwLock<PoolEventBroadcast<T::Transaction>>,
156    /// Tracks whether any event listeners have ever been installed.
157    has_event_listeners: AtomicBool,
158    /// Listeners for new _full_ pending transactions.
159    pending_transaction_listener: RwLock<Vec<PendingTransactionHashListener>>,
160    /// Listeners for new transactions added to the pool.
161    transaction_listener: RwLock<Vec<TransactionListener<T::Transaction>>>,
162    /// Listener for new blob transaction sidecars added to the pool.
163    blob_transaction_sidecar_listener: Mutex<Vec<BlobTransactionSidecarListener>>,
164    /// Metrics for the blob store
165    blob_store_metrics: BlobStoreMetrics,
166}
167
168// === impl PoolInner ===
169
170impl<V, T, S> PoolInner<V, T, S>
171where
172    V: TransactionValidator,
173    T: TransactionOrdering<Transaction = <V as TransactionValidator>::Transaction>,
174    S: BlobStore,
175{
176    /// Create a new transaction pool instance.
177    pub fn new(validator: V, ordering: T, blob_store: S, config: PoolConfig) -> Self {
178        Self {
179            identifiers: Default::default(),
180            validator,
181            event_listener: Default::default(),
182            has_event_listeners: AtomicBool::new(false),
183            pool: RwLock::new(TxPool::new(ordering, config.clone())),
184            pending_transaction_listener: Default::default(),
185            transaction_listener: Default::default(),
186            blob_transaction_sidecar_listener: Default::default(),
187            config,
188            blob_store,
189            blob_store_metrics: Default::default(),
190        }
191    }
192
193    /// Returns the configured blob store.
194    pub const fn blob_store(&self) -> &S {
195        &self.blob_store
196    }
197
198    /// Returns stats about the size of the pool.
199    pub fn size(&self) -> PoolSize {
200        self.get_pool_data().size()
201    }
202
203    /// Returns the currently tracked block
204    pub fn block_info(&self) -> BlockInfo {
205        self.get_pool_data().block_info()
206    }
207    /// Sets the currently tracked block
208    pub fn set_block_info(&self, info: BlockInfo) {
209        self.pool.write().set_block_info(info)
210    }
211
212    /// Returns the internal [`SenderId`] for this address
213    pub fn get_sender_id(&self, addr: Address) -> SenderId {
214        self.identifiers.write().sender_id_or_create(addr)
215    }
216
217    /// Returns the internal [`SenderId`]s for the given addresses.
218    pub fn get_sender_ids(&self, addrs: impl IntoIterator<Item = Address>) -> Vec<SenderId> {
219        self.identifiers.write().sender_ids_or_create(addrs)
220    }
221
222    /// Returns all senders in the pool
223    pub fn unique_senders(&self) -> AddressSet {
224        self.get_pool_data().unique_senders()
225    }
226
227    /// Converts the changed accounts to a map of sender ids to sender info (internal identifier
228    /// used for __tracked__ accounts)
229    fn changed_senders(
230        &self,
231        accs: impl Iterator<Item = ChangedAccount>,
232    ) -> FxHashMap<SenderId, SenderInfo> {
233        let identifiers = self.identifiers.read();
234        accs.into_iter()
235            .filter_map(|acc| {
236                let ChangedAccount { address, nonce, balance } = acc;
237                let sender_id = identifiers.sender_id(&address)?;
238                Some((sender_id, SenderInfo { state_nonce: nonce, balance }))
239            })
240            .collect()
241    }
242
243    /// Get the config the pool was configured with.
244    pub const fn config(&self) -> &PoolConfig {
245        &self.config
246    }
247
248    /// Get the validator reference.
249    pub const fn validator(&self) -> &V {
250        &self.validator
251    }
252
253    /// Adds a new transaction listener to the pool that gets notified about every new _pending_
254    /// transaction inserted into the pool
255    pub fn add_pending_listener(&self, kind: TransactionListenerKind) -> mpsc::Receiver<TxHash> {
256        let (sender, rx) = mpsc::channel(self.config.pending_tx_listener_buffer_size);
257        let listener = PendingTransactionHashListener { sender, kind };
258
259        let mut listeners = self.pending_transaction_listener.write();
260        // Clean up dead listeners before adding new one
261        listeners.retain(|l| !l.sender.is_closed());
262        listeners.push(listener);
263
264        rx
265    }
266
267    /// Adds a new transaction listener to the pool that gets notified about every new transaction.
268    pub fn add_new_transaction_listener(
269        &self,
270        kind: TransactionListenerKind,
271    ) -> mpsc::Receiver<NewTransactionEvent<T::Transaction>> {
272        let (sender, rx) = mpsc::channel(self.config.new_tx_listener_buffer_size);
273        let listener = TransactionListener { sender, kind };
274
275        let mut listeners = self.transaction_listener.write();
276        // Clean up dead listeners before adding new one
277        listeners.retain(|l| !l.sender.is_closed());
278        listeners.push(listener);
279
280        rx
281    }
282    /// Adds a new blob sidecar listener to the pool that gets notified about every new
283    /// eip4844 transaction's blob sidecar.
284    pub fn add_blob_sidecar_listener(&self) -> mpsc::Receiver<NewBlobSidecar> {
285        let (sender, rx) = mpsc::channel(BLOB_SIDECAR_LISTENER_BUFFER_SIZE);
286        let listener = BlobTransactionSidecarListener { sender };
287        self.blob_transaction_sidecar_listener.lock().push(listener);
288        rx
289    }
290
291    /// If the pool contains the transaction, this adds a new listener that gets notified about
292    /// transaction events.
293    pub fn add_transaction_event_listener(&self, tx_hash: TxHash) -> Option<TransactionEvents> {
294        if !self.get_pool_data().contains(&tx_hash) {
295            return None
296        }
297        let mut listener = self.event_listener.write();
298        let events = listener.subscribe(tx_hash);
299        self.mark_event_listener_installed();
300        Some(events)
301    }
302
303    /// Adds a listener for all transaction events.
304    pub fn add_all_transactions_event_listener(&self) -> AllTransactionsEvents<T::Transaction> {
305        let mut listener = self.event_listener.write();
306        let events = listener.subscribe_all();
307        self.mark_event_listener_installed();
308        events
309    }
310
311    #[inline]
312    fn has_event_listeners(&self) -> bool {
313        self.has_event_listeners.load(Ordering::Relaxed)
314    }
315
316    #[inline]
317    fn mark_event_listener_installed(&self) {
318        self.has_event_listeners.store(true, Ordering::Relaxed);
319    }
320
321    #[inline]
322    fn update_event_listener_state(&self, listener: &PoolEventBroadcast<T::Transaction>) {
323        if listener.is_empty() {
324            self.has_event_listeners.store(false, Ordering::Relaxed);
325        }
326    }
327
328    #[inline]
329    fn with_event_listener<F>(&self, emit: F)
330    where
331        F: FnOnce(&mut PoolEventBroadcast<T::Transaction>),
332    {
333        if !self.has_event_listeners() {
334            return
335        }
336        let mut listener = self.event_listener.write();
337        if !listener.is_empty() {
338            emit(&mut listener);
339        }
340        self.update_event_listener_state(&listener);
341    }
342
343    /// Returns a read lock to the pool's data.
344    pub fn get_pool_data(&self) -> RwLockReadGuard<'_, TxPool<T>> {
345        self.pool.read()
346    }
347
348    /// Returns transactions in the pool that can be propagated
349    pub fn pooled_transactions(&self) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
350        let mut out = Vec::new();
351        self.append_pooled_transactions(&mut out);
352        out
353    }
354
355    /// Returns hashes of transactions in the pool that can be propagated.
356    pub fn pooled_transactions_hashes(&self) -> Vec<TxHash> {
357        let mut out = Vec::new();
358        self.append_pooled_transactions_hashes(&mut out);
359        out
360    }
361
362    /// Returns only the first `max` transactions in the pool that can be propagated.
363    pub fn pooled_transactions_max(
364        &self,
365        max: usize,
366    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
367        let mut out = Vec::new();
368        self.append_pooled_transactions_max(max, &mut out);
369        out
370    }
371
372    /// Extends the given vector with all transactions in the pool that can be propagated.
373    pub fn append_pooled_transactions(
374        &self,
375        out: &mut Vec<Arc<ValidPoolTransaction<T::Transaction>>>,
376    ) {
377        out.extend(
378            self.get_pool_data().all().transactions_iter().filter(|tx| tx.propagate).cloned(),
379        );
380    }
381
382    /// Extends the given vector with pooled transactions for the given hashes that are allowed to
383    /// be propagated.
384    pub fn append_pooled_transaction_elements(
385        &self,
386        tx_hashes: &[TxHash],
387        limit: GetPooledTransactionLimit,
388        out: &mut Vec<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>,
389    ) where
390        <V as TransactionValidator>::Transaction: EthPoolTransaction,
391    {
392        let transactions = self.get_all_propagatable(tx_hashes);
393        let mut size = 0;
394        for transaction in transactions {
395            let encoded_len = transaction.encoded_length();
396            let Some(pooled) = self.to_pooled_transaction(transaction) else {
397                continue;
398            };
399
400            size += encoded_len;
401            out.push(pooled.into_inner());
402
403            if limit.exceeds(size) {
404                break
405            }
406        }
407    }
408
409    /// Extends the given vector with the hashes of all transactions in the pool that can be
410    /// propagated.
411    pub fn append_pooled_transactions_hashes(&self, out: &mut Vec<TxHash>) {
412        out.extend(
413            self.get_pool_data()
414                .all()
415                .transactions_iter()
416                .filter(|tx| tx.propagate)
417                .map(|tx| *tx.hash()),
418        );
419    }
420
421    /// Extends the given vector with only the first `max` transactions in the pool that can be
422    /// propagated.
423    pub fn append_pooled_transactions_max(
424        &self,
425        max: usize,
426        out: &mut Vec<Arc<ValidPoolTransaction<T::Transaction>>>,
427    ) {
428        out.extend(
429            self.get_pool_data()
430                .all()
431                .transactions_iter()
432                .filter(|tx| tx.propagate)
433                .take(max)
434                .cloned(),
435        );
436    }
437
438    /// Returns only the first `max` hashes of transactions in the pool that can be propagated.
439    pub fn pooled_transactions_hashes_max(&self, max: usize) -> Vec<TxHash> {
440        if max == 0 {
441            return Vec::new();
442        }
443        self.get_pool_data()
444            .all()
445            .transactions_iter()
446            .filter(|tx| tx.propagate)
447            .take(max)
448            .map(|tx| *tx.hash())
449            .collect()
450    }
451
452    /// Converts the internally tracked transaction to the pooled format.
453    ///
454    /// If the transaction is an EIP-4844 transaction, the blob sidecar is fetched from the blob
455    /// store and attached to the transaction.
456    fn to_pooled_transaction(
457        &self,
458        transaction: Arc<ValidPoolTransaction<T::Transaction>>,
459    ) -> Option<Recovered<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>>
460    where
461        <V as TransactionValidator>::Transaction: EthPoolTransaction,
462    {
463        if transaction.is_eip4844() {
464            let sidecar = self.blob_store.get(*transaction.hash()).ok()??;
465            transaction.transaction.clone().try_into_pooled_eip4844(sidecar)
466        } else {
467            transaction
468                .transaction
469                .clone_into_pooled()
470                .inspect_err(|err| {
471                    debug!(
472                        target: "txpool", %err,
473                        "failed to convert transaction to pooled element; skipping",
474                    );
475                })
476                .ok()
477        }
478    }
479
480    /// Returns pooled transactions for the given transaction hashes that are allowed to be
481    /// propagated.
482    pub fn get_pooled_transaction_elements(
483        &self,
484        tx_hashes: Vec<TxHash>,
485        limit: GetPooledTransactionLimit,
486    ) -> Vec<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>
487    where
488        <V as TransactionValidator>::Transaction: EthPoolTransaction,
489    {
490        let mut elements = Vec::new();
491        self.append_pooled_transaction_elements(&tx_hashes, limit, &mut elements);
492        elements.shrink_to_fit();
493        elements
494    }
495
496    /// Returns converted pooled transaction for the given transaction hash.
497    pub fn get_pooled_transaction_element(
498        &self,
499        tx_hash: TxHash,
500    ) -> Option<Recovered<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>>
501    where
502        <V as TransactionValidator>::Transaction: EthPoolTransaction,
503    {
504        self.get(&tx_hash).and_then(|tx| self.to_pooled_transaction(tx))
505    }
506
507    /// Updates the entire pool after a new block was executed.
508    pub fn on_canonical_state_change(&self, update: CanonicalStateUpdate<'_, V::Block>) {
509        trace!(target: "txpool", ?update, "updating pool on canonical state change");
510
511        let block_info = update.block_info();
512        let CanonicalStateUpdate {
513            new_tip, changed_accounts, mined_transactions, update_kind, ..
514        } = update;
515        self.validator.on_new_head_block(new_tip);
516
517        let changed_senders = self.changed_senders(changed_accounts.into_iter());
518
519        // update the pool
520        let outcome = self.pool.write().on_canonical_state_change(
521            block_info,
522            mined_transactions,
523            changed_senders,
524            update_kind,
525        );
526
527        // This will discard outdated transactions based on the account's nonce
528        self.delete_discarded_blobs(outcome.discarded.iter());
529
530        // notify listeners about updates
531        self.notify_on_new_state(outcome);
532    }
533
534    /// Performs account updates on the pool.
535    ///
536    /// This will either promote or discard transactions based on the new account state.
537    ///
538    /// This should be invoked when the pool drifted and accounts are updated manually
539    pub fn update_accounts(&self, accounts: Vec<ChangedAccount>) {
540        let changed_senders = self.changed_senders(accounts.into_iter());
541        let UpdateOutcome { promoted, discarded } =
542            self.pool.write().update_accounts(changed_senders);
543
544        self.notify_on_transaction_updates(promoted, discarded);
545    }
546
547    /// Add a single validated transaction into the pool.
548    ///
549    /// Returns the outcome and optionally metadata to be processed after the pool lock is
550    /// released.
551    ///
552    /// Note: this is only used internally by [`Self::add_transactions()`], all new transaction(s)
553    /// come in through that function, either as a batch or `std::iter::once`.
554    fn add_transaction(
555        &self,
556        pool: &mut RwLockWriteGuard<'_, TxPool<T>>,
557        origin: TransactionOrigin,
558        tx: TransactionValidationOutcome<T::Transaction>,
559    ) -> (PoolResult<AddedTransactionOutcome>, Option<AddedTransactionMeta<T::Transaction>>) {
560        match tx {
561            TransactionValidationOutcome::Valid {
562                balance,
563                state_nonce,
564                transaction,
565                propagate,
566                bytecode_hash,
567                authorities,
568            } => {
569                let sender_id = self.get_sender_id(transaction.sender());
570                let transaction_id = TransactionId::new(sender_id, transaction.nonce());
571
572                // split the valid transaction and the blob sidecar if it has any
573                let (transaction, blob_sidecar) = match transaction {
574                    ValidTransaction::Valid(tx) => (tx, None),
575                    ValidTransaction::ValidWithSidecar { transaction, sidecar } => {
576                        debug_assert!(
577                            transaction.is_eip4844(),
578                            "validator returned sidecar for non EIP-4844 transaction"
579                        );
580                        (transaction, Some(sidecar))
581                    }
582                };
583
584                let tx = ValidPoolTransaction {
585                    transaction,
586                    transaction_id,
587                    propagate,
588                    timestamp: Instant::now(),
589                    origin,
590                    authority_ids: authorities.map(|auths| self.get_sender_ids(auths)),
591                };
592
593                let added = match pool.add_transaction(tx, balance, state_nonce, bytecode_hash) {
594                    Ok(added) => added,
595                    Err(err) => return (Err(err), None),
596                };
597                let hash = *added.hash();
598                let state = added.transaction_state();
599
600                let meta = AddedTransactionMeta { added, blob_sidecar };
601
602                (Ok(AddedTransactionOutcome { hash, state }), Some(meta))
603            }
604            TransactionValidationOutcome::Invalid(tx, err) => {
605                self.with_event_listener(|listener| listener.invalid(tx.hash()));
606                (Err(PoolError::new(*tx.hash(), err)), None)
607            }
608            TransactionValidationOutcome::Error(tx_hash, err) => {
609                self.with_event_listener(|listener| listener.discarded(&tx_hash));
610                (Err(PoolError::other(tx_hash, err)), None)
611            }
612        }
613    }
614
615    /// Adds a transaction and returns the event stream.
616    pub fn add_transaction_and_subscribe(
617        &self,
618        origin: TransactionOrigin,
619        tx: TransactionValidationOutcome<T::Transaction>,
620    ) -> PoolResult<TransactionEvents> {
621        let listener = {
622            let mut listener = self.event_listener.write();
623            let events = listener.subscribe(tx.tx_hash());
624            self.mark_event_listener_installed();
625            events
626        };
627        let mut results = self.add_transactions(origin, std::iter::once(tx));
628        results.pop().expect("result length is the same as the input")?;
629        Ok(listener)
630    }
631
632    /// Adds all transactions in the iterator to the pool, returning a list of results.
633    ///
634    /// Convenience method that assigns the same origin to all transactions. Delegates to
635    /// [`Self::add_transactions_with_origins`].
636    pub fn add_transactions(
637        &self,
638        origin: TransactionOrigin,
639        transactions: impl IntoIterator<Item = TransactionValidationOutcome<T::Transaction>>,
640    ) -> Vec<PoolResult<AddedTransactionOutcome>> {
641        self.add_transactions_with_origins(transactions.into_iter().map(|tx| (origin, tx)))
642    }
643
644    /// Adds all transactions in the iterator to the pool, each with its own
645    /// [`TransactionOrigin`], returning a list of results.
646    pub fn add_transactions_with_origins(
647        &self,
648        transactions: impl IntoIterator<
649            Item = (TransactionOrigin, TransactionValidationOutcome<T::Transaction>),
650        >,
651    ) -> Vec<PoolResult<AddedTransactionOutcome>> {
652        // Collect results and metadata while holding the pool write lock
653        let (mut results, added_metas, discarded) = {
654            let mut pool = self.pool.write();
655            let mut added_metas = Vec::new();
656
657            let results = transactions
658                .into_iter()
659                .map(|(origin, tx)| {
660                    let (result, meta) = self.add_transaction(&mut pool, origin, tx);
661
662                    // Only collect metadata for successful insertions
663                    if result.is_ok() &&
664                        let Some(meta) = meta
665                    {
666                        added_metas.push(meta);
667                    }
668
669                    result
670                })
671                .collect::<Vec<_>>();
672
673            // Enforce the pool size limits if at least one transaction was added successfully
674            let discarded = if results.iter().any(Result::is_ok) {
675                pool.discard_worst()
676            } else {
677                Default::default()
678            };
679
680            (results, added_metas, discarded)
681        };
682
683        for meta in added_metas {
684            self.on_added_transaction(meta);
685        }
686
687        if !discarded.is_empty() {
688            // Delete any blobs associated with discarded blob transactions
689            self.delete_discarded_blobs(discarded.iter());
690            self.with_event_listener(|listener| listener.discarded_many(&discarded));
691
692            let discarded_hashes =
693                discarded.into_iter().map(|tx| *tx.hash()).collect::<HashSet<_>>();
694
695            // A newly added transaction may be immediately discarded, so we need to
696            // adjust the result here
697            for res in &mut results {
698                if let Ok(AddedTransactionOutcome { hash, .. }) = res &&
699                    discarded_hashes.contains(hash)
700                {
701                    *res = Err(PoolError::new(*hash, PoolErrorKind::DiscardedOnInsert))
702                }
703            }
704        };
705
706        results
707    }
708
709    /// Process a transaction that was added to the pool.
710    ///
711    /// Performs blob storage operations and sends all notifications. This should be called
712    /// after the pool write lock has been released to avoid blocking pool operations.
713    fn on_added_transaction(&self, meta: AddedTransactionMeta<T::Transaction>) {
714        // Handle blob sidecar storage and notifications for EIP-4844 transactions
715        if let Some(sidecar) = meta.blob_sidecar {
716            let hash = *meta.added.hash();
717            self.on_new_blob_sidecar(&hash, &sidecar);
718            self.insert_blob(hash, sidecar);
719        }
720
721        // Delete replaced blob sidecar if any
722        if let Some(replaced) = meta.added.replaced_blob_transaction() {
723            debug!(target: "txpool", "[{:?}] delete replaced blob sidecar", replaced);
724            self.delete_blob(replaced);
725        }
726
727        // Delete discarded blob sidecars if any, this doesnt do any IO.
728        if let Some(discarded) = meta.added.discarded_transactions() {
729            self.delete_discarded_blobs(discarded.iter());
730        }
731
732        // Notify pending transaction listeners
733        if let Some(pending) = meta.added.as_pending() {
734            self.on_new_pending_transaction(pending);
735        }
736
737        // Notify event listeners
738        self.notify_event_listeners(&meta.added);
739
740        // Notify new transaction listeners
741        self.on_new_transaction(meta.added.into_new_transaction_event());
742    }
743
744    /// Notify all listeners about a new pending transaction.
745    ///
746    /// See also [`Self::add_pending_listener`]
747    ///
748    /// CAUTION: This function is only intended to be used manually in order to use this type's
749    /// pending transaction receivers when manually implementing the
750    /// [`TransactionPool`](crate::TransactionPool) trait for a custom pool implementation
751    /// [`TransactionPool::pending_transactions_listener_for`](crate::TransactionPool).
752    pub fn on_new_pending_transaction(&self, pending: &AddedPendingTransaction<T::Transaction>) {
753        let mut needs_cleanup = false;
754
755        {
756            let listeners = self.pending_transaction_listener.read();
757            for listener in listeners.iter() {
758                if !listener.send_all(pending.pending_transactions(listener.kind)) {
759                    needs_cleanup = true;
760                }
761            }
762        }
763
764        // Clean up dead listeners if we detected any closed channels
765        if needs_cleanup {
766            self.pending_transaction_listener
767                .write()
768                .retain(|listener| !listener.sender.is_closed());
769        }
770    }
771
772    /// Notify all listeners about a newly inserted pending transaction.
773    ///
774    /// See also [`Self::add_new_transaction_listener`]
775    ///
776    /// CAUTION: This function is only intended to be used manually in order to use this type's
777    /// transaction receivers when manually implementing the
778    /// [`TransactionPool`](crate::TransactionPool) trait for a custom pool implementation
779    /// [`TransactionPool::new_transactions_listener_for`](crate::TransactionPool).
780    pub fn on_new_transaction(&self, event: NewTransactionEvent<T::Transaction>) {
781        let mut needs_cleanup = false;
782
783        {
784            let listeners = self.transaction_listener.read();
785            for listener in listeners.iter() {
786                if listener.kind.is_propagate_only() && !event.transaction.propagate {
787                    if listener.sender.is_closed() {
788                        needs_cleanup = true;
789                    }
790                    // Skip non-propagate transactions for propagate-only listeners
791                    continue
792                }
793
794                if !listener.send(event.clone()) {
795                    needs_cleanup = true;
796                }
797            }
798        }
799
800        // Clean up dead listeners if we detected any closed channels
801        if needs_cleanup {
802            self.transaction_listener.write().retain(|listener| !listener.sender.is_closed());
803        }
804    }
805
806    /// Notify all listeners about a blob sidecar for a newly inserted blob (eip4844) transaction.
807    fn on_new_blob_sidecar(&self, tx_hash: &TxHash, sidecar: &BlobTransactionSidecarVariant) {
808        let mut sidecar_listeners = self.blob_transaction_sidecar_listener.lock();
809        if sidecar_listeners.is_empty() {
810            return
811        }
812        let sidecar = Arc::new(sidecar.clone());
813        sidecar_listeners.retain_mut(|listener| {
814            let new_blob_event = NewBlobSidecar { tx_hash: *tx_hash, sidecar: sidecar.clone() };
815            match listener.sender.try_send(new_blob_event) {
816                Ok(()) => true,
817                Err(err) => {
818                    if matches!(err, mpsc::error::TrySendError::Full(_)) {
819                        debug!(
820                            target: "txpool",
821                            "[{:?}] failed to send blob sidecar; channel full",
822                            sidecar,
823                        );
824                        true
825                    } else {
826                        false
827                    }
828                }
829            }
830        })
831    }
832
833    /// Notifies transaction listeners about changes once a block was processed.
834    fn notify_on_new_state(&self, outcome: OnNewCanonicalStateOutcome<T::Transaction>) {
835        trace!(target: "txpool", promoted=outcome.promoted.len(), discarded= outcome.discarded.len() ,"notifying listeners on state change");
836
837        // notify about promoted pending transactions - emit hashes
838        let mut needs_pending_cleanup = false;
839        {
840            let listeners = self.pending_transaction_listener.read();
841            for listener in listeners.iter() {
842                if !listener.send_all(outcome.pending_transactions(listener.kind)) {
843                    needs_pending_cleanup = true;
844                }
845            }
846        }
847        if needs_pending_cleanup {
848            self.pending_transaction_listener.write().retain(|l| !l.sender.is_closed());
849        }
850
851        // emit full transactions
852        let mut needs_tx_cleanup = false;
853        {
854            let listeners = self.transaction_listener.read();
855            for listener in listeners.iter() {
856                if !listener.send_all(outcome.full_pending_transactions(listener.kind)) {
857                    needs_tx_cleanup = true;
858                }
859            }
860        }
861        if needs_tx_cleanup {
862            self.transaction_listener.write().retain(|l| !l.sender.is_closed());
863        }
864
865        let OnNewCanonicalStateOutcome { mined, promoted, discarded, block_hash } = outcome;
866
867        // broadcast specific transaction events
868        self.with_event_listener(|listener| {
869            for tx in &mined {
870                listener.mined(tx, block_hash);
871            }
872            for tx in &promoted {
873                listener.pending(tx.hash(), None);
874            }
875            for tx in &discarded {
876                listener.discarded(tx.hash());
877            }
878        })
879    }
880
881    /// Notifies all listeners about the transaction movements.
882    ///
883    /// This will emit events according to the provided changes.
884    ///
885    /// CAUTION: This function is only intended to be used manually in order to use this type's
886    /// [`TransactionEvents`] receivers when manually implementing the
887    /// [`TransactionPool`](crate::TransactionPool) trait for a custom pool implementation
888    /// [`TransactionPool::transaction_event_listener`](crate::TransactionPool).
889    #[allow(clippy::type_complexity)]
890    pub fn notify_on_transaction_updates(
891        &self,
892        promoted: Vec<Arc<ValidPoolTransaction<T::Transaction>>>,
893        discarded: Vec<Arc<ValidPoolTransaction<T::Transaction>>>,
894    ) {
895        // Notify about promoted pending transactions (similar to notify_on_new_state)
896        if !promoted.is_empty() {
897            let mut needs_pending_cleanup = false;
898            {
899                let listeners = self.pending_transaction_listener.read();
900                for listener in listeners.iter() {
901                    let promoted_hashes = promoted.iter().filter_map(|tx| {
902                        if listener.kind.is_propagate_only() && !tx.propagate {
903                            None
904                        } else {
905                            Some(*tx.hash())
906                        }
907                    });
908                    if !listener.send_all(promoted_hashes) {
909                        needs_pending_cleanup = true;
910                    }
911                }
912            }
913            if needs_pending_cleanup {
914                self.pending_transaction_listener.write().retain(|l| !l.sender.is_closed());
915            }
916
917            // in this case we should also emit promoted transactions in full
918            let mut needs_tx_cleanup = false;
919            {
920                let listeners = self.transaction_listener.read();
921                for listener in listeners.iter() {
922                    let promoted_txs = promoted.iter().filter_map(|tx| {
923                        if listener.kind.is_propagate_only() && !tx.propagate {
924                            None
925                        } else {
926                            Some(NewTransactionEvent::pending(tx.clone()))
927                        }
928                    });
929                    if !listener.send_all(promoted_txs) {
930                        needs_tx_cleanup = true;
931                    }
932                }
933            }
934            if needs_tx_cleanup {
935                self.transaction_listener.write().retain(|l| !l.sender.is_closed());
936            }
937        }
938
939        self.with_event_listener(|listener| {
940            for tx in &promoted {
941                listener.pending(tx.hash(), None);
942            }
943            for tx in &discarded {
944                listener.discarded(tx.hash());
945            }
946        });
947
948        if !discarded.is_empty() {
949            // This deletes outdated blob txs from the blob store, based on the account's nonce.
950            // This is called during txpool maintenance when the pool drifted.
951            self.delete_discarded_blobs(discarded.iter());
952        }
953    }
954
955    /// Fire events for the newly added transaction if there are any.
956    ///
957    /// See also [`Self::add_transaction_event_listener`].
958    ///
959    /// CAUTION: This function is only intended to be used manually in order to use this type's
960    /// [`TransactionEvents`] receivers when manually implementing the
961    /// [`TransactionPool`](crate::TransactionPool) trait for a custom pool implementation
962    /// [`TransactionPool::transaction_event_listener`](crate::TransactionPool).
963    pub fn notify_event_listeners(&self, tx: &AddedTransaction<T::Transaction>) {
964        self.with_event_listener(|listener| match tx {
965            AddedTransaction::Pending(tx) => {
966                let AddedPendingTransaction { transaction, promoted, discarded, replaced } = tx;
967
968                listener.pending(transaction.hash(), replaced.clone());
969                for tx in promoted {
970                    listener.pending(tx.hash(), None);
971                }
972                for tx in discarded {
973                    listener.discarded(tx.hash());
974                }
975            }
976            AddedTransaction::Parked { transaction, replaced, queued_reason, .. } => {
977                listener.queued(transaction.hash(), queued_reason.clone());
978                if let Some(replaced) = replaced {
979                    listener.replaced(replaced.clone(), *transaction.hash());
980                }
981            }
982        });
983    }
984
985    /// Returns an iterator that yields transactions that are ready to be included in the block.
986    pub fn best_transactions(&self) -> BestTransactions<T> {
987        self.get_pool_data().best_transactions()
988    }
989
990    /// Returns an iterator that yields transactions that are ready to be included in the block with
991    /// the given base fee and optional blob fee attributes.
992    pub fn best_transactions_with_attributes(
993        &self,
994        best_transactions_attributes: BestTransactionsAttributes,
995    ) -> Box<dyn crate::traits::BestTransactions<Item = Arc<ValidPoolTransaction<T::Transaction>>>>
996    {
997        self.get_pool_data().best_transactions_with_attributes(best_transactions_attributes)
998    }
999
1000    /// Returns only the first `max` transactions in the pending pool.
1001    pub fn pending_transactions_max(
1002        &self,
1003        max: usize,
1004    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1005        self.get_pool_data().pending_transactions_iter().take(max).collect()
1006    }
1007
1008    /// Returns all transactions from the pending sub-pool
1009    pub fn pending_transactions(&self) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1010        self.get_pool_data().pending_transactions()
1011    }
1012
1013    /// Returns all transactions from parked pools
1014    pub fn queued_transactions(&self) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1015        self.get_pool_data().queued_transactions()
1016    }
1017
1018    /// Returns all transactions in the pool
1019    pub fn all_transactions(&self) -> AllPoolTransactions<T::Transaction> {
1020        let pool = self.get_pool_data();
1021        AllPoolTransactions {
1022            pending: pool.pending_transactions(),
1023            queued: pool.queued_transactions(),
1024        }
1025    }
1026
1027    /// Returns _all_ transactions in the pool
1028    pub fn all_transaction_hashes(&self) -> Vec<TxHash> {
1029        self.get_pool_data().all().transactions_iter().map(|tx| *tx.hash()).collect()
1030    }
1031
1032    /// Removes and returns all matching transactions from the pool.
1033    ///
1034    /// This behaves as if the transactions got discarded (_not_ mined), effectively introducing a
1035    /// nonce gap for the given transactions.
1036    pub fn remove_transactions(
1037        &self,
1038        hashes: Vec<TxHash>,
1039    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1040        if hashes.is_empty() {
1041            return Vec::new()
1042        }
1043        let removed = self.pool.write().remove_transactions(hashes);
1044
1045        self.with_event_listener(|listener| listener.discarded_many(&removed));
1046
1047        removed
1048    }
1049
1050    /// Removes and returns all matching transactions and their dependent transactions from the
1051    /// pool.
1052    pub fn remove_transactions_and_descendants(
1053        &self,
1054        hashes: Vec<TxHash>,
1055    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1056        if hashes.is_empty() {
1057            return Vec::new()
1058        }
1059        let removed = self.pool.write().remove_transactions_and_descendants(hashes);
1060
1061        self.with_event_listener(|listener| {
1062            for tx in &removed {
1063                listener.discarded(tx.hash());
1064            }
1065        });
1066
1067        removed
1068    }
1069
1070    /// Removes and returns all transactions by the specified sender from the pool.
1071    pub fn remove_transactions_by_sender(
1072        &self,
1073        sender: Address,
1074    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1075        let sender_id = self.get_sender_id(sender);
1076        let removed = self.pool.write().remove_transactions_by_sender(sender_id);
1077
1078        self.with_event_listener(|listener| listener.discarded_many(&removed));
1079
1080        removed
1081    }
1082
1083    /// Prunes and returns all matching transactions from the pool.
1084    ///
1085    /// This removes the transactions as if they were mined: descendant transactions are **not**
1086    /// parked and remain eligible for inclusion.
1087    pub fn prune_transactions(
1088        &self,
1089        hashes: Vec<TxHash>,
1090    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1091        if hashes.is_empty() {
1092            return Vec::new()
1093        }
1094
1095        self.pool.write().prune_transactions(hashes)
1096    }
1097
1098    /// Removes and returns all transactions that are present in the pool.
1099    pub fn retain_unknown<A>(&self, announcement: &mut A)
1100    where
1101        A: HandleMempoolData,
1102    {
1103        if announcement.is_empty() {
1104            return
1105        }
1106        let pool = self.get_pool_data();
1107        announcement.retain_by_hash(|tx| !pool.contains(tx))
1108    }
1109
1110    /// Returns the transaction by hash.
1111    pub fn get(&self, tx_hash: &TxHash) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
1112        self.get_pool_data().get(tx_hash)
1113    }
1114
1115    /// Returns all transactions of the address
1116    pub fn get_transactions_by_sender(
1117        &self,
1118        sender: Address,
1119    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1120        let sender_id = self.get_sender_id(sender);
1121        self.get_pool_data().get_transactions_by_sender(sender_id)
1122    }
1123
1124    /// Returns a pending transaction sent by the given sender with the given nonce.
1125    pub fn get_pending_transaction_by_sender_and_nonce(
1126        &self,
1127        sender: Address,
1128        nonce: u64,
1129    ) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
1130        let sender_id = self.get_sender_id(sender);
1131        self.get_pool_data().get_pending_transaction_by_sender_and_nonce(sender_id, nonce)
1132    }
1133
1134    /// Returns all queued transactions of the address by sender
1135    pub fn get_queued_transactions_by_sender(
1136        &self,
1137        sender: Address,
1138    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1139        let sender_id = self.get_sender_id(sender);
1140        self.get_pool_data().queued_txs_by_sender(sender_id)
1141    }
1142
1143    /// Returns all pending transactions filtered by predicate
1144    pub fn pending_transactions_with_predicate(
1145        &self,
1146        predicate: impl FnMut(&ValidPoolTransaction<T::Transaction>) -> bool,
1147    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1148        self.get_pool_data().pending_transactions_with_predicate(predicate)
1149    }
1150
1151    /// Returns all pending transactions of the address by sender
1152    pub fn get_pending_transactions_by_sender(
1153        &self,
1154        sender: Address,
1155    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1156        let sender_id = self.get_sender_id(sender);
1157        self.get_pool_data().pending_txs_by_sender(sender_id)
1158    }
1159
1160    /// Returns the highest transaction of the address
1161    pub fn get_highest_transaction_by_sender(
1162        &self,
1163        sender: Address,
1164    ) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
1165        let sender_id = self.get_sender_id(sender);
1166        self.get_pool_data().get_highest_transaction_by_sender(sender_id)
1167    }
1168
1169    /// Returns the transaction with the highest nonce that is executable given the on chain nonce.
1170    pub fn get_highest_consecutive_transaction_by_sender(
1171        &self,
1172        sender: Address,
1173        on_chain_nonce: u64,
1174    ) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
1175        let sender_id = self.get_sender_id(sender);
1176        self.get_pool_data().get_highest_consecutive_transaction_by_sender(
1177            sender_id.into_transaction_id(on_chain_nonce),
1178        )
1179    }
1180
1181    /// Returns the transaction given a [`TransactionId`]
1182    pub fn get_transaction_by_transaction_id(
1183        &self,
1184        transaction_id: &TransactionId,
1185    ) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
1186        self.get_pool_data().all().get(transaction_id).map(|tx| tx.transaction.clone())
1187    }
1188
1189    /// Returns all transactions that where submitted with the given [`TransactionOrigin`]
1190    pub fn get_transactions_by_origin(
1191        &self,
1192        origin: TransactionOrigin,
1193    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1194        self.get_pool_data()
1195            .all()
1196            .transactions_iter()
1197            .filter(|tx| tx.origin == origin)
1198            .cloned()
1199            .collect()
1200    }
1201
1202    /// Returns all pending transactions filtered by [`TransactionOrigin`]
1203    pub fn get_pending_transactions_by_origin(
1204        &self,
1205        origin: TransactionOrigin,
1206    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1207        self.get_pool_data().pending_transactions_iter().filter(|tx| tx.origin == origin).collect()
1208    }
1209
1210    /// Returns all the transactions belonging to the hashes.
1211    ///
1212    /// If no transaction exists, it is skipped.
1213    pub fn get_all(&self, txs: Vec<TxHash>) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1214        if txs.is_empty() {
1215            return Vec::new()
1216        }
1217        self.get_pool_data().get_all(txs).collect()
1218    }
1219
1220    /// Returns all the transactions belonging to the hashes that are propagatable.
1221    ///
1222    /// If no transaction exists, it is skipped.
1223    fn get_all_propagatable(
1224        &self,
1225        txs: &[TxHash],
1226    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1227        if txs.is_empty() {
1228            return Vec::new()
1229        }
1230        let pool = self.get_pool_data();
1231        txs.iter().filter_map(|tx| pool.get(tx).filter(|tx| tx.propagate)).collect()
1232    }
1233
1234    /// Notify about propagated transactions.
1235    pub fn on_propagated(&self, txs: PropagatedTransactions) {
1236        if txs.0.is_empty() {
1237            return
1238        }
1239        self.with_event_listener(|listener| {
1240            txs.0.into_iter().for_each(|(hash, peers)| listener.propagated(&hash, peers));
1241        });
1242    }
1243
1244    /// Number of transactions in the entire pool
1245    pub fn len(&self) -> usize {
1246        self.get_pool_data().len()
1247    }
1248
1249    /// Whether the pool is empty
1250    pub fn is_empty(&self) -> bool {
1251        self.get_pool_data().is_empty()
1252    }
1253
1254    /// Returns whether or not the pool is over its configured size and transaction count limits.
1255    pub fn is_exceeded(&self) -> bool {
1256        self.pool.read().is_exceeded()
1257    }
1258
1259    /// Inserts a blob transaction into the blob store
1260    fn insert_blob(&self, hash: TxHash, blob: BlobTransactionSidecarVariant) {
1261        debug!(target: "txpool", "[{:?}] storing blob sidecar", hash);
1262        if let Err(err) = self.blob_store.insert(hash, blob) {
1263            warn!(target: "txpool", %err, "[{:?}] failed to insert blob", hash);
1264            self.blob_store_metrics.blobstore_failed_inserts.increment(1);
1265        }
1266        self.update_blob_store_metrics();
1267    }
1268
1269    /// Delete a blob from the blob store
1270    pub fn delete_blob(&self, blob: TxHash) {
1271        let _ = self.blob_store.delete(blob);
1272    }
1273
1274    /// Delete all blobs from the blob store
1275    pub fn delete_blobs(&self, txs: Vec<TxHash>) {
1276        let _ = self.blob_store.delete_all(txs);
1277    }
1278
1279    /// Cleans up the blob store
1280    pub fn cleanup_blobs(&self) {
1281        let stat = self.blob_store.cleanup();
1282        self.blob_store_metrics.blobstore_failed_deletes.increment(stat.delete_failed as u64);
1283        self.update_blob_store_metrics();
1284    }
1285
1286    fn update_blob_store_metrics(&self) {
1287        if let Some(data_size) = self.blob_store.data_size_hint() {
1288            self.blob_store_metrics.blobstore_byte_size.set(data_size as f64);
1289        }
1290        self.blob_store_metrics.blobstore_entries.set(self.blob_store.blobs_len() as f64);
1291    }
1292
1293    /// Deletes all blob transactions that were discarded.
1294    fn delete_discarded_blobs<'a>(
1295        &'a self,
1296        transactions: impl IntoIterator<Item = &'a Arc<ValidPoolTransaction<T::Transaction>>>,
1297    ) {
1298        let blob_txs = transactions
1299            .into_iter()
1300            .filter(|tx| tx.transaction.is_eip4844())
1301            .map(|tx| *tx.hash())
1302            .collect();
1303        self.delete_blobs(blob_txs);
1304    }
1305}
1306
1307impl<V, T: TransactionOrdering, S> fmt::Debug for PoolInner<V, T, S> {
1308    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1309        f.debug_struct("PoolInner").field("config", &self.config).finish_non_exhaustive()
1310    }
1311}
1312
1313/// Metadata for a transaction that was added to the pool.
1314///
1315/// This holds all the data needed to complete post-insertion operations (notifications,
1316/// blob storage).
1317#[derive(Debug)]
1318struct AddedTransactionMeta<T: PoolTransaction> {
1319    /// The transaction that was added to the pool
1320    added: AddedTransaction<T>,
1321    /// Optional blob sidecar for EIP-4844 transactions
1322    blob_sidecar: Option<BlobTransactionSidecarVariant>,
1323}
1324
1325/// Tracks an added transaction and all graph changes caused by adding it.
1326#[derive(Debug, Clone)]
1327pub struct AddedPendingTransaction<T: PoolTransaction> {
1328    /// Inserted transaction.
1329    pub transaction: Arc<ValidPoolTransaction<T>>,
1330    /// Replaced transaction.
1331    pub replaced: Option<Arc<ValidPoolTransaction<T>>>,
1332    /// transactions promoted to the pending queue
1333    pub promoted: Vec<Arc<ValidPoolTransaction<T>>>,
1334    /// transactions that failed and became discarded
1335    pub discarded: Vec<Arc<ValidPoolTransaction<T>>>,
1336}
1337
1338impl<T: PoolTransaction> AddedPendingTransaction<T> {
1339    /// Returns all transactions that were promoted to the pending pool and adhere to the given
1340    /// [`TransactionListenerKind`].
1341    ///
1342    /// If the kind is [`TransactionListenerKind::PropagateOnly`], then only transactions that
1343    /// are allowed to be propagated are returned.
1344    pub(crate) fn pending_transactions(
1345        &self,
1346        kind: TransactionListenerKind,
1347    ) -> impl Iterator<Item = B256> + '_ {
1348        let iter = std::iter::once(&self.transaction).chain(self.promoted.iter());
1349        PendingTransactionIter { kind, iter }
1350    }
1351}
1352
1353pub(crate) struct PendingTransactionIter<Iter> {
1354    kind: TransactionListenerKind,
1355    iter: Iter,
1356}
1357
1358impl<'a, Iter, T> Iterator for PendingTransactionIter<Iter>
1359where
1360    Iter: Iterator<Item = &'a Arc<ValidPoolTransaction<T>>>,
1361    T: PoolTransaction + 'a,
1362{
1363    type Item = B256;
1364
1365    fn next(&mut self) -> Option<Self::Item> {
1366        loop {
1367            let next = self.iter.next()?;
1368            if self.kind.is_propagate_only() && !next.propagate {
1369                continue
1370            }
1371            return Some(*next.hash())
1372        }
1373    }
1374}
1375
1376/// An iterator over full pending transactions
1377pub(crate) struct FullPendingTransactionIter<Iter> {
1378    kind: TransactionListenerKind,
1379    iter: Iter,
1380}
1381
1382impl<'a, Iter, T> Iterator for FullPendingTransactionIter<Iter>
1383where
1384    Iter: Iterator<Item = &'a Arc<ValidPoolTransaction<T>>>,
1385    T: PoolTransaction + 'a,
1386{
1387    type Item = NewTransactionEvent<T>;
1388
1389    fn next(&mut self) -> Option<Self::Item> {
1390        loop {
1391            let next = self.iter.next()?;
1392            if self.kind.is_propagate_only() && !next.propagate {
1393                continue
1394            }
1395            return Some(NewTransactionEvent {
1396                subpool: SubPool::Pending,
1397                transaction: next.clone(),
1398            })
1399        }
1400    }
1401}
1402
1403/// Represents a transaction that was added into the pool and its state
1404#[derive(Debug, Clone)]
1405pub enum AddedTransaction<T: PoolTransaction> {
1406    /// Transaction was successfully added and moved to the pending pool.
1407    Pending(AddedPendingTransaction<T>),
1408    /// Transaction was successfully added but not yet ready for processing and moved to a
1409    /// parked pool instead.
1410    Parked {
1411        /// Inserted transaction.
1412        transaction: Arc<ValidPoolTransaction<T>>,
1413        /// Replaced transaction.
1414        replaced: Option<Arc<ValidPoolTransaction<T>>>,
1415        /// The subpool it was moved to.
1416        subpool: SubPool,
1417        /// The specific reason why the transaction is queued (if applicable).
1418        queued_reason: Option<QueuedReason>,
1419    },
1420}
1421
1422impl<T: PoolTransaction> AddedTransaction<T> {
1423    /// Returns whether the transaction has been added to the pending pool.
1424    pub const fn as_pending(&self) -> Option<&AddedPendingTransaction<T>> {
1425        match self {
1426            Self::Pending(tx) => Some(tx),
1427            _ => None,
1428        }
1429    }
1430
1431    /// Returns the replaced transaction if there was one
1432    pub const fn replaced(&self) -> Option<&Arc<ValidPoolTransaction<T>>> {
1433        match self {
1434            Self::Pending(tx) => tx.replaced.as_ref(),
1435            Self::Parked { replaced, .. } => replaced.as_ref(),
1436        }
1437    }
1438
1439    /// Returns the discarded transactions if there were any
1440    pub(crate) fn discarded_transactions(&self) -> Option<&[Arc<ValidPoolTransaction<T>>]> {
1441        match self {
1442            Self::Pending(tx) => Some(&tx.discarded),
1443            Self::Parked { .. } => None,
1444        }
1445    }
1446
1447    /// Returns the hash of the replaced transaction if it is a blob transaction.
1448    pub(crate) fn replaced_blob_transaction(&self) -> Option<B256> {
1449        self.replaced().filter(|tx| tx.transaction.is_eip4844()).map(|tx| *tx.transaction.hash())
1450    }
1451
1452    /// Returns the hash of the transaction
1453    pub fn hash(&self) -> &TxHash {
1454        match self {
1455            Self::Pending(tx) => tx.transaction.hash(),
1456            Self::Parked { transaction, .. } => transaction.hash(),
1457        }
1458    }
1459
1460    /// Converts this type into the event type for listeners
1461    pub fn into_new_transaction_event(self) -> NewTransactionEvent<T> {
1462        match self {
1463            Self::Pending(tx) => {
1464                NewTransactionEvent { subpool: SubPool::Pending, transaction: tx.transaction }
1465            }
1466            Self::Parked { transaction, subpool, .. } => {
1467                NewTransactionEvent { transaction, subpool }
1468            }
1469        }
1470    }
1471
1472    /// Returns the subpool this transaction was added to
1473    pub(crate) const fn subpool(&self) -> SubPool {
1474        match self {
1475            Self::Pending(_) => SubPool::Pending,
1476            Self::Parked { subpool, .. } => *subpool,
1477        }
1478    }
1479
1480    /// Returns the [`TransactionId`] of the added transaction
1481    #[cfg(test)]
1482    pub(crate) fn id(&self) -> &TransactionId {
1483        match self {
1484            Self::Pending(added) => added.transaction.id(),
1485            Self::Parked { transaction, .. } => transaction.id(),
1486        }
1487    }
1488
1489    /// Returns the queued reason if the transaction is parked with a queued reason.
1490    pub const fn queued_reason(&self) -> Option<&QueuedReason> {
1491        match self {
1492            Self::Pending(_) => None,
1493            Self::Parked { queued_reason, .. } => queued_reason.as_ref(),
1494        }
1495    }
1496
1497    /// Returns the transaction state based on the subpool and queued reason.
1498    pub fn transaction_state(&self) -> AddedTransactionState {
1499        match self.subpool() {
1500            SubPool::Pending => AddedTransactionState::Pending,
1501            _ => {
1502                // For non-pending transactions, use the queued reason directly from the
1503                // AddedTransaction
1504                if let Some(reason) = self.queued_reason() {
1505                    AddedTransactionState::Queued(reason.clone())
1506                } else {
1507                    // Fallback - this shouldn't happen with the new implementation
1508                    AddedTransactionState::Queued(QueuedReason::NonceGap)
1509                }
1510            }
1511        }
1512    }
1513}
1514
1515/// The specific reason why a transaction is queued (not ready for execution)
1516#[derive(Debug, Clone, PartialEq, Eq)]
1517pub enum QueuedReason {
1518    /// Transaction has a nonce gap - missing prior transactions
1519    NonceGap,
1520    /// Transaction has parked ancestors - waiting for other transactions to be mined
1521    ParkedAncestors,
1522    /// Sender has insufficient balance to cover the transaction cost
1523    InsufficientBalance,
1524    /// Transaction exceeds the block gas limit
1525    TooMuchGas,
1526    /// Transaction doesn't meet the base fee requirement
1527    InsufficientBaseFee,
1528    /// Transaction doesn't meet the blob fee requirement (EIP-4844)
1529    InsufficientBlobFee,
1530}
1531
1532/// The state of a transaction when is was added to the pool
1533#[derive(Debug, Clone, PartialEq, Eq)]
1534pub enum AddedTransactionState {
1535    /// Ready for execution
1536    Pending,
1537    /// Not ready for execution due to a specific condition
1538    Queued(QueuedReason),
1539}
1540
1541impl AddedTransactionState {
1542    /// Returns whether the transaction was submitted as queued.
1543    pub const fn is_queued(&self) -> bool {
1544        matches!(self, Self::Queued(_))
1545    }
1546
1547    /// Returns whether the transaction was submitted as pending.
1548    pub const fn is_pending(&self) -> bool {
1549        matches!(self, Self::Pending)
1550    }
1551
1552    /// Returns the specific queued reason if the transaction is queued.
1553    pub const fn queued_reason(&self) -> Option<&QueuedReason> {
1554        match self {
1555            Self::Queued(reason) => Some(reason),
1556            Self::Pending => None,
1557        }
1558    }
1559}
1560
1561/// The outcome of a successful transaction addition
1562#[derive(Debug, Clone, PartialEq, Eq)]
1563pub struct AddedTransactionOutcome {
1564    /// The hash of the transaction
1565    pub hash: TxHash,
1566    /// The state of the transaction
1567    pub state: AddedTransactionState,
1568}
1569
1570impl AddedTransactionOutcome {
1571    /// Returns whether the transaction was submitted as queued.
1572    pub const fn is_queued(&self) -> bool {
1573        self.state.is_queued()
1574    }
1575
1576    /// Returns whether the transaction was submitted as pending.
1577    pub const fn is_pending(&self) -> bool {
1578        self.state.is_pending()
1579    }
1580}
1581
1582/// Contains all state changes after a [`CanonicalStateUpdate`] was processed
1583#[derive(Debug)]
1584pub(crate) struct OnNewCanonicalStateOutcome<T: PoolTransaction> {
1585    /// Hash of the block.
1586    pub(crate) block_hash: B256,
1587    /// All mined transactions.
1588    pub(crate) mined: Vec<TxHash>,
1589    /// Transactions promoted to the pending pool.
1590    pub(crate) promoted: Vec<Arc<ValidPoolTransaction<T>>>,
1591    /// transaction that were discarded during the update
1592    pub(crate) discarded: Vec<Arc<ValidPoolTransaction<T>>>,
1593}
1594
1595impl<T: PoolTransaction> OnNewCanonicalStateOutcome<T> {
1596    /// Returns all transactions that were promoted to the pending pool and adhere to the given
1597    /// [`TransactionListenerKind`].
1598    ///
1599    /// If the kind is [`TransactionListenerKind::PropagateOnly`], then only transactions that
1600    /// are allowed to be propagated are returned.
1601    pub(crate) fn pending_transactions(
1602        &self,
1603        kind: TransactionListenerKind,
1604    ) -> impl Iterator<Item = B256> + '_ {
1605        let iter = self.promoted.iter();
1606        PendingTransactionIter { kind, iter }
1607    }
1608
1609    /// Returns all FULL transactions that were promoted to the pending pool and adhere to the given
1610    /// [`TransactionListenerKind`].
1611    ///
1612    /// If the kind is [`TransactionListenerKind::PropagateOnly`], then only transactions that
1613    /// are allowed to be propagated are returned.
1614    pub(crate) fn full_pending_transactions(
1615        &self,
1616        kind: TransactionListenerKind,
1617    ) -> impl Iterator<Item = NewTransactionEvent<T>> + '_ {
1618        let iter = self.promoted.iter();
1619        FullPendingTransactionIter { kind, iter }
1620    }
1621}
1622
1623#[cfg(test)]
1624mod tests {
1625    use crate::{
1626        blobstore::{BlobStore, InMemoryBlobStore},
1627        identifier::SenderId,
1628        test_utils::{MockTransaction, TestPoolBuilder},
1629        validate::ValidTransaction,
1630        BlockInfo, PoolConfig, SubPoolLimit, TransactionOrigin, TransactionValidationOutcome, U256,
1631    };
1632    use alloy_eips::{eip4844::BlobTransactionSidecar, eip7594::BlobTransactionSidecarVariant};
1633    use alloy_primitives::Address;
1634    use std::{fs, path::PathBuf};
1635
1636    #[test]
1637    fn test_discard_blobs_on_blob_tx_eviction() {
1638        let blobs = {
1639            // Read the contents of the JSON file into a string.
1640            let json_content = fs::read_to_string(
1641                PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("test_data/blob1.json"),
1642            )
1643            .expect("Failed to read the blob data file");
1644
1645            // Parse the JSON contents into a serde_json::Value.
1646            let json_value: serde_json::Value =
1647                serde_json::from_str(&json_content).expect("Failed to deserialize JSON");
1648
1649            // Extract blob data from JSON and convert it to Blob.
1650            vec![
1651                // Extract the "data" field from the JSON and parse it as a string.
1652                json_value
1653                    .get("data")
1654                    .unwrap()
1655                    .as_str()
1656                    .expect("Data is not a valid string")
1657                    .to_string(),
1658            ]
1659        };
1660
1661        // Generate a BlobTransactionSidecar from the blobs.
1662        let sidecar = BlobTransactionSidecarVariant::Eip4844(
1663            BlobTransactionSidecar::try_from_blobs_hex(blobs).unwrap(),
1664        );
1665
1666        // Define the maximum limit for blobs in the sub-pool.
1667        let blob_limit = SubPoolLimit::new(1000, usize::MAX);
1668
1669        // Create a test pool with default configuration and the specified blob limit.
1670        let test_pool = &TestPoolBuilder::default()
1671            .with_config(PoolConfig { blob_limit, ..Default::default() })
1672            .pool;
1673
1674        // Set the block info for the pool, including a pending blob fee.
1675        test_pool
1676            .set_block_info(BlockInfo { pending_blob_fee: Some(10_000_000), ..Default::default() });
1677
1678        // Create an in-memory blob store.
1679        let blob_store = InMemoryBlobStore::default();
1680
1681        // Loop to add transactions to the pool and test blob eviction.
1682        for n in 0..blob_limit.max_txs + 10 {
1683            // Create a mock transaction with the generated blob sidecar.
1684            let mut tx = MockTransaction::eip4844_with_sidecar(sidecar.clone());
1685
1686            // Set non zero size
1687            tx.set_size(1844674407370951);
1688
1689            // Insert the sidecar into the blob store if the current index is within the blob limit.
1690            if n < blob_limit.max_txs {
1691                blob_store.insert(*tx.get_hash(), sidecar.clone()).unwrap();
1692            }
1693
1694            // Add the transaction to the pool with external origin and valid outcome.
1695            test_pool.add_transactions(
1696                TransactionOrigin::External,
1697                [TransactionValidationOutcome::Valid {
1698                    balance: U256::from(1_000),
1699                    state_nonce: 0,
1700                    bytecode_hash: None,
1701                    transaction: ValidTransaction::ValidWithSidecar {
1702                        transaction: tx,
1703                        sidecar: sidecar.clone(),
1704                    },
1705                    propagate: true,
1706                    authorities: None,
1707                }],
1708            );
1709        }
1710
1711        // Assert that the size of the pool's blob component is equal to the maximum blob limit.
1712        assert_eq!(test_pool.size().blob, blob_limit.max_txs);
1713
1714        // Assert that the size of the pool's blob_size component matches the expected value.
1715        assert_eq!(test_pool.size().blob_size, 1844674407370951000);
1716
1717        // Assert that the pool's blob store matches the expected blob store.
1718        assert_eq!(*test_pool.blob_store(), blob_store);
1719    }
1720
1721    #[test]
1722    fn test_auths_stored_in_identifiers() {
1723        // Create a test pool with default configuration.
1724        let test_pool = &TestPoolBuilder::default().with_config(Default::default()).pool;
1725
1726        let auth = Address::new([1; 20]);
1727        let tx = MockTransaction::eip7702();
1728
1729        test_pool.add_transactions(
1730            TransactionOrigin::Local,
1731            [TransactionValidationOutcome::Valid {
1732                balance: U256::from(1_000),
1733                state_nonce: 0,
1734                bytecode_hash: None,
1735                transaction: ValidTransaction::Valid(tx),
1736                propagate: true,
1737                authorities: Some(vec![auth]),
1738            }],
1739        );
1740
1741        let identifiers = test_pool.identifiers.read();
1742        assert_eq!(identifiers.sender_id(&auth), Some(SenderId::from(1)));
1743    }
1744}