Skip to main content

reth_transaction_pool/pool/
mod.rs

1//! Transaction Pool internals.
2//!
//! Incoming transactions are validated before they enter the pool. The validation outcome can
//! have 3 states:
5//!
6//!  1. Transaction can _never_ be valid
7//!  2. Transaction is _currently_ valid
8//!  3. Transaction is _currently_ invalid, but could potentially become valid in the future
9//!
//! However, (2.) and (3.) of a transaction can only be determined on the basis of the current
//! state, whereas (1.) holds indefinitely. This means that once the state changes, a transaction
//! that falls under (2.) or (3.) needs to be reevaluated.
13//!
14//! The transaction pool is responsible for storing new, valid transactions and providing the next
15//! best transactions sorted by their priority. Where priority is determined by the transaction's
16//! score ([`TransactionOrdering`]).
17//!
18//! Furthermore, the following characteristics fall under (3.):
19//!
20//!  a) Nonce of a transaction is higher than the expected nonce for the next transaction of its
21//! sender. A distinction is made here whether multiple transactions from the same sender have
22//! gapless nonce increments.
23//!
24//!  a)(1) If _no_ transaction is missing in a chain of multiple
25//! transactions from the same sender (all nonce in row), all of them can in principle be executed
26//! on the current state one after the other.
27//!
28//!  a)(2) If there's a nonce gap, then all
29//! transactions after the missing transaction are blocked until the missing transaction arrives.
30//!
31//!  b) Transaction does not meet the dynamic fee cap requirement introduced by EIP-1559: The
32//! fee cap of the transaction needs to be no less than the base fee of block.
33//!
34//!
35//! In essence the transaction pool is made of three separate sub-pools:
36//!
37//!  - Pending Pool: Contains all transactions that are valid on the current state and satisfy (3.
38//!    a)(1): _No_ nonce gaps. A _pending_ transaction is considered _ready_ when it has the lowest
39//!    nonce of all transactions from the same sender. Once a _ready_ transaction with nonce `n` has
40//!    been executed, the next highest transaction from the same sender `n + 1` becomes ready.
41//!
42//!  - Queued Pool: Contains all transactions that are currently blocked by missing transactions:
43//!    (3. a)(2): _With_ nonce gaps or due to lack of funds.
44//!
45//!  - Basefee Pool: To account for the dynamic base fee requirement (3. b) which could render an
46//!    EIP-1559 and all subsequent transactions of the sender currently invalid.
47//!
48//! The classification of transactions is always dependent on the current state that is changed as
49//! soon as a new block is mined. Once a new block is mined, the account changeset must be applied
50//! to the transaction pool.
51//!
52//!
53//! Depending on the use case, consumers of the [`TransactionPool`](crate::traits::TransactionPool)
54//! are interested in (2.) and/or (3.).
55
56//! A generic [`TransactionPool`](crate::traits::TransactionPool) that only handles transactions.
57//!
58//! This Pool maintains two separate sub-pools for (2.) and (3.)
59//!
60//! ## Terminology
61//!
62//!  - _Pending_: pending transactions are transactions that fall under (2.). These transactions can
63//!    currently be executed and are stored in the pending sub-pool
64//!  - _Queued_: queued transactions are transactions that fall under category (3.). Those
65//!    transactions are _currently_ waiting for state changes that eventually move them into
66//!    category (2.) and become pending.
67
68use crate::{
69    blobstore::BlobStore,
70    error::{PoolError, PoolErrorKind, PoolResult},
71    identifier::{SenderId, SenderIdentifiers, TransactionId},
72    metrics::BlobStoreMetrics,
73    pool::{
74        listener::{
75            BlobTransactionSidecarListener, PendingTransactionHashListener, PoolEventBroadcast,
76            TransactionListener,
77        },
78        state::SubPool,
79        txpool::{SenderInfo, TxPool},
80        update::UpdateOutcome,
81    },
82    traits::{
83        AllPoolTransactions, BestTransactionsAttributes, BlockInfo, GetPooledTransactionLimit,
84        NewBlobSidecar, PoolSize, PoolTransaction, PropagatedTransactions, TransactionOrigin,
85    },
86    validate::{TransactionValidationOutcome, ValidPoolTransaction, ValidTransaction},
87    CanonicalStateUpdate, EthPoolTransaction, PoolConfig, TransactionOrdering,
88    TransactionValidator,
89};
90
91use alloy_primitives::{
92    map::{AddressSet, HashSet},
93    Address, TxHash, B256,
94};
95use parking_lot::{Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard};
96use reth_eth_wire_types::HandleMempoolData;
97use reth_execution_types::ChangedAccount;
98
99use alloy_eips::{eip7594::BlobTransactionSidecarVariant, Typed2718};
100use reth_primitives_traits::Recovered;
101use rustc_hash::FxHashMap;
102use std::{
103    fmt,
104    sync::{
105        atomic::{AtomicBool, Ordering},
106        Arc,
107    },
108    time::Instant,
109};
110use tokio::sync::mpsc;
111use tracing::{debug, trace, warn};
112mod events;
113pub use best::{BestTransactionFilter, BestTransactionsWithPrioritizedSenders};
114pub use blob::{blob_tx_priority, fee_delta, BlobOrd, BlobTransactions};
115pub use events::{FullTransactionEvent, NewTransactionEvent, TransactionEvent};
116pub use listener::{AllTransactionsEvents, TransactionEvents, TransactionListenerKind};
117pub use parked::{BasefeeOrd, ParkedOrd, ParkedPool, QueuedOrd};
118pub use pending::PendingPool;
119
120mod best;
121pub use best::BestTransactions;
122
123mod blob;
124pub mod listener;
125mod parked;
126pub mod pending;
127pub mod size;
128pub(crate) mod state;
129pub mod txpool;
130mod update;
131
/// Bound on number of pending transactions from `reth_network::TransactionsManager` to buffer.
pub const PENDING_TX_LISTENER_BUFFER_SIZE: usize = 2048;
/// Bound on number of new transactions from `reth_network::TransactionsManager` to buffer.
pub const NEW_TX_LISTENER_BUFFER_SIZE: usize = 1024;

