reth_transaction_pool/pool/
mod.rs

1//! Transaction Pool internals.
2//!
3//! Incoming transactions are validated before they enter the pool first. The validation outcome can
4//! have 3 states:
5//!
6//!  1. Transaction can _never_ be valid
7//!  2. Transaction is _currently_ valid
8//!  3. Transaction is _currently_ invalid, but could potentially become valid in the future
9//!
10//! However, (2.) and (3.) of a transaction can only be determined on the basis of the current
11//! state, whereas (1.) holds indefinitely. This means once the state changes (2.) and (3.) the
12//! state of a transaction needs to be reevaluated again.
13//!
14//! The transaction pool is responsible for storing new, valid transactions and providing the next
15//! best transactions sorted by their priority. Where priority is determined by the transaction's
16//! score ([`TransactionOrdering`]).
17//!
18//! Furthermore, the following characteristics fall under (3.):
19//!
20//!  a) Nonce of a transaction is higher than the expected nonce for the next transaction of its
21//! sender. A distinction is made here whether multiple transactions from the same sender have
22//! gapless nonce increments.
23//!
24//!  a)(1) If _no_ transaction is missing in a chain of multiple
25//! transactions from the same sender (all nonce in row), all of them can in principle be executed
26//! on the current state one after the other.
27//!
28//!  a)(2) If there's a nonce gap, then all
29//! transactions after the missing transaction are blocked until the missing transaction arrives.
30//!
31//!  b) Transaction does not meet the dynamic fee cap requirement introduced by EIP-1559: The
32//! fee cap of the transaction needs to be no less than the base fee of block.
33//!
34//!
35//! In essence the transaction pool is made of three separate sub-pools:
36//!
37//!  - Pending Pool: Contains all transactions that are valid on the current state and satisfy (3.
38//!    a)(1): _No_ nonce gaps. A _pending_ transaction is considered _ready_ when it has the lowest
39//!    nonce of all transactions from the same sender. Once a _ready_ transaction with nonce `n` has
40//!    been executed, the next highest transaction from the same sender `n + 1` becomes ready.
41//!
42//!  - Queued Pool: Contains all transactions that are currently blocked by missing transactions:
43//!    (3. a)(2): _With_ nonce gaps or due to lack of funds.
44//!
45//!  - Basefee Pool: To account for the dynamic base fee requirement (3. b) which could render an
46//!    EIP-1559 and all subsequent transactions of the sender currently invalid.
47//!
48//! The classification of transactions is always dependent on the current state that is changed as
49//! soon as a new block is mined. Once a new block is mined, the account changeset must be applied
50//! to the transaction pool.
51//!
52//!
53//! Depending on the use case, consumers of the [`TransactionPool`](crate::traits::TransactionPool)
54//! are interested in (2.) and/or (3.).
55
56//! A generic [`TransactionPool`](crate::traits::TransactionPool) that only handles transactions.
57//!
58//! This Pool maintains two separate sub-pools for (2.) and (3.)
59//!
60//! ## Terminology
61//!
62//!  - _Pending_: pending transactions are transactions that fall under (2.). These transactions can
63//!    currently be executed and are stored in the pending sub-pool
64//!  - _Queued_: queued transactions are transactions that fall under category (3.). Those
65//!    transactions are _currently_ waiting for state changes that eventually move them into
66//!    category (2.) and become pending.
67
68use crate::{
69    blobstore::BlobStore,
70    error::{PoolError, PoolErrorKind, PoolResult},
71    identifier::{SenderId, SenderIdentifiers, TransactionId},
72    metrics::BlobStoreMetrics,
73    pool::{
74        listener::{
75            BlobTransactionSidecarListener, PendingTransactionHashListener, PoolEventBroadcast,
76            TransactionListener,
77        },
78        state::SubPool,
79        txpool::{SenderInfo, TxPool},
80        update::UpdateOutcome,
81    },
82    traits::{
83        AllPoolTransactions, BestTransactionsAttributes, BlockInfo, GetPooledTransactionLimit,
84        NewBlobSidecar, PoolSize, PoolTransaction, PropagatedTransactions, TransactionOrigin,
85    },
86    validate::{TransactionValidationOutcome, ValidPoolTransaction, ValidTransaction},
87    CanonicalStateUpdate, EthPoolTransaction, PoolConfig, TransactionOrdering,
88    TransactionValidator,
89};
90
91use alloy_primitives::{Address, TxHash, B256};
92use parking_lot::{Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard};
93use reth_eth_wire_types::HandleMempoolData;
94use reth_execution_types::ChangedAccount;
95
96use alloy_eips::{eip7594::BlobTransactionSidecarVariant, Typed2718};
97use reth_primitives_traits::Recovered;
98use rustc_hash::FxHashMap;
99use std::{
100    collections::HashSet,
101    fmt,
102    sync::{
103        atomic::{AtomicBool, Ordering},
104        Arc,
105    },
106    time::Instant,
107};
108use tokio::sync::mpsc;
109use tracing::{debug, trace, warn};
110mod events;
111pub use best::{BestTransactionFilter, BestTransactionsWithPrioritizedSenders};
112pub use blob::{blob_tx_priority, fee_delta, BlobOrd, BlobTransactions};
113pub use events::{FullTransactionEvent, NewTransactionEvent, TransactionEvent};
114pub use listener::{AllTransactionsEvents, TransactionEvents, TransactionListenerKind};
115pub use parked::{BasefeeOrd, ParkedOrd, ParkedPool, QueuedOrd};
116pub use pending::PendingPool;
117
118mod best;
119pub use best::BestTransactions;
120
121mod blob;
122pub mod listener;
123mod parked;
124pub mod pending;
125pub mod size;
126pub(crate) mod state;
127pub mod txpool;
128mod update;
129
130/// Bound on number of pending transactions from `reth_network::TransactionsManager` to buffer.
131pub const PENDING_TX_LISTENER_BUFFER_SIZE: usize = 2048;
132/// Bound on number of new transactions from `reth_network::TransactionsManager` to buffer.
133pub const NEW_TX_LISTENER_BUFFER_SIZE: usize = 1024;
134
135const BLOB_SIDECAR_LISTENER_BUFFER_SIZE: usize = 512;
136
137/// Transaction pool internals.
138pub struct PoolInner<V, T, S>
139where
140    T: TransactionOrdering,
141{
142    /// Internal mapping of addresses to plain ints.
143    identifiers: RwLock<SenderIdentifiers>,
144    /// Transaction validator.
145    validator: V,
146    /// Storage for blob transactions
147    blob_store: S,
148    /// The internal pool that manages all transactions.
149    pool: RwLock<TxPool<T>>,
150    /// Pool settings.
151    config: PoolConfig,
152    /// Manages listeners for transaction state change events.
153    event_listener: RwLock<PoolEventBroadcast<T::Transaction>>,
154    /// Tracks whether any event listeners have ever been installed.
155    has_event_listeners: AtomicBool,
156    /// Listeners for new _full_ pending transactions.
157    pending_transaction_listener: RwLock<Vec<PendingTransactionHashListener>>,
158    /// Listeners for new transactions added to the pool.
159    transaction_listener: RwLock<Vec<TransactionListener<T::Transaction>>>,
160    /// Listener for new blob transaction sidecars added to the pool.
161    blob_transaction_sidecar_listener: Mutex<Vec<BlobTransactionSidecarListener>>,
162    /// Metrics for the blob store
163    blob_store_metrics: BlobStoreMetrics,
164}
165
166// === impl PoolInner ===
167
168impl<V, T, S> PoolInner<V, T, S>
169where
170    V: TransactionValidator,
171    T: TransactionOrdering<Transaction = <V as TransactionValidator>::Transaction>,
172    S: BlobStore,
173{
174    /// Create a new transaction pool instance.
175    pub fn new(validator: V, ordering: T, blob_store: S, config: PoolConfig) -> Self {
176        Self {
177            identifiers: Default::default(),
178            validator,
179            event_listener: Default::default(),
180            has_event_listeners: AtomicBool::new(false),
181            pool: RwLock::new(TxPool::new(ordering, config.clone())),
182            pending_transaction_listener: Default::default(),
183            transaction_listener: Default::default(),
184            blob_transaction_sidecar_listener: Default::default(),
185            config,
186            blob_store,
187            blob_store_metrics: Default::default(),
188        }
189    }
190
191    /// Returns the configured blob store.
192    pub const fn blob_store(&self) -> &S {
193        &self.blob_store
194    }
195
196    /// Returns stats about the size of the pool.
197    pub fn size(&self) -> PoolSize {
198        self.get_pool_data().size()
199    }
200
201    /// Returns the currently tracked block
202    pub fn block_info(&self) -> BlockInfo {
203        self.get_pool_data().block_info()
204    }
205    /// Sets the currently tracked block
206    pub fn set_block_info(&self, info: BlockInfo) {
207        self.pool.write().set_block_info(info)
208    }
209
210    /// Returns the internal [`SenderId`] for this address
211    pub fn get_sender_id(&self, addr: Address) -> SenderId {
212        self.identifiers.write().sender_id_or_create(addr)
213    }
214
215    /// Returns the internal [`SenderId`]s for the given addresses.
216    pub fn get_sender_ids(&self, addrs: impl IntoIterator<Item = Address>) -> Vec<SenderId> {
217        self.identifiers.write().sender_ids_or_create(addrs)
218    }
219
220    /// Returns all senders in the pool
221    pub fn unique_senders(&self) -> HashSet<Address> {
222        self.get_pool_data().unique_senders()
223    }
224
225    /// Converts the changed accounts to a map of sender ids to sender info (internal identifier
226    /// used for __tracked__ accounts)
227    fn changed_senders(
228        &self,
229        accs: impl Iterator<Item = ChangedAccount>,
230    ) -> FxHashMap<SenderId, SenderInfo> {
231        let identifiers = self.identifiers.read();
232        accs.into_iter()
233            .filter_map(|acc| {
234                let ChangedAccount { address, nonce, balance } = acc;
235                let sender_id = identifiers.sender_id(&address)?;
236                Some((sender_id, SenderInfo { state_nonce: nonce, balance }))
237            })
238            .collect()
239    }
240
241    /// Get the config the pool was configured with.
242    pub const fn config(&self) -> &PoolConfig {
243        &self.config
244    }
245
246    /// Get the validator reference.
247    pub const fn validator(&self) -> &V {
248        &self.validator
249    }
250
251    /// Adds a new transaction listener to the pool that gets notified about every new _pending_
252    /// transaction inserted into the pool
253    pub fn add_pending_listener(&self, kind: TransactionListenerKind) -> mpsc::Receiver<TxHash> {
254        let (sender, rx) = mpsc::channel(self.config.pending_tx_listener_buffer_size);
255        let listener = PendingTransactionHashListener { sender, kind };
256
257        let mut listeners = self.pending_transaction_listener.write();
258        // Clean up dead listeners before adding new one
259        listeners.retain(|l| !l.sender.is_closed());
260        listeners.push(listener);
261
262        rx
263    }
264
265    /// Adds a new transaction listener to the pool that gets notified about every new transaction.
266    pub fn add_new_transaction_listener(
267        &self,
268        kind: TransactionListenerKind,
269    ) -> mpsc::Receiver<NewTransactionEvent<T::Transaction>> {
270        let (sender, rx) = mpsc::channel(self.config.new_tx_listener_buffer_size);
271        let listener = TransactionListener { sender, kind };
272
273        let mut listeners = self.transaction_listener.write();
274        // Clean up dead listeners before adding new one
275        listeners.retain(|l| !l.sender.is_closed());
276        listeners.push(listener);
277
278        rx
279    }
280    /// Adds a new blob sidecar listener to the pool that gets notified about every new
281    /// eip4844 transaction's blob sidecar.
282    pub fn add_blob_sidecar_listener(&self) -> mpsc::Receiver<NewBlobSidecar> {
283        let (sender, rx) = mpsc::channel(BLOB_SIDECAR_LISTENER_BUFFER_SIZE);
284        let listener = BlobTransactionSidecarListener { sender };
285        self.blob_transaction_sidecar_listener.lock().push(listener);
286        rx
287    }
288
289    /// If the pool contains the transaction, this adds a new listener that gets notified about
290    /// transaction events.
291    pub fn add_transaction_event_listener(&self, tx_hash: TxHash) -> Option<TransactionEvents> {
292        if !self.get_pool_data().contains(&tx_hash) {
293            return None
294        }
295        let mut listener = self.event_listener.write();
296        let events = listener.subscribe(tx_hash);
297        self.mark_event_listener_installed();
298        Some(events)
299    }
300
301    /// Adds a listener for all transaction events.
302    pub fn add_all_transactions_event_listener(&self) -> AllTransactionsEvents<T::Transaction> {
303        let mut listener = self.event_listener.write();
304        let events = listener.subscribe_all();
305        self.mark_event_listener_installed();
306        events
307    }
308
309    #[inline]
310    fn has_event_listeners(&self) -> bool {
311        self.has_event_listeners.load(Ordering::Relaxed)
312    }
313
314    #[inline]
315    fn mark_event_listener_installed(&self) {
316        self.has_event_listeners.store(true, Ordering::Relaxed);
317    }
318
319    #[inline]
320    fn update_event_listener_state(&self, listener: &PoolEventBroadcast<T::Transaction>) {
321        if listener.is_empty() {
322            self.has_event_listeners.store(false, Ordering::Relaxed);
323        }
324    }
325
326    #[inline]
327    fn with_event_listener<F>(&self, emit: F)
328    where
329        F: FnOnce(&mut PoolEventBroadcast<T::Transaction>),
330    {
331        if !self.has_event_listeners() {
332            return
333        }
334        let mut listener = self.event_listener.write();
335        if !listener.is_empty() {
336            emit(&mut listener);
337        }
338        self.update_event_listener_state(&listener);
339    }
340
341    /// Returns a read lock to the pool's data.
342    pub fn get_pool_data(&self) -> RwLockReadGuard<'_, TxPool<T>> {
343        self.pool.read()
344    }
345
346    /// Returns transactions in the pool that can be propagated
347    pub fn pooled_transactions(&self) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
348        let mut out = Vec::new();
349        self.append_pooled_transactions(&mut out);
350        out
351    }
352
353    /// Returns hashes of transactions in the pool that can be propagated.
354    pub fn pooled_transactions_hashes(&self) -> Vec<TxHash> {
355        let mut out = Vec::new();
356        self.append_pooled_transactions_hashes(&mut out);
357        out
358    }
359
360    /// Returns only the first `max` transactions in the pool that can be propagated.
361    pub fn pooled_transactions_max(
362        &self,
363        max: usize,
364    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
365        let mut out = Vec::new();
366        self.append_pooled_transactions_max(max, &mut out);
367        out
368    }
369
370    /// Extends the given vector with all transactions in the pool that can be propagated.
371    pub fn append_pooled_transactions(
372        &self,
373        out: &mut Vec<Arc<ValidPoolTransaction<T::Transaction>>>,
374    ) {
375        out.extend(
376            self.get_pool_data().all().transactions_iter().filter(|tx| tx.propagate).cloned(),
377        );
378    }
379
380    /// Extends the given vector with pooled transactions for the given hashes that are allowed to
381    /// be propagated.
382    pub fn append_pooled_transaction_elements(
383        &self,
384        tx_hashes: &[TxHash],
385        limit: GetPooledTransactionLimit,
386        out: &mut Vec<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>,
387    ) where
388        <V as TransactionValidator>::Transaction: EthPoolTransaction,
389    {
390        let transactions = self.get_all_propagatable(tx_hashes);
391        let mut size = 0;
392        for transaction in transactions {
393            let encoded_len = transaction.encoded_length();
394            let Some(pooled) = self.to_pooled_transaction(transaction) else {
395                continue;
396            };
397
398            size += encoded_len;
399            out.push(pooled.into_inner());
400
401            if limit.exceeds(size) {
402                break
403            }
404        }
405    }
406
407    /// Extends the given vector with the hashes of all transactions in the pool that can be
408    /// propagated.
409    pub fn append_pooled_transactions_hashes(&self, out: &mut Vec<TxHash>) {
410        out.extend(
411            self.get_pool_data()
412                .all()
413                .transactions_iter()
414                .filter(|tx| tx.propagate)
415                .map(|tx| *tx.hash()),
416        );
417    }
418
419    /// Extends the given vector with only the first `max` transactions in the pool that can be
420    /// propagated.
421    pub fn append_pooled_transactions_max(
422        &self,
423        max: usize,
424        out: &mut Vec<Arc<ValidPoolTransaction<T::Transaction>>>,
425    ) {
426        out.extend(
427            self.get_pool_data()
428                .all()
429                .transactions_iter()
430                .filter(|tx| tx.propagate)
431                .take(max)
432                .cloned(),
433        );
434    }
435
436    /// Returns only the first `max` hashes of transactions in the pool that can be propagated.
437    pub fn pooled_transactions_hashes_max(&self, max: usize) -> Vec<TxHash> {
438        if max == 0 {
439            return Vec::new();
440        }
441        self.get_pool_data()
442            .all()
443            .transactions_iter()
444            .filter(|tx| tx.propagate)
445            .take(max)
446            .map(|tx| *tx.hash())
447            .collect()
448    }
449
450    /// Converts the internally tracked transaction to the pooled format.
451    ///
452    /// If the transaction is an EIP-4844 transaction, the blob sidecar is fetched from the blob
453    /// store and attached to the transaction.
454    fn to_pooled_transaction(
455        &self,
456        transaction: Arc<ValidPoolTransaction<T::Transaction>>,
457    ) -> Option<Recovered<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>>
458    where
459        <V as TransactionValidator>::Transaction: EthPoolTransaction,
460    {
461        if transaction.is_eip4844() {
462            let sidecar = self.blob_store.get(*transaction.hash()).ok()??;
463            transaction.transaction.clone().try_into_pooled_eip4844(sidecar)
464        } else {
465            transaction
466                .transaction
467                .clone_into_pooled()
468                .inspect_err(|err| {
469                    debug!(
470                        target: "txpool", %err,
471                        "failed to convert transaction to pooled element; skipping",
472                    );
473                })
474                .ok()
475        }
476    }
477
478    /// Returns pooled transactions for the given transaction hashes that are allowed to be
479    /// propagated.
480    pub fn get_pooled_transaction_elements(
481        &self,
482        tx_hashes: Vec<TxHash>,
483        limit: GetPooledTransactionLimit,
484    ) -> Vec<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>
485    where
486        <V as TransactionValidator>::Transaction: EthPoolTransaction,
487    {
488        let mut elements = Vec::new();
489        self.append_pooled_transaction_elements(&tx_hashes, limit, &mut elements);
490        elements.shrink_to_fit();
491        elements
492    }
493
494    /// Returns converted pooled transaction for the given transaction hash.
495    pub fn get_pooled_transaction_element(
496        &self,
497        tx_hash: TxHash,
498    ) -> Option<Recovered<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>>
499    where
500        <V as TransactionValidator>::Transaction: EthPoolTransaction,
501    {
502        self.get(&tx_hash).and_then(|tx| self.to_pooled_transaction(tx))
503    }
504
505    /// Updates the entire pool after a new block was executed.
506    pub fn on_canonical_state_change(&self, update: CanonicalStateUpdate<'_, V::Block>) {
507        trace!(target: "txpool", ?update, "updating pool on canonical state change");
508
509        let block_info = update.block_info();
510        let CanonicalStateUpdate {
511            new_tip, changed_accounts, mined_transactions, update_kind, ..
512        } = update;
513        self.validator.on_new_head_block(new_tip);
514
515        let changed_senders = self.changed_senders(changed_accounts.into_iter());
516
517        // update the pool
518        let outcome = self.pool.write().on_canonical_state_change(
519            block_info,
520            mined_transactions,
521            changed_senders,
522            update_kind,
523        );
524
525        // This will discard outdated transactions based on the account's nonce
526        self.delete_discarded_blobs(outcome.discarded.iter());
527
528        // notify listeners about updates
529        self.notify_on_new_state(outcome);
530    }
531
532    /// Performs account updates on the pool.
533    ///
534    /// This will either promote or discard transactions based on the new account state.
535    ///
536    /// This should be invoked when the pool drifted and accounts are updated manually
537    pub fn update_accounts(&self, accounts: Vec<ChangedAccount>) {
538        let changed_senders = self.changed_senders(accounts.into_iter());
539        let UpdateOutcome { promoted, discarded } =
540            self.pool.write().update_accounts(changed_senders);
541
542        self.notify_on_transaction_updates(promoted, discarded);
543    }
544
545    /// Add a single validated transaction into the pool.
546    ///
547    /// Returns the outcome and optionally metadata to be processed after the pool lock is
548    /// released.
549    ///
550    /// Note: this is only used internally by [`Self::add_transactions()`], all new transaction(s)
551    /// come in through that function, either as a batch or `std::iter::once`.
552    fn add_transaction(
553        &self,
554        pool: &mut RwLockWriteGuard<'_, TxPool<T>>,
555        origin: TransactionOrigin,
556        tx: TransactionValidationOutcome<T::Transaction>,
557    ) -> (PoolResult<AddedTransactionOutcome>, Option<AddedTransactionMeta<T::Transaction>>) {
558        match tx {
559            TransactionValidationOutcome::Valid {
560                balance,
561                state_nonce,
562                transaction,
563                propagate,
564                bytecode_hash,
565                authorities,
566            } => {
567                let sender_id = self.get_sender_id(transaction.sender());
568                let transaction_id = TransactionId::new(sender_id, transaction.nonce());
569
570                // split the valid transaction and the blob sidecar if it has any
571                let (transaction, blob_sidecar) = match transaction {
572                    ValidTransaction::Valid(tx) => (tx, None),
573                    ValidTransaction::ValidWithSidecar { transaction, sidecar } => {
574                        debug_assert!(
575                            transaction.is_eip4844(),
576                            "validator returned sidecar for non EIP-4844 transaction"
577                        );
578                        (transaction, Some(sidecar))
579                    }
580                };
581
582                let tx = ValidPoolTransaction {
583                    transaction,
584                    transaction_id,
585                    propagate,
586                    timestamp: Instant::now(),
587                    origin,
588                    authority_ids: authorities.map(|auths| self.get_sender_ids(auths)),
589                };
590
591                let added = match pool.add_transaction(tx, balance, state_nonce, bytecode_hash) {
592                    Ok(added) => added,
593                    Err(err) => return (Err(err), None),
594                };
595                let hash = *added.hash();
596                let state = added.transaction_state();
597
598                let meta = AddedTransactionMeta { added, blob_sidecar };
599
600                (Ok(AddedTransactionOutcome { hash, state }), Some(meta))
601            }
602            TransactionValidationOutcome::Invalid(tx, err) => {
603                self.with_event_listener(|listener| listener.invalid(tx.hash()));
604                (Err(PoolError::new(*tx.hash(), err)), None)
605            }
606            TransactionValidationOutcome::Error(tx_hash, err) => {
607                self.with_event_listener(|listener| listener.discarded(&tx_hash));
608                (Err(PoolError::other(tx_hash, err)), None)
609            }
610        }
611    }
612
613    /// Adds a transaction and returns the event stream.
614    pub fn add_transaction_and_subscribe(
615        &self,
616        origin: TransactionOrigin,
617        tx: TransactionValidationOutcome<T::Transaction>,
618    ) -> PoolResult<TransactionEvents> {
619        let listener = {
620            let mut listener = self.event_listener.write();
621            let events = listener.subscribe(tx.tx_hash());
622            self.mark_event_listener_installed();
623            events
624        };
625        let mut results = self.add_transactions(origin, std::iter::once(tx));
626        results.pop().expect("result length is the same as the input")?;
627        Ok(listener)
628    }
629
630    /// Adds all transactions in the iterator to the pool, returning a list of results.
631    pub fn add_transactions(
632        &self,
633        origin: TransactionOrigin,
634        transactions: impl IntoIterator<Item = TransactionValidationOutcome<T::Transaction>>,
635    ) -> Vec<PoolResult<AddedTransactionOutcome>> {
636        // Collect results and metadata while holding the pool write lock
637        let (mut results, added_metas, discarded) = {
638            let mut pool = self.pool.write();
639            let mut added_metas = Vec::new();
640
641            let results = transactions
642                .into_iter()
643                .map(|tx| {
644                    let (result, meta) = self.add_transaction(&mut pool, origin, tx);
645
646                    // Only collect metadata for successful insertions
647                    if result.is_ok() &&
648                        let Some(meta) = meta
649                    {
650                        added_metas.push(meta);
651                    }
652
653                    result
654                })
655                .collect::<Vec<_>>();
656
657            // Enforce the pool size limits if at least one transaction was added successfully
658            let discarded = if results.iter().any(Result::is_ok) {
659                pool.discard_worst()
660            } else {
661                Default::default()
662            };
663
664            (results, added_metas, discarded)
665        };
666
667        for meta in added_metas {
668            self.on_added_transaction(meta);
669        }
670
671        if !discarded.is_empty() {
672            // Delete any blobs associated with discarded blob transactions
673            self.delete_discarded_blobs(discarded.iter());
674            self.with_event_listener(|listener| listener.discarded_many(&discarded));
675
676            let discarded_hashes =
677                discarded.into_iter().map(|tx| *tx.hash()).collect::<HashSet<_>>();
678
679            // A newly added transaction may be immediately discarded, so we need to
680            // adjust the result here
681            for res in &mut results {
682                if let Ok(AddedTransactionOutcome { hash, .. }) = res &&
683                    discarded_hashes.contains(hash)
684                {
685                    *res = Err(PoolError::new(*hash, PoolErrorKind::DiscardedOnInsert))
686                }
687            }
688        };
689
690        results
691    }
692
693    /// Process a transaction that was added to the pool.
694    ///
695    /// Performs blob storage operations and sends all notifications. This should be called
696    /// after the pool write lock has been released to avoid blocking pool operations.
697    fn on_added_transaction(&self, meta: AddedTransactionMeta<T::Transaction>) {
698        // Handle blob sidecar storage and notifications for EIP-4844 transactions
699        if let Some(sidecar) = meta.blob_sidecar {
700            let hash = *meta.added.hash();
701            self.on_new_blob_sidecar(&hash, &sidecar);
702            self.insert_blob(hash, sidecar);
703        }
704
705        // Delete replaced blob sidecar if any
706        if let Some(replaced) = meta.added.replaced_blob_transaction() {
707            debug!(target: "txpool", "[{:?}] delete replaced blob sidecar", replaced);
708            self.delete_blob(replaced);
709        }
710
711        // Delete discarded blob sidecars if any, this doesnt do any IO.
712        if let Some(discarded) = meta.added.discarded_transactions() {
713            self.delete_discarded_blobs(discarded.iter());
714        }
715
716        // Notify pending transaction listeners
717        if let Some(pending) = meta.added.as_pending() {
718            self.on_new_pending_transaction(pending);
719        }
720
721        // Notify event listeners
722        self.notify_event_listeners(&meta.added);
723
724        // Notify new transaction listeners
725        self.on_new_transaction(meta.added.into_new_transaction_event());
726    }
727
728    /// Notify all listeners about a new pending transaction.
729    ///
730    /// See also [`Self::add_pending_listener`]
731    ///
732    /// CAUTION: This function is only intended to be used manually in order to use this type's
733    /// pending transaction receivers when manually implementing the
734    /// [`TransactionPool`](crate::TransactionPool) trait for a custom pool implementation
735    /// [`TransactionPool::pending_transactions_listener_for`](crate::TransactionPool).
736    pub fn on_new_pending_transaction(&self, pending: &AddedPendingTransaction<T::Transaction>) {
737        let mut needs_cleanup = false;
738
739        {
740            let listeners = self.pending_transaction_listener.read();
741            for listener in listeners.iter() {
742                if !listener.send_all(pending.pending_transactions(listener.kind)) {
743                    needs_cleanup = true;
744                }
745            }
746        }
747
748        // Clean up dead listeners if we detected any closed channels
749        if needs_cleanup {
750            self.pending_transaction_listener
751                .write()
752                .retain(|listener| !listener.sender.is_closed());
753        }
754    }
755
756    /// Notify all listeners about a newly inserted pending transaction.
757    ///
758    /// See also [`Self::add_new_transaction_listener`]
759    ///
760    /// CAUTION: This function is only intended to be used manually in order to use this type's
761    /// transaction receivers when manually implementing the
762    /// [`TransactionPool`](crate::TransactionPool) trait for a custom pool implementation
763    /// [`TransactionPool::new_transactions_listener_for`](crate::TransactionPool).
764    pub fn on_new_transaction(&self, event: NewTransactionEvent<T::Transaction>) {
765        let mut needs_cleanup = false;
766
767        {
768            let listeners = self.transaction_listener.read();
769            for listener in listeners.iter() {
770                if listener.kind.is_propagate_only() && !event.transaction.propagate {
771                    if listener.sender.is_closed() {
772                        needs_cleanup = true;
773                    }
774                    // Skip non-propagate transactions for propagate-only listeners
775                    continue
776                }
777
778                if !listener.send(event.clone()) {
779                    needs_cleanup = true;
780                }
781            }
782        }
783
784        // Clean up dead listeners if we detected any closed channels
785        if needs_cleanup {
786            self.transaction_listener.write().retain(|listener| !listener.sender.is_closed());
787        }
788    }
789
790    /// Notify all listeners about a blob sidecar for a newly inserted blob (eip4844) transaction.
791    fn on_new_blob_sidecar(&self, tx_hash: &TxHash, sidecar: &BlobTransactionSidecarVariant) {
792        let mut sidecar_listeners = self.blob_transaction_sidecar_listener.lock();
793        if sidecar_listeners.is_empty() {
794            return
795        }
796        let sidecar = Arc::new(sidecar.clone());
797        sidecar_listeners.retain_mut(|listener| {
798            let new_blob_event = NewBlobSidecar { tx_hash: *tx_hash, sidecar: sidecar.clone() };
799            match listener.sender.try_send(new_blob_event) {
800                Ok(()) => true,
801                Err(err) => {
802                    if matches!(err, mpsc::error::TrySendError::Full(_)) {
803                        debug!(
804                            target: "txpool",
805                            "[{:?}] failed to send blob sidecar; channel full",
806                            sidecar,
807                        );
808                        true
809                    } else {
810                        false
811                    }
812                }
813            }
814        })
815    }
816
817    /// Notifies transaction listeners about changes once a block was processed.
818    fn notify_on_new_state(&self, outcome: OnNewCanonicalStateOutcome<T::Transaction>) {
819        trace!(target: "txpool", promoted=outcome.promoted.len(), discarded= outcome.discarded.len() ,"notifying listeners on state change");
820
821        // notify about promoted pending transactions - emit hashes
822        let mut needs_pending_cleanup = false;
823        {
824            let listeners = self.pending_transaction_listener.read();
825            for listener in listeners.iter() {
826                if !listener.send_all(outcome.pending_transactions(listener.kind)) {
827                    needs_pending_cleanup = true;
828                }
829            }
830        }
831        if needs_pending_cleanup {
832            self.pending_transaction_listener.write().retain(|l| !l.sender.is_closed());
833        }
834
835        // emit full transactions
836        let mut needs_tx_cleanup = false;
837        {
838            let listeners = self.transaction_listener.read();
839            for listener in listeners.iter() {
840                if !listener.send_all(outcome.full_pending_transactions(listener.kind)) {
841                    needs_tx_cleanup = true;
842                }
843            }
844        }
845        if needs_tx_cleanup {
846            self.transaction_listener.write().retain(|l| !l.sender.is_closed());
847        }
848
849        let OnNewCanonicalStateOutcome { mined, promoted, discarded, block_hash } = outcome;
850
851        // broadcast specific transaction events
852        self.with_event_listener(|listener| {
853            for tx in &mined {
854                listener.mined(tx, block_hash);
855            }
856            for tx in &promoted {
857                listener.pending(tx.hash(), None);
858            }
859            for tx in &discarded {
860                listener.discarded(tx.hash());
861            }
862        })
863    }
864
865    /// Notifies all listeners about the transaction movements.
866    ///
867    /// This will emit events according to the provided changes.
868    ///
869    /// CAUTION: This function is only intended to be used manually in order to use this type's
870    /// [`TransactionEvents`] receivers when manually implementing the
871    /// [`TransactionPool`](crate::TransactionPool) trait for a custom pool implementation
872    /// [`TransactionPool::transaction_event_listener`](crate::TransactionPool).
873    #[allow(clippy::type_complexity)]
874    pub fn notify_on_transaction_updates(
875        &self,
876        promoted: Vec<Arc<ValidPoolTransaction<T::Transaction>>>,
877        discarded: Vec<Arc<ValidPoolTransaction<T::Transaction>>>,
878    ) {
879        // Notify about promoted pending transactions (similar to notify_on_new_state)
880        if !promoted.is_empty() {
881            let mut needs_pending_cleanup = false;
882            {
883                let listeners = self.pending_transaction_listener.read();
884                for listener in listeners.iter() {
885                    let promoted_hashes = promoted.iter().filter_map(|tx| {
886                        if listener.kind.is_propagate_only() && !tx.propagate {
887                            None
888                        } else {
889                            Some(*tx.hash())
890                        }
891                    });
892                    if !listener.send_all(promoted_hashes) {
893                        needs_pending_cleanup = true;
894                    }
895                }
896            }
897            if needs_pending_cleanup {
898                self.pending_transaction_listener.write().retain(|l| !l.sender.is_closed());
899            }
900
901            // in this case we should also emit promoted transactions in full
902            let mut needs_tx_cleanup = false;
903            {
904                let listeners = self.transaction_listener.read();
905                for listener in listeners.iter() {
906                    let promoted_txs = promoted.iter().filter_map(|tx| {
907                        if listener.kind.is_propagate_only() && !tx.propagate {
908                            None
909                        } else {
910                            Some(NewTransactionEvent::pending(tx.clone()))
911                        }
912                    });
913                    if !listener.send_all(promoted_txs) {
914                        needs_tx_cleanup = true;
915                    }
916                }
917            }
918            if needs_tx_cleanup {
919                self.transaction_listener.write().retain(|l| !l.sender.is_closed());
920            }
921        }
922
923        self.with_event_listener(|listener| {
924            for tx in &promoted {
925                listener.pending(tx.hash(), None);
926            }
927            for tx in &discarded {
928                listener.discarded(tx.hash());
929            }
930        });
931
932        if !discarded.is_empty() {
933            // This deletes outdated blob txs from the blob store, based on the account's nonce.
934            // This is called during txpool maintenance when the pool drifted.
935            self.delete_discarded_blobs(discarded.iter());
936        }
937    }
938
939    /// Fire events for the newly added transaction if there are any.
940    ///
941    /// See also [`Self::add_transaction_event_listener`].
942    ///
943    /// CAUTION: This function is only intended to be used manually in order to use this type's
944    /// [`TransactionEvents`] receivers when manually implementing the
945    /// [`TransactionPool`](crate::TransactionPool) trait for a custom pool implementation
946    /// [`TransactionPool::transaction_event_listener`](crate::TransactionPool).
947    pub fn notify_event_listeners(&self, tx: &AddedTransaction<T::Transaction>) {
948        self.with_event_listener(|listener| match tx {
949            AddedTransaction::Pending(tx) => {
950                let AddedPendingTransaction { transaction, promoted, discarded, replaced } = tx;
951
952                listener.pending(transaction.hash(), replaced.clone());
953                for tx in promoted {
954                    listener.pending(tx.hash(), None);
955                }
956                for tx in discarded {
957                    listener.discarded(tx.hash());
958                }
959            }
960            AddedTransaction::Parked { transaction, replaced, queued_reason, .. } => {
961                listener.queued(transaction.hash(), queued_reason.clone());
962                if let Some(replaced) = replaced {
963                    listener.replaced(replaced.clone(), *transaction.hash());
964                }
965            }
966        });
967    }
968
969    /// Returns an iterator that yields transactions that are ready to be included in the block.
970    pub fn best_transactions(&self) -> BestTransactions<T> {
971        self.get_pool_data().best_transactions()
972    }
973
974    /// Returns an iterator that yields transactions that are ready to be included in the block with
975    /// the given base fee and optional blob fee attributes.
976    pub fn best_transactions_with_attributes(
977        &self,
978        best_transactions_attributes: BestTransactionsAttributes,
979    ) -> Box<dyn crate::traits::BestTransactions<Item = Arc<ValidPoolTransaction<T::Transaction>>>>
980    {
981        self.get_pool_data().best_transactions_with_attributes(best_transactions_attributes)
982    }
983
984    /// Returns only the first `max` transactions in the pending pool.
985    pub fn pending_transactions_max(
986        &self,
987        max: usize,
988    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
989        self.get_pool_data().pending_transactions_iter().take(max).collect()
990    }
991
992    /// Returns all transactions from the pending sub-pool
993    pub fn pending_transactions(&self) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
994        self.get_pool_data().pending_transactions()
995    }
996
997    /// Returns all transactions from parked pools
998    pub fn queued_transactions(&self) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
999        self.get_pool_data().queued_transactions()
1000    }
1001
1002    /// Returns all transactions in the pool
1003    pub fn all_transactions(&self) -> AllPoolTransactions<T::Transaction> {
1004        let pool = self.get_pool_data();
1005        AllPoolTransactions {
1006            pending: pool.pending_transactions(),
1007            queued: pool.queued_transactions(),
1008        }
1009    }
1010
1011    /// Returns _all_ transactions in the pool
1012    pub fn all_transaction_hashes(&self) -> Vec<TxHash> {
1013        self.get_pool_data().all().transactions_iter().map(|tx| *tx.hash()).collect()
1014    }
1015
1016    /// Removes and returns all matching transactions from the pool.
1017    ///
1018    /// This behaves as if the transactions got discarded (_not_ mined), effectively introducing a
1019    /// nonce gap for the given transactions.
1020    pub fn remove_transactions(
1021        &self,
1022        hashes: Vec<TxHash>,
1023    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1024        if hashes.is_empty() {
1025            return Vec::new()
1026        }
1027        let removed = self.pool.write().remove_transactions(hashes);
1028
1029        self.with_event_listener(|listener| listener.discarded_many(&removed));
1030
1031        removed
1032    }
1033
1034    /// Removes and returns all matching transactions and their dependent transactions from the
1035    /// pool.
1036    pub fn remove_transactions_and_descendants(
1037        &self,
1038        hashes: Vec<TxHash>,
1039    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1040        if hashes.is_empty() {
1041            return Vec::new()
1042        }
1043        let removed = self.pool.write().remove_transactions_and_descendants(hashes);
1044
1045        self.with_event_listener(|listener| {
1046            for tx in &removed {
1047                listener.discarded(tx.hash());
1048            }
1049        });
1050
1051        removed
1052    }
1053
1054    /// Removes and returns all transactions by the specified sender from the pool.
1055    pub fn remove_transactions_by_sender(
1056        &self,
1057        sender: Address,
1058    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1059        let sender_id = self.get_sender_id(sender);
1060        let removed = self.pool.write().remove_transactions_by_sender(sender_id);
1061
1062        self.with_event_listener(|listener| listener.discarded_many(&removed));
1063
1064        removed
1065    }
1066
1067    /// Removes and returns all transactions that are present in the pool.
1068    pub fn retain_unknown<A>(&self, announcement: &mut A)
1069    where
1070        A: HandleMempoolData,
1071    {
1072        if announcement.is_empty() {
1073            return
1074        }
1075        let pool = self.get_pool_data();
1076        announcement.retain_by_hash(|tx| !pool.contains(tx))
1077    }
1078
1079    /// Returns the transaction by hash.
1080    pub fn get(&self, tx_hash: &TxHash) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
1081        self.get_pool_data().get(tx_hash)
1082    }
1083
1084    /// Returns all transactions of the address
1085    pub fn get_transactions_by_sender(
1086        &self,
1087        sender: Address,
1088    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1089        let sender_id = self.get_sender_id(sender);
1090        self.get_pool_data().get_transactions_by_sender(sender_id)
1091    }
1092
1093    /// Returns all queued transactions of the address by sender
1094    pub fn get_queued_transactions_by_sender(
1095        &self,
1096        sender: Address,
1097    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1098        let sender_id = self.get_sender_id(sender);
1099        self.get_pool_data().queued_txs_by_sender(sender_id)
1100    }
1101
1102    /// Returns all pending transactions filtered by predicate
1103    pub fn pending_transactions_with_predicate(
1104        &self,
1105        predicate: impl FnMut(&ValidPoolTransaction<T::Transaction>) -> bool,
1106    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1107        self.get_pool_data().pending_transactions_with_predicate(predicate)
1108    }
1109
1110    /// Returns all pending transactions of the address by sender
1111    pub fn get_pending_transactions_by_sender(
1112        &self,
1113        sender: Address,
1114    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1115        let sender_id = self.get_sender_id(sender);
1116        self.get_pool_data().pending_txs_by_sender(sender_id)
1117    }
1118
1119    /// Returns the highest transaction of the address
1120    pub fn get_highest_transaction_by_sender(
1121        &self,
1122        sender: Address,
1123    ) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
1124        let sender_id = self.get_sender_id(sender);
1125        self.get_pool_data().get_highest_transaction_by_sender(sender_id)
1126    }
1127
1128    /// Returns the transaction with the highest nonce that is executable given the on chain nonce.
1129    pub fn get_highest_consecutive_transaction_by_sender(
1130        &self,
1131        sender: Address,
1132        on_chain_nonce: u64,
1133    ) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
1134        let sender_id = self.get_sender_id(sender);
1135        self.get_pool_data().get_highest_consecutive_transaction_by_sender(
1136            sender_id.into_transaction_id(on_chain_nonce),
1137        )
1138    }
1139
1140    /// Returns the transaction given a [`TransactionId`]
1141    pub fn get_transaction_by_transaction_id(
1142        &self,
1143        transaction_id: &TransactionId,
1144    ) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
1145        self.get_pool_data().all().get(transaction_id).map(|tx| tx.transaction.clone())
1146    }
1147
1148    /// Returns all transactions that where submitted with the given [`TransactionOrigin`]
1149    pub fn get_transactions_by_origin(
1150        &self,
1151        origin: TransactionOrigin,
1152    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1153        self.get_pool_data()
1154            .all()
1155            .transactions_iter()
1156            .filter(|tx| tx.origin == origin)
1157            .cloned()
1158            .collect()
1159    }
1160
1161    /// Returns all pending transactions filtered by [`TransactionOrigin`]
1162    pub fn get_pending_transactions_by_origin(
1163        &self,
1164        origin: TransactionOrigin,
1165    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1166        self.get_pool_data().pending_transactions_iter().filter(|tx| tx.origin == origin).collect()
1167    }
1168
1169    /// Returns all the transactions belonging to the hashes.
1170    ///
1171    /// If no transaction exists, it is skipped.
1172    pub fn get_all(&self, txs: Vec<TxHash>) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1173        if txs.is_empty() {
1174            return Vec::new()
1175        }
1176        self.get_pool_data().get_all(txs).collect()
1177    }
1178
1179    /// Returns all the transactions belonging to the hashes that are propagatable.
1180    ///
1181    /// If no transaction exists, it is skipped.
1182    fn get_all_propagatable(
1183        &self,
1184        txs: &[TxHash],
1185    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1186        if txs.is_empty() {
1187            return Vec::new()
1188        }
1189        let pool = self.get_pool_data();
1190        txs.iter().filter_map(|tx| pool.get(tx).filter(|tx| tx.propagate)).collect()
1191    }
1192
1193    /// Notify about propagated transactions.
1194    pub fn on_propagated(&self, txs: PropagatedTransactions) {
1195        if txs.0.is_empty() {
1196            return
1197        }
1198        self.with_event_listener(|listener| {
1199            txs.0.into_iter().for_each(|(hash, peers)| listener.propagated(&hash, peers));
1200        });
1201    }
1202
1203    /// Number of transactions in the entire pool
1204    pub fn len(&self) -> usize {
1205        self.get_pool_data().len()
1206    }
1207
1208    /// Whether the pool is empty
1209    pub fn is_empty(&self) -> bool {
1210        self.get_pool_data().is_empty()
1211    }
1212
1213    /// Returns whether or not the pool is over its configured size and transaction count limits.
1214    pub fn is_exceeded(&self) -> bool {
1215        self.pool.read().is_exceeded()
1216    }
1217
1218    /// Inserts a blob transaction into the blob store
1219    fn insert_blob(&self, hash: TxHash, blob: BlobTransactionSidecarVariant) {
1220        debug!(target: "txpool", "[{:?}] storing blob sidecar", hash);
1221        if let Err(err) = self.blob_store.insert(hash, blob) {
1222            warn!(target: "txpool", %err, "[{:?}] failed to insert blob", hash);
1223            self.blob_store_metrics.blobstore_failed_inserts.increment(1);
1224        }
1225        self.update_blob_store_metrics();
1226    }
1227
1228    /// Delete a blob from the blob store
1229    pub fn delete_blob(&self, blob: TxHash) {
1230        let _ = self.blob_store.delete(blob);
1231    }
1232
1233    /// Delete all blobs from the blob store
1234    pub fn delete_blobs(&self, txs: Vec<TxHash>) {
1235        let _ = self.blob_store.delete_all(txs);
1236    }
1237
1238    /// Cleans up the blob store
1239    pub fn cleanup_blobs(&self) {
1240        let stat = self.blob_store.cleanup();
1241        self.blob_store_metrics.blobstore_failed_deletes.increment(stat.delete_failed as u64);
1242        self.update_blob_store_metrics();
1243    }
1244
1245    fn update_blob_store_metrics(&self) {
1246        if let Some(data_size) = self.blob_store.data_size_hint() {
1247            self.blob_store_metrics.blobstore_byte_size.set(data_size as f64);
1248        }
1249        self.blob_store_metrics.blobstore_entries.set(self.blob_store.blobs_len() as f64);
1250    }
1251
1252    /// Deletes all blob transactions that were discarded.
1253    fn delete_discarded_blobs<'a>(
1254        &'a self,
1255        transactions: impl IntoIterator<Item = &'a Arc<ValidPoolTransaction<T::Transaction>>>,
1256    ) {
1257        let blob_txs = transactions
1258            .into_iter()
1259            .filter(|tx| tx.transaction.is_eip4844())
1260            .map(|tx| *tx.hash())
1261            .collect();
1262        self.delete_blobs(blob_txs);
1263    }
1264}
1265
1266impl<V, T: TransactionOrdering, S> fmt::Debug for PoolInner<V, T, S> {
1267    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1268        f.debug_struct("PoolInner").field("config", &self.config).finish_non_exhaustive()
1269    }
1270}
1271
1272/// Metadata for a transaction that was added to the pool.
1273///
1274/// This holds all the data needed to complete post-insertion operations (notifications,
1275/// blob storage).
1276#[derive(Debug)]
1277struct AddedTransactionMeta<T: PoolTransaction> {
1278    /// The transaction that was added to the pool
1279    added: AddedTransaction<T>,
1280    /// Optional blob sidecar for EIP-4844 transactions
1281    blob_sidecar: Option<BlobTransactionSidecarVariant>,
1282}
1283
1284/// Tracks an added transaction and all graph changes caused by adding it.
1285#[derive(Debug, Clone)]
1286pub struct AddedPendingTransaction<T: PoolTransaction> {
1287    /// Inserted transaction.
1288    pub transaction: Arc<ValidPoolTransaction<T>>,
1289    /// Replaced transaction.
1290    pub replaced: Option<Arc<ValidPoolTransaction<T>>>,
1291    /// transactions promoted to the pending queue
1292    pub promoted: Vec<Arc<ValidPoolTransaction<T>>>,
1293    /// transactions that failed and became discarded
1294    pub discarded: Vec<Arc<ValidPoolTransaction<T>>>,
1295}
1296
1297impl<T: PoolTransaction> AddedPendingTransaction<T> {
1298    /// Returns all transactions that were promoted to the pending pool and adhere to the given
1299    /// [`TransactionListenerKind`].
1300    ///
1301    /// If the kind is [`TransactionListenerKind::PropagateOnly`], then only transactions that
1302    /// are allowed to be propagated are returned.
1303    pub(crate) fn pending_transactions(
1304        &self,
1305        kind: TransactionListenerKind,
1306    ) -> impl Iterator<Item = B256> + '_ {
1307        let iter = std::iter::once(&self.transaction).chain(self.promoted.iter());
1308        PendingTransactionIter { kind, iter }
1309    }
1310}
1311
1312pub(crate) struct PendingTransactionIter<Iter> {
1313    kind: TransactionListenerKind,
1314    iter: Iter,
1315}
1316
1317impl<'a, Iter, T> Iterator for PendingTransactionIter<Iter>
1318where
1319    Iter: Iterator<Item = &'a Arc<ValidPoolTransaction<T>>>,
1320    T: PoolTransaction + 'a,
1321{
1322    type Item = B256;
1323
1324    fn next(&mut self) -> Option<Self::Item> {
1325        loop {
1326            let next = self.iter.next()?;
1327            if self.kind.is_propagate_only() && !next.propagate {
1328                continue
1329            }
1330            return Some(*next.hash())
1331        }
1332    }
1333}
1334
1335/// An iterator over full pending transactions
1336pub(crate) struct FullPendingTransactionIter<Iter> {
1337    kind: TransactionListenerKind,
1338    iter: Iter,
1339}
1340
1341impl<'a, Iter, T> Iterator for FullPendingTransactionIter<Iter>
1342where
1343    Iter: Iterator<Item = &'a Arc<ValidPoolTransaction<T>>>,
1344    T: PoolTransaction + 'a,
1345{
1346    type Item = NewTransactionEvent<T>;
1347
1348    fn next(&mut self) -> Option<Self::Item> {
1349        loop {
1350            let next = self.iter.next()?;
1351            if self.kind.is_propagate_only() && !next.propagate {
1352                continue
1353            }
1354            return Some(NewTransactionEvent {
1355                subpool: SubPool::Pending,
1356                transaction: next.clone(),
1357            })
1358        }
1359    }
1360}
1361
1362/// Represents a transaction that was added into the pool and its state
1363#[derive(Debug, Clone)]
1364pub enum AddedTransaction<T: PoolTransaction> {
1365    /// Transaction was successfully added and moved to the pending pool.
1366    Pending(AddedPendingTransaction<T>),
1367    /// Transaction was successfully added but not yet ready for processing and moved to a
1368    /// parked pool instead.
1369    Parked {
1370        /// Inserted transaction.
1371        transaction: Arc<ValidPoolTransaction<T>>,
1372        /// Replaced transaction.
1373        replaced: Option<Arc<ValidPoolTransaction<T>>>,
1374        /// The subpool it was moved to.
1375        subpool: SubPool,
1376        /// The specific reason why the transaction is queued (if applicable).
1377        queued_reason: Option<QueuedReason>,
1378    },
1379}
1380
1381impl<T: PoolTransaction> AddedTransaction<T> {
1382    /// Returns whether the transaction has been added to the pending pool.
1383    pub const fn as_pending(&self) -> Option<&AddedPendingTransaction<T>> {
1384        match self {
1385            Self::Pending(tx) => Some(tx),
1386            _ => None,
1387        }
1388    }
1389
1390    /// Returns the replaced transaction if there was one
1391    pub const fn replaced(&self) -> Option<&Arc<ValidPoolTransaction<T>>> {
1392        match self {
1393            Self::Pending(tx) => tx.replaced.as_ref(),
1394            Self::Parked { replaced, .. } => replaced.as_ref(),
1395        }
1396    }
1397
1398    /// Returns the discarded transactions if there were any
1399    pub(crate) fn discarded_transactions(&self) -> Option<&[Arc<ValidPoolTransaction<T>>]> {
1400        match self {
1401            Self::Pending(tx) => Some(&tx.discarded),
1402            Self::Parked { .. } => None,
1403        }
1404    }
1405
1406    /// Returns the hash of the replaced transaction if it is a blob transaction.
1407    pub(crate) fn replaced_blob_transaction(&self) -> Option<B256> {
1408        self.replaced().filter(|tx| tx.transaction.is_eip4844()).map(|tx| *tx.transaction.hash())
1409    }
1410
1411    /// Returns the hash of the transaction
1412    pub fn hash(&self) -> &TxHash {
1413        match self {
1414            Self::Pending(tx) => tx.transaction.hash(),
1415            Self::Parked { transaction, .. } => transaction.hash(),
1416        }
1417    }
1418
1419    /// Converts this type into the event type for listeners
1420    pub fn into_new_transaction_event(self) -> NewTransactionEvent<T> {
1421        match self {
1422            Self::Pending(tx) => {
1423                NewTransactionEvent { subpool: SubPool::Pending, transaction: tx.transaction }
1424            }
1425            Self::Parked { transaction, subpool, .. } => {
1426                NewTransactionEvent { transaction, subpool }
1427            }
1428        }
1429    }
1430
1431    /// Returns the subpool this transaction was added to
1432    pub(crate) const fn subpool(&self) -> SubPool {
1433        match self {
1434            Self::Pending(_) => SubPool::Pending,
1435            Self::Parked { subpool, .. } => *subpool,
1436        }
1437    }
1438
1439    /// Returns the [`TransactionId`] of the added transaction
1440    #[cfg(test)]
1441    pub(crate) fn id(&self) -> &TransactionId {
1442        match self {
1443            Self::Pending(added) => added.transaction.id(),
1444            Self::Parked { transaction, .. } => transaction.id(),
1445        }
1446    }
1447
1448    /// Returns the queued reason if the transaction is parked with a queued reason.
1449    pub const fn queued_reason(&self) -> Option<&QueuedReason> {
1450        match self {
1451            Self::Pending(_) => None,
1452            Self::Parked { queued_reason, .. } => queued_reason.as_ref(),
1453        }
1454    }
1455
1456    /// Returns the transaction state based on the subpool and queued reason.
1457    pub fn transaction_state(&self) -> AddedTransactionState {
1458        match self.subpool() {
1459            SubPool::Pending => AddedTransactionState::Pending,
1460            _ => {
1461                // For non-pending transactions, use the queued reason directly from the
1462                // AddedTransaction
1463                if let Some(reason) = self.queued_reason() {
1464                    AddedTransactionState::Queued(reason.clone())
1465                } else {
1466                    // Fallback - this shouldn't happen with the new implementation
1467                    AddedTransactionState::Queued(QueuedReason::NonceGap)
1468                }
1469            }
1470        }
1471    }
1472}
1473
1474/// The specific reason why a transaction is queued (not ready for execution)
1475#[derive(Debug, Clone, PartialEq, Eq)]
1476pub enum QueuedReason {
1477    /// Transaction has a nonce gap - missing prior transactions
1478    NonceGap,
1479    /// Transaction has parked ancestors - waiting for other transactions to be mined
1480    ParkedAncestors,
1481    /// Sender has insufficient balance to cover the transaction cost
1482    InsufficientBalance,
1483    /// Transaction exceeds the block gas limit
1484    TooMuchGas,
1485    /// Transaction doesn't meet the base fee requirement
1486    InsufficientBaseFee,
1487    /// Transaction doesn't meet the blob fee requirement (EIP-4844)
1488    InsufficientBlobFee,
1489}
1490
1491/// The state of a transaction when is was added to the pool
1492#[derive(Debug, Clone, PartialEq, Eq)]
1493pub enum AddedTransactionState {
1494    /// Ready for execution
1495    Pending,
1496    /// Not ready for execution due to a specific condition
1497    Queued(QueuedReason),
1498}
1499
1500impl AddedTransactionState {
1501    /// Returns whether the transaction was submitted as queued.
1502    pub const fn is_queued(&self) -> bool {
1503        matches!(self, Self::Queued(_))
1504    }
1505
1506    /// Returns whether the transaction was submitted as pending.
1507    pub const fn is_pending(&self) -> bool {
1508        matches!(self, Self::Pending)
1509    }
1510
1511    /// Returns the specific queued reason if the transaction is queued.
1512    pub const fn queued_reason(&self) -> Option<&QueuedReason> {
1513        match self {
1514            Self::Queued(reason) => Some(reason),
1515            Self::Pending => None,
1516        }
1517    }
1518}
1519
1520/// The outcome of a successful transaction addition
1521#[derive(Debug, Clone, PartialEq, Eq)]
1522pub struct AddedTransactionOutcome {
1523    /// The hash of the transaction
1524    pub hash: TxHash,
1525    /// The state of the transaction
1526    pub state: AddedTransactionState,
1527}
1528
1529impl AddedTransactionOutcome {
1530    /// Returns whether the transaction was submitted as queued.
1531    pub const fn is_queued(&self) -> bool {
1532        self.state.is_queued()
1533    }
1534
1535    /// Returns whether the transaction was submitted as pending.
1536    pub const fn is_pending(&self) -> bool {
1537        self.state.is_pending()
1538    }
1539}
1540
1541/// Contains all state changes after a [`CanonicalStateUpdate`] was processed
1542#[derive(Debug)]
1543pub(crate) struct OnNewCanonicalStateOutcome<T: PoolTransaction> {
1544    /// Hash of the block.
1545    pub(crate) block_hash: B256,
1546    /// All mined transactions.
1547    pub(crate) mined: Vec<TxHash>,
1548    /// Transactions promoted to the pending pool.
1549    pub(crate) promoted: Vec<Arc<ValidPoolTransaction<T>>>,
1550    /// transaction that were discarded during the update
1551    pub(crate) discarded: Vec<Arc<ValidPoolTransaction<T>>>,
1552}
1553
1554impl<T: PoolTransaction> OnNewCanonicalStateOutcome<T> {
1555    /// Returns all transactions that were promoted to the pending pool and adhere to the given
1556    /// [`TransactionListenerKind`].
1557    ///
1558    /// If the kind is [`TransactionListenerKind::PropagateOnly`], then only transactions that
1559    /// are allowed to be propagated are returned.
1560    pub(crate) fn pending_transactions(
1561        &self,
1562        kind: TransactionListenerKind,
1563    ) -> impl Iterator<Item = B256> + '_ {
1564        let iter = self.promoted.iter();
1565        PendingTransactionIter { kind, iter }
1566    }
1567
1568    /// Returns all FULL transactions that were promoted to the pending pool and adhere to the given
1569    /// [`TransactionListenerKind`].
1570    ///
1571    /// If the kind is [`TransactionListenerKind::PropagateOnly`], then only transactions that
1572    /// are allowed to be propagated are returned.
1573    pub(crate) fn full_pending_transactions(
1574        &self,
1575        kind: TransactionListenerKind,
1576    ) -> impl Iterator<Item = NewTransactionEvent<T>> + '_ {
1577        let iter = self.promoted.iter();
1578        FullPendingTransactionIter { kind, iter }
1579    }
1580}
1581
1582#[cfg(test)]
1583mod tests {
1584    use crate::{
1585        blobstore::{BlobStore, InMemoryBlobStore},
1586        identifier::SenderId,
1587        test_utils::{MockTransaction, TestPoolBuilder},
1588        validate::ValidTransaction,
1589        BlockInfo, PoolConfig, SubPoolLimit, TransactionOrigin, TransactionValidationOutcome, U256,
1590    };
1591    use alloy_eips::{eip4844::BlobTransactionSidecar, eip7594::BlobTransactionSidecarVariant};
1592    use alloy_primitives::Address;
1593    use std::{fs, path::PathBuf};
1594
1595    #[test]
1596    fn test_discard_blobs_on_blob_tx_eviction() {
1597        let blobs = {
1598            // Read the contents of the JSON file into a string.
1599            let json_content = fs::read_to_string(
1600                PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("test_data/blob1.json"),
1601            )
1602            .expect("Failed to read the blob data file");
1603
1604            // Parse the JSON contents into a serde_json::Value.
1605            let json_value: serde_json::Value =
1606                serde_json::from_str(&json_content).expect("Failed to deserialize JSON");
1607
1608            // Extract blob data from JSON and convert it to Blob.
1609            vec![
1610                // Extract the "data" field from the JSON and parse it as a string.
1611                json_value
1612                    .get("data")
1613                    .unwrap()
1614                    .as_str()
1615                    .expect("Data is not a valid string")
1616                    .to_string(),
1617            ]
1618        };
1619
1620        // Generate a BlobTransactionSidecar from the blobs.
1621        let sidecar = BlobTransactionSidecarVariant::Eip4844(
1622            BlobTransactionSidecar::try_from_blobs_hex(blobs).unwrap(),
1623        );
1624
1625        // Define the maximum limit for blobs in the sub-pool.
1626        let blob_limit = SubPoolLimit::new(1000, usize::MAX);
1627
1628        // Create a test pool with default configuration and the specified blob limit.
1629        let test_pool = &TestPoolBuilder::default()
1630            .with_config(PoolConfig { blob_limit, ..Default::default() })
1631            .pool;
1632
1633        // Set the block info for the pool, including a pending blob fee.
1634        test_pool
1635            .set_block_info(BlockInfo { pending_blob_fee: Some(10_000_000), ..Default::default() });
1636
1637        // Create an in-memory blob store.
1638        let blob_store = InMemoryBlobStore::default();
1639
1640        // Loop to add transactions to the pool and test blob eviction.
1641        for n in 0..blob_limit.max_txs + 10 {
1642            // Create a mock transaction with the generated blob sidecar.
1643            let mut tx = MockTransaction::eip4844_with_sidecar(sidecar.clone());
1644
1645            // Set non zero size
1646            tx.set_size(1844674407370951);
1647
1648            // Insert the sidecar into the blob store if the current index is within the blob limit.
1649            if n < blob_limit.max_txs {
1650                blob_store.insert(*tx.get_hash(), sidecar.clone()).unwrap();
1651            }
1652
1653            // Add the transaction to the pool with external origin and valid outcome.
1654            test_pool.add_transactions(
1655                TransactionOrigin::External,
1656                [TransactionValidationOutcome::Valid {
1657                    balance: U256::from(1_000),
1658                    state_nonce: 0,
1659                    bytecode_hash: None,
1660                    transaction: ValidTransaction::ValidWithSidecar {
1661                        transaction: tx,
1662                        sidecar: sidecar.clone(),
1663                    },
1664                    propagate: true,
1665                    authorities: None,
1666                }],
1667            );
1668        }
1669
1670        // Assert that the size of the pool's blob component is equal to the maximum blob limit.
1671        assert_eq!(test_pool.size().blob, blob_limit.max_txs);
1672
1673        // Assert that the size of the pool's blob_size component matches the expected value.
1674        assert_eq!(test_pool.size().blob_size, 1844674407370951000);
1675
1676        // Assert that the pool's blob store matches the expected blob store.
1677        assert_eq!(*test_pool.blob_store(), blob_store);
1678    }
1679
1680    #[test]
1681    fn test_auths_stored_in_identifiers() {
1682        // Create a test pool with default configuration.
1683        let test_pool = &TestPoolBuilder::default().with_config(Default::default()).pool;
1684
1685        let auth = Address::new([1; 20]);
1686        let tx = MockTransaction::eip7702();
1687
1688        test_pool.add_transactions(
1689            TransactionOrigin::Local,
1690            [TransactionValidationOutcome::Valid {
1691                balance: U256::from(1_000),
1692                state_nonce: 0,
1693                bytecode_hash: None,
1694                transaction: ValidTransaction::Valid(tx),
1695                propagate: true,
1696                authorities: Some(vec![auth]),
1697            }],
1698        );
1699
1700        let identifiers = test_pool.identifiers.read();
1701        assert_eq!(identifiers.sender_id(&auth), Some(SenderId::from(1)));
1702    }
1703}