/// Capacity of the channel created for each blob sidecar listener.
const BLOB_SIDECAR_LISTENER_BUFFER_SIZE: usize = 512;
138
139/// Transaction pool internals.
140pub struct PoolInner<V, T, S>
141where
142    T: TransactionOrdering,
143{
144    /// Internal mapping of addresses to plain ints.
145    identifiers: RwLock<SenderIdentifiers>,
146    /// Transaction validator.
147    validator: V,
148    /// Storage for blob transactions
149    blob_store: S,
150    /// The internal pool that manages all transactions.
151    pool: RwLock<TxPool<T>>,
152    /// Pool settings.
153    config: PoolConfig,
154    /// Manages listeners for transaction state change events.
155    event_listener: RwLock<PoolEventBroadcast<T::Transaction>>,
156    /// Tracks whether any event listeners have ever been installed.
157    has_event_listeners: AtomicBool,
158    /// Listeners for new _full_ pending transactions.
159    pending_transaction_listener: RwLock<Vec<PendingTransactionHashListener>>,
160    /// Listeners for new transactions added to the pool.
161    transaction_listener: RwLock<Vec<TransactionListener<T::Transaction>>>,
162    /// Listener for new blob transaction sidecars added to the pool.
163    blob_transaction_sidecar_listener: Mutex<Vec<BlobTransactionSidecarListener>>,
164    /// Metrics for the blob store
165    blob_store_metrics: BlobStoreMetrics,
166}
167
168// === impl PoolInner ===
169
170impl<V, T, S> PoolInner<V, T, S>
171where
172    V: TransactionValidator,
173    T: TransactionOrdering<Transaction = <V as TransactionValidator>::Transaction>,
174    S: BlobStore,
175{
176    /// Create a new transaction pool instance.
177    pub fn new(validator: V, ordering: T, blob_store: S, config: PoolConfig) -> Self {
178        Self {
179            identifiers: Default::default(),
180            validator,
181            event_listener: Default::default(),
182            has_event_listeners: AtomicBool::new(false),
183            pool: RwLock::new(TxPool::new(ordering, config.clone())),
184            pending_transaction_listener: Default::default(),
185            transaction_listener: Default::default(),
186            blob_transaction_sidecar_listener: Default::default(),
187            config,
188            blob_store,
189            blob_store_metrics: Default::default(),
190        }
191    }
192
    /// Returns a reference to the blob store this pool was constructed with.
    pub const fn blob_store(&self) -> &S {
        &self.blob_store
    }
197
198    /// Returns stats about the size of the pool.
199    pub fn size(&self) -> PoolSize {
200        self.get_pool_data().size()
201    }
202
203    /// Returns the currently tracked block
204    pub fn block_info(&self) -> BlockInfo {
205        self.get_pool_data().block_info()
206    }
207    /// Sets the currently tracked block.
208    ///
209    /// This will also notify subscribers about any transactions that were promoted to the pending
210    /// pool due to fee changes.
211    pub fn set_block_info(&self, info: BlockInfo) {
212        let outcome = self.pool.write().set_block_info(info);
213
214        // Notify subscribers about promoted transactions due to fee changes
215        self.notify_on_transaction_updates(outcome.promoted, outcome.discarded);
216    }
217
218    /// Returns the internal [`SenderId`] for this address
219    pub fn get_sender_id(&self, addr: Address) -> SenderId {
220        self.identifiers.write().sender_id_or_create(addr)
221    }
222
223    /// Returns the internal [`SenderId`]s for the given addresses.
224    pub fn get_sender_ids(&self, addrs: impl IntoIterator<Item = Address>) -> Vec<SenderId> {
225        self.identifiers.write().sender_ids_or_create(addrs)
226    }
227
228    /// Returns all senders in the pool
229    pub fn unique_senders(&self) -> AddressSet {
230        self.get_pool_data().unique_senders()
231    }
232
233    /// Converts the changed accounts to a map of sender ids to sender info (internal identifier
234    /// used for __tracked__ accounts)
235    fn changed_senders(
236        &self,
237        accs: impl Iterator<Item = ChangedAccount>,
238    ) -> FxHashMap<SenderId, SenderInfo> {
239        let identifiers = self.identifiers.read();
240        accs.into_iter()
241            .filter_map(|acc| {
242                let ChangedAccount { address, nonce, balance } = acc;
243                let sender_id = identifiers.sender_id(&address)?;
244                Some((sender_id, SenderInfo { state_nonce: nonce, balance }))
245            })
246            .collect()
247    }
248
    /// Returns a reference to the [`PoolConfig`] the pool was created with.
    pub const fn config(&self) -> &PoolConfig {
        &self.config
    }
253
    /// Returns a reference to the transaction validator used by this pool.
    pub const fn validator(&self) -> &V {
        &self.validator
    }
258
259    /// Adds a new transaction listener to the pool that gets notified about every new _pending_
260    /// transaction inserted into the pool
261    pub fn add_pending_listener(&self, kind: TransactionListenerKind) -> mpsc::Receiver<TxHash> {
262        let (sender, rx) = mpsc::channel(self.config.pending_tx_listener_buffer_size);
263        let listener = PendingTransactionHashListener { sender, kind };
264
265        let mut listeners = self.pending_transaction_listener.write();
266        // Clean up dead listeners before adding new one
267        listeners.retain(|l| !l.sender.is_closed());
268        listeners.push(listener);
269
270        rx
271    }
272
273    /// Adds a new transaction listener to the pool that gets notified about every new transaction.
274    pub fn add_new_transaction_listener(
275        &self,
276        kind: TransactionListenerKind,
277    ) -> mpsc::Receiver<NewTransactionEvent<T::Transaction>> {
278        let (sender, rx) = mpsc::channel(self.config.new_tx_listener_buffer_size);
279        let listener = TransactionListener { sender, kind };
280
281        let mut listeners = self.transaction_listener.write();
282        // Clean up dead listeners before adding new one
283        listeners.retain(|l| !l.sender.is_closed());
284        listeners.push(listener);
285
286        rx
287    }
288    /// Adds a new blob sidecar listener to the pool that gets notified about every new
289    /// eip4844 transaction's blob sidecar.
290    pub fn add_blob_sidecar_listener(&self) -> mpsc::Receiver<NewBlobSidecar> {
291        let (sender, rx) = mpsc::channel(BLOB_SIDECAR_LISTENER_BUFFER_SIZE);
292        let listener = BlobTransactionSidecarListener { sender };
293        self.blob_transaction_sidecar_listener.lock().push(listener);
294        rx
295    }
296
297    /// If the pool contains the transaction, this adds a new listener that gets notified about
298    /// transaction events.
299    pub fn add_transaction_event_listener(&self, tx_hash: TxHash) -> Option<TransactionEvents> {
300        if !self.get_pool_data().contains(&tx_hash) {
301            return None
302        }
303        let mut listener = self.event_listener.write();
304        let events = listener.subscribe(tx_hash);
305        self.mark_event_listener_installed();
306        Some(events)
307    }
308
309    /// Adds a listener for all transaction events.
310    pub fn add_all_transactions_event_listener(&self) -> AllTransactionsEvents<T::Transaction> {
311        let mut listener = self.event_listener.write();
312        let events = listener.subscribe_all();
313        self.mark_event_listener_installed();
314        events
315    }
316
    /// Reads the `has_event_listeners` flag (relaxed load) without touching the listener lock.
    #[inline]
    fn has_event_listeners(&self) -> bool {
        self.has_event_listeners.load(Ordering::Relaxed)
    }
321
    /// Sets the `has_event_listeners` flag to `true` (relaxed store).
    #[inline]
    fn mark_event_listener_installed(&self) {
        self.has_event_listeners.store(true, Ordering::Relaxed);
    }
326
327    #[inline]
328    fn update_event_listener_state(&self, listener: &PoolEventBroadcast<T::Transaction>) {
329        if listener.is_empty() {
330            self.has_event_listeners.store(false, Ordering::Relaxed);
331        }
332    }
333
334    #[inline]
335    fn with_event_listener<F>(&self, emit: F)
336    where
337        F: FnOnce(&mut PoolEventBroadcast<T::Transaction>),
338    {
339        if !self.has_event_listeners() {
340            return
341        }
342        let mut listener = self.event_listener.write();
343        if !listener.is_empty() {
344            emit(&mut listener);
345        }
346        self.update_event_listener_state(&listener);
347    }
348
    /// Acquires and returns a read guard on the inner [`TxPool`].
    ///
    /// The read lock is held for as long as the returned guard is alive.
    pub fn get_pool_data(&self) -> RwLockReadGuard<'_, TxPool<T>> {
        self.pool.read()
    }
353
354    /// Returns transactions in the pool that can be propagated
355    pub fn pooled_transactions(&self) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
356        let mut out = Vec::new();
357        self.append_pooled_transactions(&mut out);
358        out
359    }
360
361    /// Returns hashes of transactions in the pool that can be propagated.
362    pub fn pooled_transactions_hashes(&self) -> Vec<TxHash> {
363        let mut out = Vec::new();
364        self.append_pooled_transactions_hashes(&mut out);
365        out
366    }
367
368    /// Returns only the first `max` transactions in the pool that can be propagated.
369    pub fn pooled_transactions_max(
370        &self,
371        max: usize,
372    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
373        let mut out = Vec::new();
374        self.append_pooled_transactions_max(max, &mut out);
375        out
376    }
377
378    /// Extends the given vector with all transactions in the pool that can be propagated.
379    pub fn append_pooled_transactions(
380        &self,
381        out: &mut Vec<Arc<ValidPoolTransaction<T::Transaction>>>,
382    ) {
383        out.extend(
384            self.get_pool_data().all().transactions_iter().filter(|tx| tx.propagate).cloned(),
385        );
386    }
387
388    /// Extends the given vector with pooled transactions for the given hashes that are allowed to
389    /// be propagated.
390    pub fn append_pooled_transaction_elements(
391        &self,
392        tx_hashes: &[TxHash],
393        limit: GetPooledTransactionLimit,
394        out: &mut Vec<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>,
395    ) where
396        <V as TransactionValidator>::Transaction: EthPoolTransaction,
397    {
398        let transactions = self.get_all_propagatable(tx_hashes);
399        let mut size = 0;
400        for transaction in transactions {
401            let encoded_len = transaction.encoded_length();
402            let Some(pooled) = self.to_pooled_transaction(transaction) else {
403                continue;
404            };
405
406            size += encoded_len;
407            out.push(pooled.into_inner());
408
409            if limit.exceeds(size) {
410                break
411            }
412        }
413    }
414
415    /// Extends the given vector with the hashes of all transactions in the pool that can be
416    /// propagated.
417    pub fn append_pooled_transactions_hashes(&self, out: &mut Vec<TxHash>) {
418        out.extend(
419            self.get_pool_data()
420                .all()
421                .transactions_iter()
422                .filter(|tx| tx.propagate)
423                .map(|tx| *tx.hash()),
424        );
425    }
426
427    /// Extends the given vector with only the first `max` transactions in the pool that can be
428    /// propagated.
429    pub fn append_pooled_transactions_max(
430        &self,
431        max: usize,
432        out: &mut Vec<Arc<ValidPoolTransaction<T::Transaction>>>,
433    ) {
434        out.extend(
435            self.get_pool_data()
436                .all()
437                .transactions_iter()
438                .filter(|tx| tx.propagate)
439                .take(max)
440                .cloned(),
441        );
442    }
443
444    /// Returns only the first `max` hashes of transactions in the pool that can be propagated.
445    pub fn pooled_transactions_hashes_max(&self, max: usize) -> Vec<TxHash> {
446        if max == 0 {
447            return Vec::new();
448        }
449        self.get_pool_data()
450            .all()
451            .transactions_iter()
452            .filter(|tx| tx.propagate)
453            .take(max)
454            .map(|tx| *tx.hash())
455            .collect()
456    }
457
458    /// Converts the internally tracked transaction to the pooled format.
459    ///
460    /// If the transaction is an EIP-4844 transaction, the blob sidecar is fetched from the blob
461    /// store and attached to the transaction.
462    fn to_pooled_transaction(
463        &self,
464        transaction: Arc<ValidPoolTransaction<T::Transaction>>,
465    ) -> Option<Recovered<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>>
466    where
467        <V as TransactionValidator>::Transaction: EthPoolTransaction,
468    {
469        if transaction.is_eip4844() {
470            let sidecar = self.blob_store.get(*transaction.hash()).ok()??;
471            transaction.transaction.clone().try_into_pooled_eip4844(sidecar)
472        } else {
473            transaction
474                .transaction
475                .clone_into_pooled()
476                .inspect_err(|err| {
477                    debug!(
478                        target: "txpool", %err,
479                        "failed to convert transaction to pooled element; skipping",
480                    );
481                })
482                .ok()
483        }
484    }
485
486    /// Returns pooled transactions for the given transaction hashes that are allowed to be
487    /// propagated.
488    pub fn get_pooled_transaction_elements(
489        &self,
490        tx_hashes: Vec<TxHash>,
491        limit: GetPooledTransactionLimit,
492    ) -> Vec<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>
493    where
494        <V as TransactionValidator>::Transaction: EthPoolTransaction,
495    {
496        let mut elements = Vec::new();
497        self.append_pooled_transaction_elements(&tx_hashes, limit, &mut elements);
498        elements.shrink_to_fit();
499        elements
500    }
501
502    /// Returns converted pooled transaction for the given transaction hash.
503    pub fn get_pooled_transaction_element(
504        &self,
505        tx_hash: TxHash,
506    ) -> Option<Recovered<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>>
507    where
508        <V as TransactionValidator>::Transaction: EthPoolTransaction,
509    {
510        self.get(&tx_hash).and_then(|tx| self.to_pooled_transaction(tx))
511    }
512
513    /// Updates the entire pool after a new block was executed.
514    pub fn on_canonical_state_change(&self, update: CanonicalStateUpdate<'_, V::Block>) {
515        trace!(target: "txpool", ?update, "updating pool on canonical state change");
516
517        let block_info = update.block_info();
518        let CanonicalStateUpdate {
519            new_tip, changed_accounts, mined_transactions, update_kind, ..
520        } = update;
521        self.validator.on_new_head_block(new_tip);
522
523        let changed_senders = self.changed_senders(changed_accounts.into_iter());
524
525        // update the pool
526        let outcome = self.pool.write().on_canonical_state_change(
527            block_info,
528            mined_transactions,
529            changed_senders,
530            update_kind,
531        );
532
533        // This will discard outdated transactions based on the account's nonce
534        self.delete_discarded_blobs(outcome.discarded.iter());
535
536        // notify listeners about updates
537        self.notify_on_new_state(outcome);
538    }
539
540    /// Performs account updates on the pool.
541    ///
542    /// This will either promote or discard transactions based on the new account state.
543    ///
544    /// This should be invoked when the pool drifted and accounts are updated manually
545    pub fn update_accounts(&self, accounts: Vec<ChangedAccount>) {
546        let changed_senders = self.changed_senders(accounts.into_iter());
547        let UpdateOutcome { promoted, discarded } =
548            self.pool.write().update_accounts(changed_senders);
549
550        self.notify_on_transaction_updates(promoted, discarded);
551    }
552
553    /// Add a single validated transaction into the pool.
554    ///
555    /// Returns the outcome and optionally metadata to be processed after the pool lock is
556    /// released.
557    ///
558    /// Note: this is only used internally by [`Self::add_transactions()`], all new transaction(s)
559    /// come in through that function, either as a batch or `std::iter::once`.
560    fn add_transaction(
561        &self,
562        pool: &mut RwLockWriteGuard<'_, TxPool<T>>,
563        origin: TransactionOrigin,
564        tx: TransactionValidationOutcome<T::Transaction>,
565    ) -> (PoolResult<AddedTransactionOutcome>, Option<AddedTransactionMeta<T::Transaction>>) {
566        match tx {
567            TransactionValidationOutcome::Valid {
568                balance,
569                state_nonce,
570                transaction,
571                propagate,
572                bytecode_hash,
573                authorities,
574            } => {
575                let sender_id = self.get_sender_id(transaction.sender());
576                let transaction_id = TransactionId::new(sender_id, transaction.nonce());
577
578                // split the valid transaction and the blob sidecar if it has any
579                let (transaction, blob_sidecar) = match transaction {
580                    ValidTransaction::Valid(tx) => (tx, None),
581                    ValidTransaction::ValidWithSidecar { transaction, sidecar } => {
582                        debug_assert!(
583                            transaction.is_eip4844(),
584                            "validator returned sidecar for non EIP-4844 transaction"
585                        );
586                        (transaction, Some(sidecar))
587                    }
588                };
589
590                let tx = ValidPoolTransaction {
591                    transaction,
592                    transaction_id,
593                    propagate,
594                    timestamp: Instant::now(),
595                    origin,
596                    authority_ids: authorities.map(|auths| self.get_sender_ids(auths)),
597                };
598
599                let added = match pool.add_transaction(tx, balance, state_nonce, bytecode_hash) {
600                    Ok(added) => added,
601                    Err(err) => return (Err(err), None),
602                };
603                let hash = *added.hash();
604                let state = added.transaction_state();
605
606                let meta = AddedTransactionMeta { added, blob_sidecar };
607
608                (Ok(AddedTransactionOutcome { hash, state }), Some(meta))
609            }
610            TransactionValidationOutcome::Invalid(tx, err) => {
611                self.with_event_listener(|listener| listener.invalid(tx.hash()));
612                (Err(PoolError::new(*tx.hash(), err)), None)
613            }
614            TransactionValidationOutcome::Error(tx_hash, err) => {
615                self.with_event_listener(|listener| listener.discarded(&tx_hash));
616                (Err(PoolError::other(tx_hash, err)), None)
617            }
618        }
619    }
620
621    /// Adds a transaction and returns the event stream.
622    pub fn add_transaction_and_subscribe(
623        &self,
624        origin: TransactionOrigin,
625        tx: TransactionValidationOutcome<T::Transaction>,
626    ) -> PoolResult<TransactionEvents> {
627        let listener = {
628            let mut listener = self.event_listener.write();
629            let events = listener.subscribe(tx.tx_hash());
630            self.mark_event_listener_installed();
631            events
632        };
633        let mut results = self.add_transactions(origin, std::iter::once(tx));
634        results.pop().expect("result length is the same as the input")?;
635        Ok(listener)
636    }
637
638    /// Adds all transactions in the iterator to the pool, returning a list of results.
639    ///
640    /// Convenience method that assigns the same origin to all transactions. Delegates to
641    /// [`Self::add_transactions_with_origins`].
642    pub fn add_transactions(
643        &self,
644        origin: TransactionOrigin,
645        transactions: impl IntoIterator<Item = TransactionValidationOutcome<T::Transaction>>,
646    ) -> Vec<PoolResult<AddedTransactionOutcome>> {
647        self.add_transactions_with_origins(transactions.into_iter().map(|tx| (origin, tx)))
648    }
649
650    /// Adds all transactions in the iterator to the pool, each with its own
651    /// [`TransactionOrigin`], returning a list of results.
652    pub fn add_transactions_with_origins(
653        &self,
654        transactions: impl IntoIterator<
655            Item = (TransactionOrigin, TransactionValidationOutcome<T::Transaction>),
656        >,
657    ) -> Vec<PoolResult<AddedTransactionOutcome>> {
658        // Collect results and metadata while holding the pool write lock
659        let (mut results, added_metas, discarded) = {
660            let mut pool = self.pool.write();
661            let mut added_metas = Vec::new();
662
663            let results = transactions
664                .into_iter()
665                .map(|(origin, tx)| {
666                    let (result, meta) = self.add_transaction(&mut pool, origin, tx);
667
668                    // Only collect metadata for successful insertions
669                    if result.is_ok() &&
670                        let Some(meta) = meta
671                    {
672                        added_metas.push(meta);
673                    }
674
675                    result
676                })
677                .collect::<Vec<_>>();
678
679            // Enforce the pool size limits if at least one transaction was added successfully
680            let discarded = if results.iter().any(Result::is_ok) {
681                pool.discard_worst()
682            } else {
683                Default::default()
684            };
685
686            (results, added_metas, discarded)
687        };
688
689        for meta in added_metas {
690            self.on_added_transaction(meta);
691        }
692
693        if !discarded.is_empty() {
694            // Delete any blobs associated with discarded blob transactions
695            self.delete_discarded_blobs(discarded.iter());
696            self.with_event_listener(|listener| listener.discarded_many(&discarded));
697
698            let discarded_hashes =
699                discarded.into_iter().map(|tx| *tx.hash()).collect::<HashSet<_>>();
700
701            // A newly added transaction may be immediately discarded, so we need to
702            // adjust the result here
703            for res in &mut results {
704                if let Ok(AddedTransactionOutcome { hash, .. }) = res &&
705                    discarded_hashes.contains(hash)
706                {
707                    *res = Err(PoolError::new(*hash, PoolErrorKind::DiscardedOnInsert))
708                }
709            }
710        };
711
712        results
713    }
714
715    /// Process a transaction that was added to the pool.
716    ///
717    /// Performs blob storage operations and sends all notifications. This should be called
718    /// after the pool write lock has been released to avoid blocking pool operations.
719    fn on_added_transaction(&self, meta: AddedTransactionMeta<T::Transaction>) {
720        // Handle blob sidecar storage and notifications for EIP-4844 transactions
721        if let Some(sidecar) = meta.blob_sidecar {
722            let hash = *meta.added.hash();
723            self.on_new_blob_sidecar(&hash, &sidecar);
724            self.insert_blob(hash, sidecar);
725        }
726
727        // Delete replaced blob sidecar if any
728        if let Some(replaced) = meta.added.replaced_blob_transaction() {
729            debug!(target: "txpool", "[{:?}] delete replaced blob sidecar", replaced);
730            self.delete_blob(replaced);
731        }
732
733        // Delete discarded blob sidecars if any, this doesnt do any IO.
734        if let Some(discarded) = meta.added.discarded_transactions() {
735            self.delete_discarded_blobs(discarded.iter());
736        }
737
738        // Notify pending transaction listeners
739        if let Some(pending) = meta.added.as_pending() {
740            self.on_new_pending_transaction(pending);
741        }
742
743        // Notify event listeners
744        self.notify_event_listeners(&meta.added);
745
746        // Notify new transaction listeners
747        self.on_new_transaction(meta.added.into_new_transaction_event());
748    }
749
750    /// Notify all listeners about a new pending transaction.
751    ///
752    /// See also [`Self::add_pending_listener`]
753    ///
754    /// CAUTION: This function is only intended to be used manually in order to use this type's
755    /// pending transaction receivers when manually implementing the
756    /// [`TransactionPool`](crate::TransactionPool) trait for a custom pool implementation
757    /// [`TransactionPool::pending_transactions_listener_for`](crate::TransactionPool).
758    pub fn on_new_pending_transaction(&self, pending: &AddedPendingTransaction<T::Transaction>) {
759        let mut needs_cleanup = false;
760
761        {
762            let listeners = self.pending_transaction_listener.read();
763            for listener in listeners.iter() {
764                if !listener.send_all(pending.pending_transactions(listener.kind)) {
765                    needs_cleanup = true;
766                }
767            }
768        }
769
770        // Clean up dead listeners if we detected any closed channels
771        if needs_cleanup {
772            self.pending_transaction_listener
773                .write()
774                .retain(|listener| !listener.sender.is_closed());
775        }
776    }
777
778    /// Notify all listeners about a newly inserted pending transaction.
779    ///
780    /// See also [`Self::add_new_transaction_listener`]
781    ///
782    /// CAUTION: This function is only intended to be used manually in order to use this type's
783    /// transaction receivers when manually implementing the
784    /// [`TransactionPool`](crate::TransactionPool) trait for a custom pool implementation
785    /// [`TransactionPool::new_transactions_listener_for`](crate::TransactionPool).
786    pub fn on_new_transaction(&self, event: NewTransactionEvent<T::Transaction>) {
787        let mut needs_cleanup = false;
788
789        {
790            let listeners = self.transaction_listener.read();
791            for listener in listeners.iter() {
792                if listener.kind.is_propagate_only() && !event.transaction.propagate {
793                    if listener.sender.is_closed() {
794                        needs_cleanup = true;
795                    }
796                    // Skip non-propagate transactions for propagate-only listeners
797                    continue
798                }
799
800                if !listener.send(event.clone()) {
801                    needs_cleanup = true;
802                }
803            }
804        }
805
806        // Clean up dead listeners if we detected any closed channels
807        if needs_cleanup {
808            self.transaction_listener.write().retain(|listener| !listener.sender.is_closed());
809        }
810    }
811
812    /// Notify all listeners about a blob sidecar for a newly inserted blob (eip4844) transaction.
813    fn on_new_blob_sidecar(&self, tx_hash: &TxHash, sidecar: &BlobTransactionSidecarVariant) {
814        let mut sidecar_listeners = self.blob_transaction_sidecar_listener.lock();
815        if sidecar_listeners.is_empty() {
816            return
817        }
818        let sidecar = Arc::new(sidecar.clone());
819        sidecar_listeners.retain_mut(|listener| {
820            let new_blob_event = NewBlobSidecar { tx_hash: *tx_hash, sidecar: sidecar.clone() };
821            match listener.sender.try_send(new_blob_event) {
822                Ok(()) => true,
823                Err(err) => {
824                    if matches!(err, mpsc::error::TrySendError::Full(_)) {
825                        debug!(
826                            target: "txpool",
827                            "[{:?}] failed to send blob sidecar; channel full",
828                            sidecar,
829                        );
830                        true
831                    } else {
832                        false
833                    }
834                }
835            }
836        })
837    }
838
839    /// Notifies transaction listeners about changes once a block was processed.
840    fn notify_on_new_state(&self, outcome: OnNewCanonicalStateOutcome<T::Transaction>) {
841        trace!(target: "txpool", promoted=outcome.promoted.len(), discarded= outcome.discarded.len() ,"notifying listeners on state change");
842
843        // notify about promoted pending transactions - emit hashes
844        let mut needs_pending_cleanup = false;
845        {
846            let listeners = self.pending_transaction_listener.read();
847            for listener in listeners.iter() {
848                if !listener.send_all(outcome.pending_transactions(listener.kind)) {
849                    needs_pending_cleanup = true;
850                }
851            }
852        }
853        if needs_pending_cleanup {
854            self.pending_transaction_listener.write().retain(|l| !l.sender.is_closed());
855        }
856
857        // emit full transactions
858        let mut needs_tx_cleanup = false;
859        {
860            let listeners = self.transaction_listener.read();
861            for listener in listeners.iter() {
862                if !listener.send_all(outcome.full_pending_transactions(listener.kind)) {
863                    needs_tx_cleanup = true;
864                }
865            }
866        }
867        if needs_tx_cleanup {
868            self.transaction_listener.write().retain(|l| !l.sender.is_closed());
869        }
870
871        let OnNewCanonicalStateOutcome { mined, promoted, discarded, block_hash } = outcome;
872
873        // broadcast specific transaction events
874        self.with_event_listener(|listener| {
875            for tx in &mined {
876                listener.mined(tx, block_hash);
877            }
878            for tx in &promoted {
879                listener.pending(tx.hash(), None);
880            }
881            for tx in &discarded {
882                listener.discarded(tx.hash());
883            }
884        })
885    }
886
887    /// Notifies all listeners about the transaction movements.
888    ///
889    /// This will emit events according to the provided changes.
890    ///
891    /// CAUTION: This function is only intended to be used manually in order to use this type's
892    /// [`TransactionEvents`] receivers when manually implementing the
893    /// [`TransactionPool`](crate::TransactionPool) trait for a custom pool implementation
894    /// [`TransactionPool::transaction_event_listener`](crate::TransactionPool).
895    #[allow(clippy::type_complexity)]
896    pub fn notify_on_transaction_updates(
897        &self,
898        promoted: Vec<Arc<ValidPoolTransaction<T::Transaction>>>,
899        discarded: Vec<Arc<ValidPoolTransaction<T::Transaction>>>,
900    ) {
901        // Notify about promoted pending transactions (similar to notify_on_new_state)
902        if !promoted.is_empty() {
903            let mut needs_pending_cleanup = false;
904            {
905                let listeners = self.pending_transaction_listener.read();
906                for listener in listeners.iter() {
907                    let promoted_hashes = promoted.iter().filter_map(|tx| {
908                        if listener.kind.is_propagate_only() && !tx.propagate {
909                            None
910                        } else {
911                            Some(*tx.hash())
912                        }
913                    });
914                    if !listener.send_all(promoted_hashes) {
915                        needs_pending_cleanup = true;
916                    }
917                }
918            }
919            if needs_pending_cleanup {
920                self.pending_transaction_listener.write().retain(|l| !l.sender.is_closed());
921            }
922
923            // in this case we should also emit promoted transactions in full
924            let mut needs_tx_cleanup = false;
925            {
926                let listeners = self.transaction_listener.read();
927                for listener in listeners.iter() {
928                    let promoted_txs = promoted.iter().filter_map(|tx| {
929                        if listener.kind.is_propagate_only() && !tx.propagate {
930                            None
931                        } else {
932                            Some(NewTransactionEvent::pending(tx.clone()))
933                        }
934                    });
935                    if !listener.send_all(promoted_txs) {
936                        needs_tx_cleanup = true;
937                    }
938                }
939            }
940            if needs_tx_cleanup {
941                self.transaction_listener.write().retain(|l| !l.sender.is_closed());
942            }
943        }
944
945        self.with_event_listener(|listener| {
946            for tx in &promoted {
947                listener.pending(tx.hash(), None);
948            }
949            for tx in &discarded {
950                listener.discarded(tx.hash());
951            }
952        });
953
954        if !discarded.is_empty() {
955            // This deletes outdated blob txs from the blob store, based on the account's nonce.
956            // This is called during txpool maintenance when the pool drifted.
957            self.delete_discarded_blobs(discarded.iter());
958        }
959    }
960
961    /// Fire events for the newly added transaction if there are any.
962    ///
963    /// See also [`Self::add_transaction_event_listener`].
964    ///
965    /// CAUTION: This function is only intended to be used manually in order to use this type's
966    /// [`TransactionEvents`] receivers when manually implementing the
967    /// [`TransactionPool`](crate::TransactionPool) trait for a custom pool implementation
968    /// [`TransactionPool::transaction_event_listener`](crate::TransactionPool).
969    pub fn notify_event_listeners(&self, tx: &AddedTransaction<T::Transaction>) {
970        self.with_event_listener(|listener| match tx {
971            AddedTransaction::Pending(tx) => {
972                let AddedPendingTransaction { transaction, promoted, discarded, replaced } = tx;
973
974                listener.pending(transaction.hash(), replaced.clone());
975                for tx in promoted {
976                    listener.pending(tx.hash(), None);
977                }
978                for tx in discarded {
979                    listener.discarded(tx.hash());
980                }
981            }
982            AddedTransaction::Parked { transaction, replaced, queued_reason, .. } => {
983                listener.queued(transaction.hash(), queued_reason.clone());
984                if let Some(replaced) = replaced {
985                    listener.replaced(replaced.clone(), *transaction.hash());
986                }
987            }
988        });
989    }
990
991    /// Returns an iterator that yields transactions that are ready to be included in the block.
992    pub fn best_transactions(&self) -> BestTransactions<T> {
993        self.get_pool_data().best_transactions()
994    }
995
996    /// Returns an iterator that yields transactions that are ready to be included in the block with
997    /// the given base fee and optional blob fee attributes.
998    pub fn best_transactions_with_attributes(
999        &self,
1000        best_transactions_attributes: BestTransactionsAttributes,
1001    ) -> Box<dyn crate::traits::BestTransactions<Item = Arc<ValidPoolTransaction<T::Transaction>>>>
1002    {
1003        self.get_pool_data().best_transactions_with_attributes(best_transactions_attributes)
1004    }
1005
1006    /// Returns only the first `max` transactions in the pending pool.
1007    pub fn pending_transactions_max(
1008        &self,
1009        max: usize,
1010    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1011        self.get_pool_data().pending_transactions_iter().take(max).collect()
1012    }
1013
1014    /// Returns all transactions from the pending sub-pool
1015    pub fn pending_transactions(&self) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1016        self.get_pool_data().pending_transactions()
1017    }
1018
1019    /// Returns all transactions from parked pools
1020    pub fn queued_transactions(&self) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1021        self.get_pool_data().queued_transactions()
1022    }
1023
1024    /// Returns all transactions in the pool
1025    pub fn all_transactions(&self) -> AllPoolTransactions<T::Transaction> {
1026        let pool = self.get_pool_data();
1027        AllPoolTransactions {
1028            pending: pool.pending_transactions(),
1029            queued: pool.queued_transactions(),
1030        }
1031    }
1032
1033    /// Returns _all_ transactions in the pool
1034    pub fn all_transaction_hashes(&self) -> Vec<TxHash> {
1035        self.get_pool_data().all().transactions_iter().map(|tx| *tx.hash()).collect()
1036    }
1037
1038    /// Removes and returns all matching transactions from the pool.
1039    ///
1040    /// This behaves as if the transactions got discarded (_not_ mined), effectively introducing a
1041    /// nonce gap for the given transactions.
1042    pub fn remove_transactions(
1043        &self,
1044        hashes: Vec<TxHash>,
1045    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1046        if hashes.is_empty() {
1047            return Vec::new()
1048        }
1049        let removed = self.pool.write().remove_transactions(hashes);
1050
1051        self.with_event_listener(|listener| listener.discarded_many(&removed));
1052
1053        removed
1054    }
1055
1056    /// Removes and returns all matching transactions and their dependent transactions from the
1057    /// pool.
1058    pub fn remove_transactions_and_descendants(
1059        &self,
1060        hashes: Vec<TxHash>,
1061    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1062        if hashes.is_empty() {
1063            return Vec::new()
1064        }
1065        let removed = self.pool.write().remove_transactions_and_descendants(hashes);
1066
1067        self.with_event_listener(|listener| {
1068            for tx in &removed {
1069                listener.discarded(tx.hash());
1070            }
1071        });
1072
1073        removed
1074    }
1075
1076    /// Removes and returns all transactions by the specified sender from the pool.
1077    pub fn remove_transactions_by_sender(
1078        &self,
1079        sender: Address,
1080    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1081        let sender_id = self.get_sender_id(sender);
1082        let removed = self.pool.write().remove_transactions_by_sender(sender_id);
1083
1084        self.with_event_listener(|listener| listener.discarded_many(&removed));
1085
1086        removed
1087    }
1088
1089    /// Prunes and returns all matching transactions from the pool.
1090    ///
1091    /// This removes the transactions as if they were mined: descendant transactions are **not**
1092    /// parked and remain eligible for inclusion.
1093    pub fn prune_transactions(
1094        &self,
1095        hashes: Vec<TxHash>,
1096    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1097        if hashes.is_empty() {
1098            return Vec::new()
1099        }
1100
1101        self.pool.write().prune_transactions(hashes)
1102    }
1103
1104    /// Removes and returns all transactions that are present in the pool.
1105    pub fn retain_unknown<A>(&self, announcement: &mut A)
1106    where
1107        A: HandleMempoolData,
1108    {
1109        if announcement.is_empty() {
1110            return
1111        }
1112        let pool = self.get_pool_data();
1113        announcement.retain_by_hash(|tx| !pool.contains(tx))
1114    }
1115
1116    /// Returns the transaction by hash.
1117    pub fn get(&self, tx_hash: &TxHash) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
1118        self.get_pool_data().get(tx_hash)
1119    }
1120
1121    /// Returns all transactions of the address
1122    pub fn get_transactions_by_sender(
1123        &self,
1124        sender: Address,
1125    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1126        let sender_id = self.get_sender_id(sender);
1127        self.get_pool_data().get_transactions_by_sender(sender_id)
1128    }
1129
1130    /// Returns a pending transaction sent by the given sender with the given nonce.
1131    pub fn get_pending_transaction_by_sender_and_nonce(
1132        &self,
1133        sender: Address,
1134        nonce: u64,
1135    ) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
1136        let sender_id = self.get_sender_id(sender);
1137        self.get_pool_data().get_pending_transaction_by_sender_and_nonce(sender_id, nonce)
1138    }
1139
1140    /// Returns all queued transactions of the address by sender
1141    pub fn get_queued_transactions_by_sender(
1142        &self,
1143        sender: Address,
1144    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1145        let sender_id = self.get_sender_id(sender);
1146        self.get_pool_data().queued_txs_by_sender(sender_id)
1147    }
1148
1149    /// Returns all pending transactions filtered by predicate
1150    pub fn pending_transactions_with_predicate(
1151        &self,
1152        predicate: impl FnMut(&ValidPoolTransaction<T::Transaction>) -> bool,
1153    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1154        self.get_pool_data().pending_transactions_with_predicate(predicate)
1155    }
1156
1157    /// Returns all pending transactions of the address by sender
1158    pub fn get_pending_transactions_by_sender(
1159        &self,
1160        sender: Address,
1161    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1162        let sender_id = self.get_sender_id(sender);
1163        self.get_pool_data().pending_txs_by_sender(sender_id)
1164    }
1165
1166    /// Returns the highest transaction of the address
1167    pub fn get_highest_transaction_by_sender(
1168        &self,
1169        sender: Address,
1170    ) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
1171        let sender_id = self.get_sender_id(sender);
1172        self.get_pool_data().get_highest_transaction_by_sender(sender_id)
1173    }
1174
1175    /// Returns the transaction with the highest nonce that is executable given the on chain nonce.
1176    pub fn get_highest_consecutive_transaction_by_sender(
1177        &self,
1178        sender: Address,
1179        on_chain_nonce: u64,
1180    ) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
1181        let sender_id = self.get_sender_id(sender);
1182        self.get_pool_data().get_highest_consecutive_transaction_by_sender(
1183            sender_id.into_transaction_id(on_chain_nonce),
1184        )
1185    }
1186
1187    /// Returns the transaction given a [`TransactionId`]
1188    pub fn get_transaction_by_transaction_id(
1189        &self,
1190        transaction_id: &TransactionId,
1191    ) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
1192        self.get_pool_data().all().get(transaction_id).map(|tx| tx.transaction.clone())
1193    }
1194
1195    /// Returns all transactions that where submitted with the given [`TransactionOrigin`]
1196    pub fn get_transactions_by_origin(
1197        &self,
1198        origin: TransactionOrigin,
1199    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1200        self.get_pool_data()
1201            .all()
1202            .transactions_iter()
1203            .filter(|tx| tx.origin == origin)
1204            .cloned()
1205            .collect()
1206    }
1207
1208    /// Returns all pending transactions filtered by [`TransactionOrigin`]
1209    pub fn get_pending_transactions_by_origin(
1210        &self,
1211        origin: TransactionOrigin,
1212    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1213        self.get_pool_data().pending_transactions_iter().filter(|tx| tx.origin == origin).collect()
1214    }
1215
1216    /// Returns all the transactions belonging to the hashes.
1217    ///
1218    /// If no transaction exists, it is skipped.
1219    pub fn get_all(&self, txs: Vec<TxHash>) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1220        if txs.is_empty() {
1221            return Vec::new()
1222        }
1223        self.get_pool_data().get_all(txs).collect()
1224    }
1225
1226    /// Returns all the transactions belonging to the hashes that are propagatable.
1227    ///
1228    /// If no transaction exists, it is skipped.
1229    fn get_all_propagatable(
1230        &self,
1231        txs: &[TxHash],
1232    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1233        if txs.is_empty() {
1234            return Vec::new()
1235        }
1236        let pool = self.get_pool_data();
1237        txs.iter().filter_map(|tx| pool.get(tx).filter(|tx| tx.propagate)).collect()
1238    }
1239
1240    /// Notify about propagated transactions.
1241    pub fn on_propagated(&self, txs: PropagatedTransactions) {
1242        if txs.is_empty() {
1243            return
1244        }
1245        self.with_event_listener(|listener| {
1246            txs.into_iter().for_each(|(hash, peers)| listener.propagated(&hash, peers));
1247        });
1248    }
1249
1250    /// Number of transactions in the entire pool
1251    pub fn len(&self) -> usize {
1252        self.get_pool_data().len()
1253    }
1254
1255    /// Whether the pool is empty
1256    pub fn is_empty(&self) -> bool {
1257        self.get_pool_data().is_empty()
1258    }
1259
1260    /// Returns whether or not the pool is over its configured size and transaction count limits.
1261    pub fn is_exceeded(&self) -> bool {
1262        self.pool.read().is_exceeded()
1263    }
1264
1265    /// Inserts a blob transaction into the blob store
1266    fn insert_blob(&self, hash: TxHash, blob: BlobTransactionSidecarVariant) {
1267        debug!(target: "txpool", "[{:?}] storing blob sidecar", hash);
1268        if let Err(err) = self.blob_store.insert(hash, blob) {
1269            warn!(target: "txpool", %err, "[{:?}] failed to insert blob", hash);
1270            self.blob_store_metrics.blobstore_failed_inserts.increment(1);
1271        }
1272        self.update_blob_store_metrics();
1273    }
1274
1275    /// Delete a blob from the blob store
1276    pub fn delete_blob(&self, blob: TxHash) {
1277        let _ = self.blob_store.delete(blob);
1278    }
1279
1280    /// Delete all blobs from the blob store
1281    pub fn delete_blobs(&self, txs: Vec<TxHash>) {
1282        let _ = self.blob_store.delete_all(txs);
1283    }
1284
1285    /// Cleans up the blob store
1286    pub fn cleanup_blobs(&self) {
1287        let stat = self.blob_store.cleanup();
1288        self.blob_store_metrics.blobstore_failed_deletes.increment(stat.delete_failed as u64);
1289        self.update_blob_store_metrics();
1290    }
1291
1292    fn update_blob_store_metrics(&self) {
1293        if let Some(data_size) = self.blob_store.data_size_hint() {
1294            self.blob_store_metrics.blobstore_byte_size.set(data_size as f64);
1295        }
1296        self.blob_store_metrics.blobstore_entries.set(self.blob_store.blobs_len() as f64);
1297    }
1298
1299    /// Deletes all blob transactions that were discarded.
1300    fn delete_discarded_blobs<'a>(
1301        &'a self,
1302        transactions: impl IntoIterator<Item = &'a Arc<ValidPoolTransaction<T::Transaction>>>,
1303    ) {
1304        let blob_txs = transactions
1305            .into_iter()
1306            .filter(|tx| tx.transaction.is_eip4844())
1307            .map(|tx| *tx.hash())
1308            .collect();
1309        self.delete_blobs(blob_txs);
1310    }
1311}
1312
1313impl<V, T: TransactionOrdering, S> fmt::Debug for PoolInner<V, T, S> {
1314    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1315        f.debug_struct("PoolInner").field("config", &self.config).finish_non_exhaustive()
1316    }
1317}
1318
/// Metadata for a transaction that was added to the pool.
///
/// This holds all the data needed to complete post-insertion operations (notifications,
/// blob storage): the insertion result itself plus the blob sidecar that may accompany it.
#[derive(Debug)]
struct AddedTransactionMeta<T: PoolTransaction> {
    /// The transaction that was added to the pool, including the sub-pool it ended up in.
    added: AddedTransaction<T>,
    /// Optional blob sidecar for EIP-4844 transactions; `None` when there is no sidecar to
    /// store.
    blob_sidecar: Option<BlobTransactionSidecarVariant>,
}
1330
1331/// Tracks an added transaction and all graph changes caused by adding it.
1332#[derive(Debug, Clone)]
1333pub struct AddedPendingTransaction<T: PoolTransaction> {
1334    /// Inserted transaction.
1335    pub transaction: Arc<ValidPoolTransaction<T>>,
1336    /// Replaced transaction.
1337    pub replaced: Option<Arc<ValidPoolTransaction<T>>>,
1338    /// transactions promoted to the pending queue
1339    pub promoted: Vec<Arc<ValidPoolTransaction<T>>>,
1340    /// transactions that failed and became discarded
1341    pub discarded: Vec<Arc<ValidPoolTransaction<T>>>,
1342}
1343
1344impl<T: PoolTransaction> AddedPendingTransaction<T> {
1345    /// Returns all transactions that were promoted to the pending pool and adhere to the given
1346    /// [`TransactionListenerKind`].
1347    ///
1348    /// If the kind is [`TransactionListenerKind::PropagateOnly`], then only transactions that
1349    /// are allowed to be propagated are returned.
1350    pub(crate) fn pending_transactions(
1351        &self,
1352        kind: TransactionListenerKind,
1353    ) -> impl Iterator<Item = B256> + '_ {
1354        let iter = std::iter::once(&self.transaction).chain(self.promoted.iter());
1355        PendingTransactionIter { kind, iter }
1356    }
1357}
1358
1359pub(crate) struct PendingTransactionIter<Iter> {
1360    kind: TransactionListenerKind,
1361    iter: Iter,
1362}
1363
1364impl<'a, Iter, T> Iterator for PendingTransactionIter<Iter>
1365where
1366    Iter: Iterator<Item = &'a Arc<ValidPoolTransaction<T>>>,
1367    T: PoolTransaction + 'a,
1368{
1369    type Item = B256;
1370
1371    fn next(&mut self) -> Option<Self::Item> {
1372        loop {
1373            let next = self.iter.next()?;
1374            if self.kind.is_propagate_only() && !next.propagate {
1375                continue
1376            }
1377            return Some(*next.hash())
1378        }
1379    }
1380}
1381
1382/// An iterator over full pending transactions
1383pub(crate) struct FullPendingTransactionIter<Iter> {
1384    kind: TransactionListenerKind,
1385    iter: Iter,
1386}
1387
1388impl<'a, Iter, T> Iterator for FullPendingTransactionIter<Iter>
1389where
1390    Iter: Iterator<Item = &'a Arc<ValidPoolTransaction<T>>>,
1391    T: PoolTransaction + 'a,
1392{
1393    type Item = NewTransactionEvent<T>;
1394
1395    fn next(&mut self) -> Option<Self::Item> {
1396        loop {
1397            let next = self.iter.next()?;
1398            if self.kind.is_propagate_only() && !next.propagate {
1399                continue
1400            }
1401            return Some(NewTransactionEvent {
1402                subpool: SubPool::Pending,
1403                transaction: next.clone(),
1404            })
1405        }
1406    }
1407}
1408
1409/// Represents a transaction that was added into the pool and its state
1410#[derive(Debug, Clone)]
1411pub enum AddedTransaction<T: PoolTransaction> {
1412    /// Transaction was successfully added and moved to the pending pool.
1413    Pending(AddedPendingTransaction<T>),
1414    /// Transaction was successfully added but not yet ready for processing and moved to a
1415    /// parked pool instead.
1416    Parked {
1417        /// Inserted transaction.
1418        transaction: Arc<ValidPoolTransaction<T>>,
1419        /// Replaced transaction.
1420        replaced: Option<Arc<ValidPoolTransaction<T>>>,
1421        /// The subpool it was moved to.
1422        subpool: SubPool,
1423        /// The specific reason why the transaction is queued (if applicable).
1424        queued_reason: Option<QueuedReason>,
1425    },
1426}
1427
1428impl<T: PoolTransaction> AddedTransaction<T> {
1429    /// Returns whether the transaction has been added to the pending pool.
1430    pub const fn as_pending(&self) -> Option<&AddedPendingTransaction<T>> {
1431        match self {
1432            Self::Pending(tx) => Some(tx),
1433            _ => None,
1434        }
1435    }
1436
1437    /// Returns the replaced transaction if there was one
1438    pub const fn replaced(&self) -> Option<&Arc<ValidPoolTransaction<T>>> {
1439        match self {
1440            Self::Pending(tx) => tx.replaced.as_ref(),
1441            Self::Parked { replaced, .. } => replaced.as_ref(),
1442        }
1443    }
1444
1445    /// Returns the discarded transactions if there were any
1446    pub(crate) fn discarded_transactions(&self) -> Option<&[Arc<ValidPoolTransaction<T>>]> {
1447        match self {
1448            Self::Pending(tx) => Some(&tx.discarded),
1449            Self::Parked { .. } => None,
1450        }
1451    }
1452
1453    /// Returns the hash of the replaced transaction if it is a blob transaction.
1454    pub(crate) fn replaced_blob_transaction(&self) -> Option<B256> {
1455        self.replaced().filter(|tx| tx.transaction.is_eip4844()).map(|tx| *tx.transaction.hash())
1456    }
1457
1458    /// Returns the hash of the transaction
1459    pub fn hash(&self) -> &TxHash {
1460        match self {
1461            Self::Pending(tx) => tx.transaction.hash(),
1462            Self::Parked { transaction, .. } => transaction.hash(),
1463        }
1464    }
1465
1466    /// Converts this type into the event type for listeners
1467    pub fn into_new_transaction_event(self) -> NewTransactionEvent<T> {
1468        match self {
1469            Self::Pending(tx) => {
1470                NewTransactionEvent { subpool: SubPool::Pending, transaction: tx.transaction }
1471            }
1472            Self::Parked { transaction, subpool, .. } => {
1473                NewTransactionEvent { transaction, subpool }
1474            }
1475        }
1476    }
1477
1478    /// Returns the subpool this transaction was added to
1479    pub(crate) const fn subpool(&self) -> SubPool {
1480        match self {
1481            Self::Pending(_) => SubPool::Pending,
1482            Self::Parked { subpool, .. } => *subpool,
1483        }
1484    }
1485
1486    /// Returns the [`TransactionId`] of the added transaction
1487    #[cfg(test)]
1488    pub(crate) fn id(&self) -> &TransactionId {
1489        match self {
1490            Self::Pending(added) => added.transaction.id(),
1491            Self::Parked { transaction, .. } => transaction.id(),
1492        }
1493    }
1494
1495    /// Returns the queued reason if the transaction is parked with a queued reason.
1496    pub const fn queued_reason(&self) -> Option<&QueuedReason> {
1497        match self {
1498            Self::Pending(_) => None,
1499            Self::Parked { queued_reason, .. } => queued_reason.as_ref(),
1500        }
1501    }
1502
1503    /// Returns the transaction state based on the subpool and queued reason.
1504    pub fn transaction_state(&self) -> AddedTransactionState {
1505        match self.subpool() {
1506            SubPool::Pending => AddedTransactionState::Pending,
1507            _ => {
1508                // For non-pending transactions, use the queued reason directly from the
1509                // AddedTransaction
1510                if let Some(reason) = self.queued_reason() {
1511                    AddedTransactionState::Queued(reason.clone())
1512                } else {
1513                    // Fallback - this shouldn't happen with the new implementation
1514                    AddedTransactionState::Queued(QueuedReason::NonceGap)
1515                }
1516            }
1517        }
1518    }
1519}
1520
/// The specific reason why a transaction is queued (not ready for execution)
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum QueuedReason {
    /// Transaction has a nonce gap - missing prior transactions
    NonceGap,
    /// Transaction has parked ancestors - waiting for other transactions to be mined
    ParkedAncestors,
    /// Sender has insufficient balance to cover the transaction cost
    InsufficientBalance,
    /// Transaction exceeds the block gas limit
    TooMuchGas,
    /// Transaction doesn't meet the base fee requirement
    InsufficientBaseFee,
    /// Transaction doesn't meet the blob fee requirement (EIP-4844)
    InsufficientBlobFee,
}

/// The state of a transaction when it was added to the pool
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AddedTransactionState {
    /// Ready for execution
    Pending,
    /// Not ready for execution due to a specific condition
    Queued(QueuedReason),
}

impl AddedTransactionState {
    /// Returns `true` if the transaction was added as queued.
    pub const fn is_queued(&self) -> bool {
        match self {
            Self::Queued(_) => true,
            Self::Pending => false,
        }
    }

    /// Returns `true` if the transaction was added as pending.
    pub const fn is_pending(&self) -> bool {
        match self {
            Self::Pending => true,
            Self::Queued(_) => false,
        }
    }

    /// Returns the reason the transaction is queued, or `None` for a pending transaction.
    pub const fn queued_reason(&self) -> Option<&QueuedReason> {
        if let Self::Queued(reason) = self {
            Some(reason)
        } else {
            None
        }
    }
}
1566
1567/// The outcome of a successful transaction addition
1568#[derive(Debug, Clone, PartialEq, Eq)]
1569pub struct AddedTransactionOutcome {
1570    /// The hash of the transaction
1571    pub hash: TxHash,
1572    /// The state of the transaction
1573    pub state: AddedTransactionState,
1574}
1575
1576impl AddedTransactionOutcome {
1577    /// Returns whether the transaction was submitted as queued.
1578    pub const fn is_queued(&self) -> bool {
1579        self.state.is_queued()
1580    }
1581
1582    /// Returns whether the transaction was submitted as pending.
1583    pub const fn is_pending(&self) -> bool {
1584        self.state.is_pending()
1585    }
1586}
1587
1588/// Contains all state changes after a [`CanonicalStateUpdate`] was processed
1589#[derive(Debug)]
1590pub(crate) struct OnNewCanonicalStateOutcome<T: PoolTransaction> {
1591    /// Hash of the block.
1592    pub(crate) block_hash: B256,
1593    /// All mined transactions.
1594    pub(crate) mined: Vec<TxHash>,
1595    /// Transactions promoted to the pending pool.
1596    pub(crate) promoted: Vec<Arc<ValidPoolTransaction<T>>>,
1597    /// transaction that were discarded during the update
1598    pub(crate) discarded: Vec<Arc<ValidPoolTransaction<T>>>,
1599}
1600
1601impl<T: PoolTransaction> OnNewCanonicalStateOutcome<T> {
1602    /// Returns all transactions that were promoted to the pending pool and adhere to the given
1603    /// [`TransactionListenerKind`].
1604    ///
1605    /// If the kind is [`TransactionListenerKind::PropagateOnly`], then only transactions that
1606    /// are allowed to be propagated are returned.
1607    pub(crate) fn pending_transactions(
1608        &self,
1609        kind: TransactionListenerKind,
1610    ) -> impl Iterator<Item = B256> + '_ {
1611        let iter = self.promoted.iter();
1612        PendingTransactionIter { kind, iter }
1613    }
1614
1615    /// Returns all FULL transactions that were promoted to the pending pool and adhere to the given
1616    /// [`TransactionListenerKind`].
1617    ///
1618    /// If the kind is [`TransactionListenerKind::PropagateOnly`], then only transactions that
1619    /// are allowed to be propagated are returned.
1620    pub(crate) fn full_pending_transactions(
1621        &self,
1622        kind: TransactionListenerKind,
1623    ) -> impl Iterator<Item = NewTransactionEvent<T>> + '_ {
1624        let iter = self.promoted.iter();
1625        FullPendingTransactionIter { kind, iter }
1626    }
1627}
1628
#[cfg(test)]
mod tests {
    use crate::{
        blobstore::{BlobStore, InMemoryBlobStore},
        identifier::SenderId,
        test_utils::{MockTransaction, TestPoolBuilder},
        validate::ValidTransaction,
        BlockInfo, PoolConfig, SubPoolLimit, TransactionOrigin, TransactionValidationOutcome, U256,
    };
    use alloy_eips::{eip4844::BlobTransactionSidecar, eip7594::BlobTransactionSidecarVariant};
    use alloy_primitives::Address;
    use std::{fs, path::PathBuf};

    /// Submits more blob transactions than the blob sub-pool limit allows and checks the pool's
    /// blob count, blob size and blob store afterwards.
    #[test]
    fn test_discard_blobs_on_blob_tx_eviction() {
        // Load the hex-encoded blob from the JSON fixture shipped with the crate.
        let path = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("test_data/blob1.json");
        let raw_json = fs::read_to_string(path).expect("Failed to read the blob data file");
        let parsed: serde_json::Value =
            serde_json::from_str(&raw_json).expect("Failed to deserialize JSON");
        let blob_hex = parsed
            .get("data")
            .unwrap()
            .as_str()
            .expect("Data is not a valid string")
            .to_string();

        // Build the sidecar every transaction in this test will carry.
        let sidecar = BlobTransactionSidecarVariant::Eip4844(
            BlobTransactionSidecar::try_from_blobs_hex(vec![blob_hex]).unwrap(),
        );

        // Limit of the blob sub-pool.
        let blob_limit = SubPoolLimit::new(1000, usize::MAX);

        // Pool with the default configuration except for the blob limit.
        let test_pool = &TestPoolBuilder::default()
            .with_config(PoolConfig { blob_limit, ..Default::default() })
            .pool;

        // Configure the block info with a pending blob fee.
        let block_info = BlockInfo { pending_blob_fee: Some(10_000_000), ..Default::default() };
        test_pool.set_block_info(block_info);

        // Reference store the pool's blob store is compared against at the end.
        let expected_store = InMemoryBlobStore::default();

        // Submit ten transactions more than the blob sub-pool limit.
        let total_txs = blob_limit.max_txs + 10;
        for idx in 0..total_txs {
            let mut blob_tx = MockTransaction::eip4844_with_sidecar(sidecar.clone());

            // Give the transaction a non-zero size.
            blob_tx.set_size(1844674407370951);

            // Only the first `max_txs` sidecars are recorded in the reference store.
            if idx < blob_limit.max_txs {
                expected_store.insert(*blob_tx.get_hash(), sidecar.clone()).unwrap();
            }

            // Hand the transaction to the pool as an externally received, valid transaction.
            let outcome = TransactionValidationOutcome::Valid {
                balance: U256::from(1_000),
                state_nonce: 0,
                bytecode_hash: None,
                transaction: ValidTransaction::ValidWithSidecar {
                    transaction: blob_tx,
                    sidecar: sidecar.clone(),
                },
                propagate: true,
                authorities: None,
            };
            test_pool.add_transactions(TransactionOrigin::External, [outcome]);
        }

        let pool_size = test_pool.size();

        // The number of blob transactions equals the configured limit.
        assert_eq!(pool_size.blob, blob_limit.max_txs);

        // The accumulated blob size matches the expected value.
        assert_eq!(pool_size.blob_size, 1844674407370951000);

        // The pool's blob store equals the reference store.
        assert_eq!(*test_pool.blob_store(), expected_store);
    }

    /// Adds a transaction with an authority list and checks that the authority received a sender
    /// id in the pool's identifiers.
    #[test]
    fn test_auths_stored_in_identifiers() {
        // Pool with the default configuration.
        let test_pool = &TestPoolBuilder::default().with_config(Default::default()).pool;

        let authority = Address::new([1; 20]);
        let outcome = TransactionValidationOutcome::Valid {
            balance: U256::from(1_000),
            state_nonce: 0,
            bytecode_hash: None,
            transaction: ValidTransaction::Valid(MockTransaction::eip7702()),
            propagate: true,
            authorities: Some(vec![authority]),
        };
        test_pool.add_transactions(TransactionOrigin::Local, [outcome]);

        let identifiers = test_pool.identifiers.read();
        assert_eq!(identifiers.sender_id(&authority), Some(SenderId::from(1)));
    }
}