reth_transaction_pool/pool/
mod.rs

1//! Transaction Pool internals.
2//!
3//! Incoming transactions are validated before they enter the pool first. The validation outcome can
4//! have 3 states:
5//!
6//!  1. Transaction can _never_ be valid
7//!  2. Transaction is _currently_ valid
8//!  3. Transaction is _currently_ invalid, but could potentially become valid in the future
9//!
10//! However, (2.) and (3.) of a transaction can only be determined on the basis of the current
11//! state, whereas (1.) holds indefinitely. This means once the state changes (2.) and (3.) the
12//! state of a transaction needs to be reevaluated again.
13//!
14//! The transaction pool is responsible for storing new, valid transactions and providing the next
15//! best transactions sorted by their priority. Where priority is determined by the transaction's
16//! score ([`TransactionOrdering`]).
17//!
18//! Furthermore, the following characteristics fall under (3.):
19//!
20//!  a) Nonce of a transaction is higher than the expected nonce for the next transaction of its
21//! sender. A distinction is made here whether multiple transactions from the same sender have
22//! gapless nonce increments.
23//!
24//!  a)(1) If _no_ transaction is missing in a chain of multiple
25//! transactions from the same sender (all nonce in row), all of them can in principle be executed
26//! on the current state one after the other.
27//!
28//!  a)(2) If there's a nonce gap, then all
29//! transactions after the missing transaction are blocked until the missing transaction arrives.
30//!
31//!  b) Transaction does not meet the dynamic fee cap requirement introduced by EIP-1559: The
32//! fee cap of the transaction needs to be no less than the base fee of block.
33//!
34//!
35//! In essence the transaction pool is made of three separate sub-pools:
36//!
37//!  - Pending Pool: Contains all transactions that are valid on the current state and satisfy (3.
38//!    a)(1): _No_ nonce gaps. A _pending_ transaction is considered _ready_ when it has the lowest
39//!    nonce of all transactions from the same sender. Once a _ready_ transaction with nonce `n` has
40//!    been executed, the next highest transaction from the same sender `n + 1` becomes ready.
41//!
42//!  - Queued Pool: Contains all transactions that are currently blocked by missing transactions:
43//!    (3. a)(2): _With_ nonce gaps or due to lack of funds.
44//!
45//!  - Basefee Pool: To account for the dynamic base fee requirement (3. b) which could render an
46//!    EIP-1559 and all subsequent transactions of the sender currently invalid.
47//!
48//! The classification of transactions is always dependent on the current state that is changed as
49//! soon as a new block is mined. Once a new block is mined, the account changeset must be applied
50//! to the transaction pool.
51//!
52//!
53//! Depending on the use case, consumers of the [`TransactionPool`](crate::traits::TransactionPool)
54//! are interested in (2.) and/or (3.).
55
56//! A generic [`TransactionPool`](crate::traits::TransactionPool) that only handles transactions.
57//!
58//! This Pool maintains two separate sub-pools for (2.) and (3.)
59//!
60//! ## Terminology
61//!
62//!  - _Pending_: pending transactions are transactions that fall under (2.). These transactions can
63//!    currently be executed and are stored in the pending sub-pool
64//!  - _Queued_: queued transactions are transactions that fall under category (3.). Those
65//!    transactions are _currently_ waiting for state changes that eventually move them into
66//!    category (2.) and become pending.
67
68use crate::{
69    blobstore::BlobStore,
70    error::{PoolError, PoolErrorKind, PoolResult},
71    identifier::{SenderId, SenderIdentifiers, TransactionId},
72    metrics::BlobStoreMetrics,
73    pool::{
74        listener::{
75            BlobTransactionSidecarListener, PendingTransactionHashListener, PoolEventBroadcast,
76            TransactionListener,
77        },
78        state::SubPool,
79        txpool::{SenderInfo, TxPool},
80        update::UpdateOutcome,
81    },
82    traits::{
83        AllPoolTransactions, BestTransactionsAttributes, BlockInfo, GetPooledTransactionLimit,
84        NewBlobSidecar, PoolSize, PoolTransaction, PropagatedTransactions, TransactionOrigin,
85    },
86    validate::{TransactionValidationOutcome, ValidPoolTransaction, ValidTransaction},
87    CanonicalStateUpdate, EthPoolTransaction, PoolConfig, TransactionOrdering,
88    TransactionValidator,
89};
90
91use alloy_primitives::{Address, TxHash, B256};
92use parking_lot::{Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard};
93use reth_eth_wire_types::HandleMempoolData;
94use reth_execution_types::ChangedAccount;
95
96use alloy_eips::{eip7594::BlobTransactionSidecarVariant, Typed2718};
97use reth_primitives_traits::Recovered;
98use rustc_hash::FxHashMap;
99use std::{
100    collections::HashSet,
101    fmt,
102    sync::{
103        atomic::{AtomicBool, Ordering},
104        Arc,
105    },
106    time::Instant,
107};
108use tokio::sync::mpsc;
109use tracing::{debug, trace, warn};
110mod events;
111pub use best::{BestTransactionFilter, BestTransactionsWithPrioritizedSenders};
112pub use blob::{blob_tx_priority, fee_delta, BlobOrd, BlobTransactions};
113pub use events::{FullTransactionEvent, NewTransactionEvent, TransactionEvent};
114pub use listener::{AllTransactionsEvents, TransactionEvents, TransactionListenerKind};
115pub use parked::{BasefeeOrd, ParkedOrd, ParkedPool, QueuedOrd};
116pub use pending::PendingPool;
117use reth_primitives_traits::Block;
118
119mod best;
120pub use best::BestTransactions;
121
122mod blob;
123pub mod listener;
124mod parked;
125pub mod pending;
126pub mod size;
127pub(crate) mod state;
128pub mod txpool;
129mod update;
130
131/// Bound on number of pending transactions from `reth_network::TransactionsManager` to buffer.
132pub const PENDING_TX_LISTENER_BUFFER_SIZE: usize = 2048;
133/// Bound on number of new transactions from `reth_network::TransactionsManager` to buffer.
134pub const NEW_TX_LISTENER_BUFFER_SIZE: usize = 1024;
135
136const BLOB_SIDECAR_LISTENER_BUFFER_SIZE: usize = 512;
137
138/// Transaction pool internals.
139pub struct PoolInner<V, T, S>
140where
141    T: TransactionOrdering,
142{
143    /// Internal mapping of addresses to plain ints.
144    identifiers: RwLock<SenderIdentifiers>,
145    /// Transaction validator.
146    validator: V,
147    /// Storage for blob transactions
148    blob_store: S,
149    /// The internal pool that manages all transactions.
150    pool: RwLock<TxPool<T>>,
151    /// Pool settings.
152    config: PoolConfig,
153    /// Manages listeners for transaction state change events.
154    event_listener: RwLock<PoolEventBroadcast<T::Transaction>>,
155    /// Tracks whether any event listeners have ever been installed.
156    has_event_listeners: AtomicBool,
157    /// Listeners for new _full_ pending transactions.
158    pending_transaction_listener: RwLock<Vec<PendingTransactionHashListener>>,
159    /// Listeners for new transactions added to the pool.
160    transaction_listener: RwLock<Vec<TransactionListener<T::Transaction>>>,
161    /// Listener for new blob transaction sidecars added to the pool.
162    blob_transaction_sidecar_listener: Mutex<Vec<BlobTransactionSidecarListener>>,
163    /// Metrics for the blob store
164    blob_store_metrics: BlobStoreMetrics,
165}
166
167// === impl PoolInner ===
168
169impl<V, T, S> PoolInner<V, T, S>
170where
171    V: TransactionValidator,
172    T: TransactionOrdering<Transaction = <V as TransactionValidator>::Transaction>,
173    S: BlobStore,
174{
175    /// Create a new transaction pool instance.
176    pub fn new(validator: V, ordering: T, blob_store: S, config: PoolConfig) -> Self {
177        Self {
178            identifiers: Default::default(),
179            validator,
180            event_listener: Default::default(),
181            has_event_listeners: AtomicBool::new(false),
182            pool: RwLock::new(TxPool::new(ordering, config.clone())),
183            pending_transaction_listener: Default::default(),
184            transaction_listener: Default::default(),
185            blob_transaction_sidecar_listener: Default::default(),
186            config,
187            blob_store,
188            blob_store_metrics: Default::default(),
189        }
190    }
191
192    /// Returns the configured blob store.
193    pub const fn blob_store(&self) -> &S {
194        &self.blob_store
195    }
196
197    /// Returns stats about the size of the pool.
198    pub fn size(&self) -> PoolSize {
199        self.get_pool_data().size()
200    }
201
202    /// Returns the currently tracked block
203    pub fn block_info(&self) -> BlockInfo {
204        self.get_pool_data().block_info()
205    }
206    /// Sets the currently tracked block
207    pub fn set_block_info(&self, info: BlockInfo) {
208        self.pool.write().set_block_info(info)
209    }
210
211    /// Returns the internal [`SenderId`] for this address
212    pub fn get_sender_id(&self, addr: Address) -> SenderId {
213        self.identifiers.write().sender_id_or_create(addr)
214    }
215
216    /// Returns the internal [`SenderId`]s for the given addresses.
217    pub fn get_sender_ids(&self, addrs: impl IntoIterator<Item = Address>) -> Vec<SenderId> {
218        self.identifiers.write().sender_ids_or_create(addrs)
219    }
220
221    /// Returns all senders in the pool
222    pub fn unique_senders(&self) -> HashSet<Address> {
223        self.get_pool_data().unique_senders()
224    }
225
226    /// Converts the changed accounts to a map of sender ids to sender info (internal identifier
227    /// used for __tracked__ accounts)
228    fn changed_senders(
229        &self,
230        accs: impl Iterator<Item = ChangedAccount>,
231    ) -> FxHashMap<SenderId, SenderInfo> {
232        let identifiers = self.identifiers.read();
233        accs.into_iter()
234            .filter_map(|acc| {
235                let ChangedAccount { address, nonce, balance } = acc;
236                let sender_id = identifiers.sender_id(&address)?;
237                Some((sender_id, SenderInfo { state_nonce: nonce, balance }))
238            })
239            .collect()
240    }
241
242    /// Get the config the pool was configured with.
243    pub const fn config(&self) -> &PoolConfig {
244        &self.config
245    }
246
247    /// Get the validator reference.
248    pub const fn validator(&self) -> &V {
249        &self.validator
250    }
251
252    /// Adds a new transaction listener to the pool that gets notified about every new _pending_
253    /// transaction inserted into the pool
254    pub fn add_pending_listener(&self, kind: TransactionListenerKind) -> mpsc::Receiver<TxHash> {
255        let (sender, rx) = mpsc::channel(self.config.pending_tx_listener_buffer_size);
256        let listener = PendingTransactionHashListener { sender, kind };
257
258        let mut listeners = self.pending_transaction_listener.write();
259        // Clean up dead listeners before adding new one
260        listeners.retain(|l| !l.sender.is_closed());
261        listeners.push(listener);
262
263        rx
264    }
265
266    /// Adds a new transaction listener to the pool that gets notified about every new transaction.
267    pub fn add_new_transaction_listener(
268        &self,
269        kind: TransactionListenerKind,
270    ) -> mpsc::Receiver<NewTransactionEvent<T::Transaction>> {
271        let (sender, rx) = mpsc::channel(self.config.new_tx_listener_buffer_size);
272        let listener = TransactionListener { sender, kind };
273
274        let mut listeners = self.transaction_listener.write();
275        // Clean up dead listeners before adding new one
276        listeners.retain(|l| !l.sender.is_closed());
277        listeners.push(listener);
278
279        rx
280    }
281    /// Adds a new blob sidecar listener to the pool that gets notified about every new
282    /// eip4844 transaction's blob sidecar.
283    pub fn add_blob_sidecar_listener(&self) -> mpsc::Receiver<NewBlobSidecar> {
284        let (sender, rx) = mpsc::channel(BLOB_SIDECAR_LISTENER_BUFFER_SIZE);
285        let listener = BlobTransactionSidecarListener { sender };
286        self.blob_transaction_sidecar_listener.lock().push(listener);
287        rx
288    }
289
290    /// If the pool contains the transaction, this adds a new listener that gets notified about
291    /// transaction events.
292    pub fn add_transaction_event_listener(&self, tx_hash: TxHash) -> Option<TransactionEvents> {
293        if !self.get_pool_data().contains(&tx_hash) {
294            return None
295        }
296        let mut listener = self.event_listener.write();
297        let events = listener.subscribe(tx_hash);
298        self.mark_event_listener_installed();
299        Some(events)
300    }
301
302    /// Adds a listener for all transaction events.
303    pub fn add_all_transactions_event_listener(&self) -> AllTransactionsEvents<T::Transaction> {
304        let mut listener = self.event_listener.write();
305        let events = listener.subscribe_all();
306        self.mark_event_listener_installed();
307        events
308    }
309
310    #[inline]
311    fn has_event_listeners(&self) -> bool {
312        self.has_event_listeners.load(Ordering::Relaxed)
313    }
314
315    #[inline]
316    fn mark_event_listener_installed(&self) {
317        self.has_event_listeners.store(true, Ordering::Relaxed);
318    }
319
320    #[inline]
321    fn update_event_listener_state(&self, listener: &PoolEventBroadcast<T::Transaction>) {
322        if listener.is_empty() {
323            self.has_event_listeners.store(false, Ordering::Relaxed);
324        }
325    }
326
327    #[inline]
328    fn with_event_listener<F>(&self, emit: F)
329    where
330        F: FnOnce(&mut PoolEventBroadcast<T::Transaction>),
331    {
332        if !self.has_event_listeners() {
333            return
334        }
335        let mut listener = self.event_listener.write();
336        if !listener.is_empty() {
337            emit(&mut listener);
338        }
339        self.update_event_listener_state(&listener);
340    }
341
342    /// Returns a read lock to the pool's data.
343    pub fn get_pool_data(&self) -> RwLockReadGuard<'_, TxPool<T>> {
344        self.pool.read()
345    }
346
347    /// Returns transactions in the pool that can be propagated
348    pub fn pooled_transactions(&self) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
349        let mut out = Vec::new();
350        self.append_pooled_transactions(&mut out);
351        out
352    }
353
354    /// Returns hashes of transactions in the pool that can be propagated.
355    pub fn pooled_transactions_hashes(&self) -> Vec<TxHash> {
356        let mut out = Vec::new();
357        self.append_pooled_transactions_hashes(&mut out);
358        out
359    }
360
361    /// Returns only the first `max` transactions in the pool that can be propagated.
362    pub fn pooled_transactions_max(
363        &self,
364        max: usize,
365    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
366        let mut out = Vec::new();
367        self.append_pooled_transactions_max(max, &mut out);
368        out
369    }
370
371    /// Extends the given vector with all transactions in the pool that can be propagated.
372    pub fn append_pooled_transactions(
373        &self,
374        out: &mut Vec<Arc<ValidPoolTransaction<T::Transaction>>>,
375    ) {
376        out.extend(
377            self.get_pool_data().all().transactions_iter().filter(|tx| tx.propagate).cloned(),
378        );
379    }
380
381    /// Extends the given vector with pooled transactions for the given hashes that are allowed to
382    /// be propagated.
383    pub fn append_pooled_transaction_elements(
384        &self,
385        tx_hashes: &[TxHash],
386        limit: GetPooledTransactionLimit,
387        out: &mut Vec<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>,
388    ) where
389        <V as TransactionValidator>::Transaction: EthPoolTransaction,
390    {
391        let transactions = self.get_all_propagatable(tx_hashes);
392        let mut size = 0;
393        for transaction in transactions {
394            let encoded_len = transaction.encoded_length();
395            let Some(pooled) = self.to_pooled_transaction(transaction) else {
396                continue;
397            };
398
399            size += encoded_len;
400            out.push(pooled.into_inner());
401
402            if limit.exceeds(size) {
403                break
404            }
405        }
406    }
407
408    /// Extends the given vector with the hashes of all transactions in the pool that can be
409    /// propagated.
410    pub fn append_pooled_transactions_hashes(&self, out: &mut Vec<TxHash>) {
411        out.extend(
412            self.get_pool_data()
413                .all()
414                .transactions_iter()
415                .filter(|tx| tx.propagate)
416                .map(|tx| *tx.hash()),
417        );
418    }
419
420    /// Extends the given vector with only the first `max` transactions in the pool that can be
421    /// propagated.
422    pub fn append_pooled_transactions_max(
423        &self,
424        max: usize,
425        out: &mut Vec<Arc<ValidPoolTransaction<T::Transaction>>>,
426    ) {
427        out.extend(
428            self.get_pool_data()
429                .all()
430                .transactions_iter()
431                .filter(|tx| tx.propagate)
432                .take(max)
433                .cloned(),
434        );
435    }
436
437    /// Returns only the first `max` hashes of transactions in the pool that can be propagated.
438    pub fn pooled_transactions_hashes_max(&self, max: usize) -> Vec<TxHash> {
439        if max == 0 {
440            return Vec::new();
441        }
442        self.get_pool_data()
443            .all()
444            .transactions_iter()
445            .filter(|tx| tx.propagate)
446            .take(max)
447            .map(|tx| *tx.hash())
448            .collect()
449    }
450
451    /// Converts the internally tracked transaction to the pooled format.
452    ///
453    /// If the transaction is an EIP-4844 transaction, the blob sidecar is fetched from the blob
454    /// store and attached to the transaction.
455    fn to_pooled_transaction(
456        &self,
457        transaction: Arc<ValidPoolTransaction<T::Transaction>>,
458    ) -> Option<Recovered<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>>
459    where
460        <V as TransactionValidator>::Transaction: EthPoolTransaction,
461    {
462        if transaction.is_eip4844() {
463            let sidecar = self.blob_store.get(*transaction.hash()).ok()??;
464            transaction.transaction.clone().try_into_pooled_eip4844(sidecar)
465        } else {
466            transaction
467                .transaction
468                .clone_into_pooled()
469                .inspect_err(|err| {
470                    debug!(
471                        target: "txpool", %err,
472                        "failed to convert transaction to pooled element; skipping",
473                    );
474                })
475                .ok()
476        }
477    }
478
479    /// Returns pooled transactions for the given transaction hashes that are allowed to be
480    /// propagated.
481    pub fn get_pooled_transaction_elements(
482        &self,
483        tx_hashes: Vec<TxHash>,
484        limit: GetPooledTransactionLimit,
485    ) -> Vec<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>
486    where
487        <V as TransactionValidator>::Transaction: EthPoolTransaction,
488    {
489        let mut elements = Vec::new();
490        self.append_pooled_transaction_elements(&tx_hashes, limit, &mut elements);
491        elements.shrink_to_fit();
492        elements
493    }
494
495    /// Returns converted pooled transaction for the given transaction hash.
496    pub fn get_pooled_transaction_element(
497        &self,
498        tx_hash: TxHash,
499    ) -> Option<Recovered<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>>
500    where
501        <V as TransactionValidator>::Transaction: EthPoolTransaction,
502    {
503        self.get(&tx_hash).and_then(|tx| self.to_pooled_transaction(tx))
504    }
505
506    /// Updates the entire pool after a new block was executed.
507    pub fn on_canonical_state_change<B>(&self, update: CanonicalStateUpdate<'_, B>)
508    where
509        B: Block,
510    {
511        trace!(target: "txpool", ?update, "updating pool on canonical state change");
512
513        let block_info = update.block_info();
514        let CanonicalStateUpdate {
515            new_tip, changed_accounts, mined_transactions, update_kind, ..
516        } = update;
517        self.validator.on_new_head_block(new_tip);
518
519        let changed_senders = self.changed_senders(changed_accounts.into_iter());
520
521        // update the pool
522        let outcome = self.pool.write().on_canonical_state_change(
523            block_info,
524            mined_transactions,
525            changed_senders,
526            update_kind,
527        );
528
529        // This will discard outdated transactions based on the account's nonce
530        self.delete_discarded_blobs(outcome.discarded.iter());
531
532        // notify listeners about updates
533        self.notify_on_new_state(outcome);
534    }
535
536    /// Performs account updates on the pool.
537    ///
538    /// This will either promote or discard transactions based on the new account state.
539    ///
540    /// This should be invoked when the pool drifted and accounts are updated manually
541    pub fn update_accounts(&self, accounts: Vec<ChangedAccount>) {
542        let changed_senders = self.changed_senders(accounts.into_iter());
543        let UpdateOutcome { promoted, discarded } =
544            self.pool.write().update_accounts(changed_senders);
545
546        self.notify_on_transaction_updates(promoted, discarded);
547    }
548
549    /// Add a single validated transaction into the pool.
550    ///
551    /// Returns the outcome and optionally metadata to be processed after the pool lock is
552    /// released.
553    ///
554    /// Note: this is only used internally by [`Self::add_transactions()`], all new transaction(s)
555    /// come in through that function, either as a batch or `std::iter::once`.
556    fn add_transaction(
557        &self,
558        pool: &mut RwLockWriteGuard<'_, TxPool<T>>,
559        origin: TransactionOrigin,
560        tx: TransactionValidationOutcome<T::Transaction>,
561    ) -> (PoolResult<AddedTransactionOutcome>, Option<AddedTransactionMeta<T::Transaction>>) {
562        match tx {
563            TransactionValidationOutcome::Valid {
564                balance,
565                state_nonce,
566                transaction,
567                propagate,
568                bytecode_hash,
569                authorities,
570            } => {
571                let sender_id = self.get_sender_id(transaction.sender());
572                let transaction_id = TransactionId::new(sender_id, transaction.nonce());
573
574                // split the valid transaction and the blob sidecar if it has any
575                let (transaction, blob_sidecar) = match transaction {
576                    ValidTransaction::Valid(tx) => (tx, None),
577                    ValidTransaction::ValidWithSidecar { transaction, sidecar } => {
578                        debug_assert!(
579                            transaction.is_eip4844(),
580                            "validator returned sidecar for non EIP-4844 transaction"
581                        );
582                        (transaction, Some(sidecar))
583                    }
584                };
585
586                let tx = ValidPoolTransaction {
587                    transaction,
588                    transaction_id,
589                    propagate,
590                    timestamp: Instant::now(),
591                    origin,
592                    authority_ids: authorities.map(|auths| self.get_sender_ids(auths)),
593                };
594
595                let added = match pool.add_transaction(tx, balance, state_nonce, bytecode_hash) {
596                    Ok(added) => added,
597                    Err(err) => return (Err(err), None),
598                };
599                let hash = *added.hash();
600                let state = added.transaction_state();
601
602                let meta = AddedTransactionMeta { added, blob_sidecar };
603
604                (Ok(AddedTransactionOutcome { hash, state }), Some(meta))
605            }
606            TransactionValidationOutcome::Invalid(tx, err) => {
607                self.with_event_listener(|listener| listener.invalid(tx.hash()));
608                (Err(PoolError::new(*tx.hash(), err)), None)
609            }
610            TransactionValidationOutcome::Error(tx_hash, err) => {
611                self.with_event_listener(|listener| listener.discarded(&tx_hash));
612                (Err(PoolError::other(tx_hash, err)), None)
613            }
614        }
615    }
616
617    /// Adds a transaction and returns the event stream.
618    pub fn add_transaction_and_subscribe(
619        &self,
620        origin: TransactionOrigin,
621        tx: TransactionValidationOutcome<T::Transaction>,
622    ) -> PoolResult<TransactionEvents> {
623        let listener = {
624            let mut listener = self.event_listener.write();
625            let events = listener.subscribe(tx.tx_hash());
626            self.mark_event_listener_installed();
627            events
628        };
629        let mut results = self.add_transactions(origin, std::iter::once(tx));
630        results.pop().expect("result length is the same as the input")?;
631        Ok(listener)
632    }
633
634    /// Adds all transactions in the iterator to the pool, returning a list of results.
635    pub fn add_transactions(
636        &self,
637        origin: TransactionOrigin,
638        transactions: impl IntoIterator<Item = TransactionValidationOutcome<T::Transaction>>,
639    ) -> Vec<PoolResult<AddedTransactionOutcome>> {
640        // Collect results and metadata while holding the pool write lock
641        let (mut results, added_metas, discarded) = {
642            let mut pool = self.pool.write();
643            let mut added_metas = Vec::new();
644
645            let results = transactions
646                .into_iter()
647                .map(|tx| {
648                    let (result, meta) = self.add_transaction(&mut pool, origin, tx);
649
650                    // Only collect metadata for successful insertions
651                    if result.is_ok() &&
652                        let Some(meta) = meta
653                    {
654                        added_metas.push(meta);
655                    }
656
657                    result
658                })
659                .collect::<Vec<_>>();
660
661            // Enforce the pool size limits if at least one transaction was added successfully
662            let discarded = if results.iter().any(Result::is_ok) {
663                pool.discard_worst()
664            } else {
665                Default::default()
666            };
667
668            (results, added_metas, discarded)
669        };
670
671        for meta in added_metas {
672            self.on_added_transaction(meta);
673        }
674
675        if !discarded.is_empty() {
676            // Delete any blobs associated with discarded blob transactions
677            self.delete_discarded_blobs(discarded.iter());
678            self.with_event_listener(|listener| listener.discarded_many(&discarded));
679
680            let discarded_hashes =
681                discarded.into_iter().map(|tx| *tx.hash()).collect::<HashSet<_>>();
682
683            // A newly added transaction may be immediately discarded, so we need to
684            // adjust the result here
685            for res in &mut results {
686                if let Ok(AddedTransactionOutcome { hash, .. }) = res &&
687                    discarded_hashes.contains(hash)
688                {
689                    *res = Err(PoolError::new(*hash, PoolErrorKind::DiscardedOnInsert))
690                }
691            }
692        };
693
694        results
695    }
696
697    /// Process a transaction that was added to the pool.
698    ///
699    /// Performs blob storage operations and sends all notifications. This should be called
700    /// after the pool write lock has been released to avoid blocking pool operations.
701    fn on_added_transaction(&self, meta: AddedTransactionMeta<T::Transaction>) {
702        // Handle blob sidecar storage and notifications for EIP-4844 transactions
703        if let Some(sidecar) = meta.blob_sidecar {
704            let hash = *meta.added.hash();
705            self.on_new_blob_sidecar(&hash, &sidecar);
706            self.insert_blob(hash, sidecar);
707        }
708
709        // Delete replaced blob sidecar if any
710        if let Some(replaced) = meta.added.replaced_blob_transaction() {
711            debug!(target: "txpool", "[{:?}] delete replaced blob sidecar", replaced);
712            self.delete_blob(replaced);
713        }
714
715        // Delete discarded blob sidecars if any, this doesnt do any IO.
716        if let Some(discarded) = meta.added.discarded_transactions() {
717            self.delete_discarded_blobs(discarded.iter());
718        }
719
720        // Notify pending transaction listeners
721        if let Some(pending) = meta.added.as_pending() {
722            self.on_new_pending_transaction(pending);
723        }
724
725        // Notify event listeners
726        self.notify_event_listeners(&meta.added);
727
728        // Notify new transaction listeners
729        self.on_new_transaction(meta.added.into_new_transaction_event());
730    }
731
732    /// Notify all listeners about a new pending transaction.
733    ///
734    /// See also [`Self::add_pending_listener`]
735    ///
736    /// CAUTION: This function is only intended to be used manually in order to use this type's
737    /// pending transaction receivers when manually implementing the
738    /// [`TransactionPool`](crate::TransactionPool) trait for a custom pool implementation
739    /// [`TransactionPool::pending_transactions_listener_for`](crate::TransactionPool).
740    pub fn on_new_pending_transaction(&self, pending: &AddedPendingTransaction<T::Transaction>) {
741        let mut needs_cleanup = false;
742
743        {
744            let listeners = self.pending_transaction_listener.read();
745            for listener in listeners.iter() {
746                if !listener.send_all(pending.pending_transactions(listener.kind)) {
747                    needs_cleanup = true;
748                }
749            }
750        }
751
752        // Clean up dead listeners if we detected any closed channels
753        if needs_cleanup {
754            self.pending_transaction_listener
755                .write()
756                .retain(|listener| !listener.sender.is_closed());
757        }
758    }
759
760    /// Notify all listeners about a newly inserted pending transaction.
761    ///
762    /// See also [`Self::add_new_transaction_listener`]
763    ///
764    /// CAUTION: This function is only intended to be used manually in order to use this type's
765    /// transaction receivers when manually implementing the
766    /// [`TransactionPool`](crate::TransactionPool) trait for a custom pool implementation
767    /// [`TransactionPool::new_transactions_listener_for`](crate::TransactionPool).
768    pub fn on_new_transaction(&self, event: NewTransactionEvent<T::Transaction>) {
769        let mut needs_cleanup = false;
770
771        {
772            let listeners = self.transaction_listener.read();
773            for listener in listeners.iter() {
774                if listener.kind.is_propagate_only() && !event.transaction.propagate {
775                    if listener.sender.is_closed() {
776                        needs_cleanup = true;
777                    }
778                    // Skip non-propagate transactions for propagate-only listeners
779                    continue
780                }
781
782                if !listener.send(event.clone()) {
783                    needs_cleanup = true;
784                }
785            }
786        }
787
788        // Clean up dead listeners if we detected any closed channels
789        if needs_cleanup {
790            self.transaction_listener.write().retain(|listener| !listener.sender.is_closed());
791        }
792    }
793
794    /// Notify all listeners about a blob sidecar for a newly inserted blob (eip4844) transaction.
795    fn on_new_blob_sidecar(&self, tx_hash: &TxHash, sidecar: &BlobTransactionSidecarVariant) {
796        let mut sidecar_listeners = self.blob_transaction_sidecar_listener.lock();
797        if sidecar_listeners.is_empty() {
798            return
799        }
800        let sidecar = Arc::new(sidecar.clone());
801        sidecar_listeners.retain_mut(|listener| {
802            let new_blob_event = NewBlobSidecar { tx_hash: *tx_hash, sidecar: sidecar.clone() };
803            match listener.sender.try_send(new_blob_event) {
804                Ok(()) => true,
805                Err(err) => {
806                    if matches!(err, mpsc::error::TrySendError::Full(_)) {
807                        debug!(
808                            target: "txpool",
809                            "[{:?}] failed to send blob sidecar; channel full",
810                            sidecar,
811                        );
812                        true
813                    } else {
814                        false
815                    }
816                }
817            }
818        })
819    }
820
821    /// Notifies transaction listeners about changes once a block was processed.
822    fn notify_on_new_state(&self, outcome: OnNewCanonicalStateOutcome<T::Transaction>) {
823        trace!(target: "txpool", promoted=outcome.promoted.len(), discarded= outcome.discarded.len() ,"notifying listeners on state change");
824
825        // notify about promoted pending transactions - emit hashes
826        let mut needs_pending_cleanup = false;
827        {
828            let listeners = self.pending_transaction_listener.read();
829            for listener in listeners.iter() {
830                if !listener.send_all(outcome.pending_transactions(listener.kind)) {
831                    needs_pending_cleanup = true;
832                }
833            }
834        }
835        if needs_pending_cleanup {
836            self.pending_transaction_listener.write().retain(|l| !l.sender.is_closed());
837        }
838
839        // emit full transactions
840        let mut needs_tx_cleanup = false;
841        {
842            let listeners = self.transaction_listener.read();
843            for listener in listeners.iter() {
844                if !listener.send_all(outcome.full_pending_transactions(listener.kind)) {
845                    needs_tx_cleanup = true;
846                }
847            }
848        }
849        if needs_tx_cleanup {
850            self.transaction_listener.write().retain(|l| !l.sender.is_closed());
851        }
852
853        let OnNewCanonicalStateOutcome { mined, promoted, discarded, block_hash } = outcome;
854
855        // broadcast specific transaction events
856        self.with_event_listener(|listener| {
857            for tx in &mined {
858                listener.mined(tx, block_hash);
859            }
860            for tx in &promoted {
861                listener.pending(tx.hash(), None);
862            }
863            for tx in &discarded {
864                listener.discarded(tx.hash());
865            }
866        })
867    }
868
869    /// Notifies all listeners about the transaction movements.
870    ///
871    /// This will emit events according to the provided changes.
872    ///
873    /// CAUTION: This function is only intended to be used manually in order to use this type's
874    /// [`TransactionEvents`] receivers when manually implementing the
875    /// [`TransactionPool`](crate::TransactionPool) trait for a custom pool implementation
876    /// [`TransactionPool::transaction_event_listener`](crate::TransactionPool).
877    #[allow(clippy::type_complexity)]
878    pub fn notify_on_transaction_updates(
879        &self,
880        promoted: Vec<Arc<ValidPoolTransaction<T::Transaction>>>,
881        discarded: Vec<Arc<ValidPoolTransaction<T::Transaction>>>,
882    ) {
883        // Notify about promoted pending transactions (similar to notify_on_new_state)
884        if !promoted.is_empty() {
885            let mut needs_pending_cleanup = false;
886            {
887                let listeners = self.pending_transaction_listener.read();
888                for listener in listeners.iter() {
889                    let promoted_hashes = promoted.iter().filter_map(|tx| {
890                        if listener.kind.is_propagate_only() && !tx.propagate {
891                            None
892                        } else {
893                            Some(*tx.hash())
894                        }
895                    });
896                    if !listener.send_all(promoted_hashes) {
897                        needs_pending_cleanup = true;
898                    }
899                }
900            }
901            if needs_pending_cleanup {
902                self.pending_transaction_listener.write().retain(|l| !l.sender.is_closed());
903            }
904
905            // in this case we should also emit promoted transactions in full
906            let mut needs_tx_cleanup = false;
907            {
908                let listeners = self.transaction_listener.read();
909                for listener in listeners.iter() {
910                    let promoted_txs = promoted.iter().filter_map(|tx| {
911                        if listener.kind.is_propagate_only() && !tx.propagate {
912                            None
913                        } else {
914                            Some(NewTransactionEvent::pending(tx.clone()))
915                        }
916                    });
917                    if !listener.send_all(promoted_txs) {
918                        needs_tx_cleanup = true;
919                    }
920                }
921            }
922            if needs_tx_cleanup {
923                self.transaction_listener.write().retain(|l| !l.sender.is_closed());
924            }
925        }
926
927        self.with_event_listener(|listener| {
928            for tx in &promoted {
929                listener.pending(tx.hash(), None);
930            }
931            for tx in &discarded {
932                listener.discarded(tx.hash());
933            }
934        });
935
936        if !discarded.is_empty() {
937            // This deletes outdated blob txs from the blob store, based on the account's nonce.
938            // This is called during txpool maintenance when the pool drifted.
939            self.delete_discarded_blobs(discarded.iter());
940        }
941    }
942
943    /// Fire events for the newly added transaction if there are any.
944    ///
945    /// See also [`Self::add_transaction_event_listener`].
946    ///
947    /// CAUTION: This function is only intended to be used manually in order to use this type's
948    /// [`TransactionEvents`] receivers when manually implementing the
949    /// [`TransactionPool`](crate::TransactionPool) trait for a custom pool implementation
950    /// [`TransactionPool::transaction_event_listener`](crate::TransactionPool).
951    pub fn notify_event_listeners(&self, tx: &AddedTransaction<T::Transaction>) {
952        self.with_event_listener(|listener| match tx {
953            AddedTransaction::Pending(tx) => {
954                let AddedPendingTransaction { transaction, promoted, discarded, replaced } = tx;
955
956                listener.pending(transaction.hash(), replaced.clone());
957                for tx in promoted {
958                    listener.pending(tx.hash(), None);
959                }
960                for tx in discarded {
961                    listener.discarded(tx.hash());
962                }
963            }
964            AddedTransaction::Parked { transaction, replaced, queued_reason, .. } => {
965                listener.queued(transaction.hash(), queued_reason.clone());
966                if let Some(replaced) = replaced {
967                    listener.replaced(replaced.clone(), *transaction.hash());
968                }
969            }
970        });
971    }
972
973    /// Returns an iterator that yields transactions that are ready to be included in the block.
974    pub fn best_transactions(&self) -> BestTransactions<T> {
975        self.get_pool_data().best_transactions()
976    }
977
978    /// Returns an iterator that yields transactions that are ready to be included in the block with
979    /// the given base fee and optional blob fee attributes.
980    pub fn best_transactions_with_attributes(
981        &self,
982        best_transactions_attributes: BestTransactionsAttributes,
983    ) -> Box<dyn crate::traits::BestTransactions<Item = Arc<ValidPoolTransaction<T::Transaction>>>>
984    {
985        self.get_pool_data().best_transactions_with_attributes(best_transactions_attributes)
986    }
987
988    /// Returns only the first `max` transactions in the pending pool.
989    pub fn pending_transactions_max(
990        &self,
991        max: usize,
992    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
993        self.get_pool_data().pending_transactions_iter().take(max).collect()
994    }
995
996    /// Returns all transactions from the pending sub-pool
997    pub fn pending_transactions(&self) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
998        self.get_pool_data().pending_transactions()
999    }
1000
1001    /// Returns all transactions from parked pools
1002    pub fn queued_transactions(&self) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1003        self.get_pool_data().queued_transactions()
1004    }
1005
1006    /// Returns all transactions in the pool
1007    pub fn all_transactions(&self) -> AllPoolTransactions<T::Transaction> {
1008        let pool = self.get_pool_data();
1009        AllPoolTransactions {
1010            pending: pool.pending_transactions(),
1011            queued: pool.queued_transactions(),
1012        }
1013    }
1014
1015    /// Returns _all_ transactions in the pool
1016    pub fn all_transaction_hashes(&self) -> Vec<TxHash> {
1017        self.get_pool_data().all().transactions_iter().map(|tx| *tx.hash()).collect()
1018    }
1019
1020    /// Removes and returns all matching transactions from the pool.
1021    ///
1022    /// This behaves as if the transactions got discarded (_not_ mined), effectively introducing a
1023    /// nonce gap for the given transactions.
1024    pub fn remove_transactions(
1025        &self,
1026        hashes: Vec<TxHash>,
1027    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1028        if hashes.is_empty() {
1029            return Vec::new()
1030        }
1031        let removed = self.pool.write().remove_transactions(hashes);
1032
1033        self.with_event_listener(|listener| listener.discarded_many(&removed));
1034
1035        removed
1036    }
1037
1038    /// Removes and returns all matching transactions and their dependent transactions from the
1039    /// pool.
1040    pub fn remove_transactions_and_descendants(
1041        &self,
1042        hashes: Vec<TxHash>,
1043    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1044        if hashes.is_empty() {
1045            return Vec::new()
1046        }
1047        let removed = self.pool.write().remove_transactions_and_descendants(hashes);
1048
1049        self.with_event_listener(|listener| {
1050            for tx in &removed {
1051                listener.discarded(tx.hash());
1052            }
1053        });
1054
1055        removed
1056    }
1057
1058    /// Removes and returns all transactions by the specified sender from the pool.
1059    pub fn remove_transactions_by_sender(
1060        &self,
1061        sender: Address,
1062    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1063        let sender_id = self.get_sender_id(sender);
1064        let removed = self.pool.write().remove_transactions_by_sender(sender_id);
1065
1066        self.with_event_listener(|listener| listener.discarded_many(&removed));
1067
1068        removed
1069    }
1070
1071    /// Removes and returns all transactions that are present in the pool.
1072    pub fn retain_unknown<A>(&self, announcement: &mut A)
1073    where
1074        A: HandleMempoolData,
1075    {
1076        if announcement.is_empty() {
1077            return
1078        }
1079        let pool = self.get_pool_data();
1080        announcement.retain_by_hash(|tx| !pool.contains(tx))
1081    }
1082
1083    /// Returns the transaction by hash.
1084    pub fn get(&self, tx_hash: &TxHash) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
1085        self.get_pool_data().get(tx_hash)
1086    }
1087
1088    /// Returns all transactions of the address
1089    pub fn get_transactions_by_sender(
1090        &self,
1091        sender: Address,
1092    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1093        let sender_id = self.get_sender_id(sender);
1094        self.get_pool_data().get_transactions_by_sender(sender_id)
1095    }
1096
1097    /// Returns all queued transactions of the address by sender
1098    pub fn get_queued_transactions_by_sender(
1099        &self,
1100        sender: Address,
1101    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1102        let sender_id = self.get_sender_id(sender);
1103        self.get_pool_data().queued_txs_by_sender(sender_id)
1104    }
1105
1106    /// Returns all pending transactions filtered by predicate
1107    pub fn pending_transactions_with_predicate(
1108        &self,
1109        predicate: impl FnMut(&ValidPoolTransaction<T::Transaction>) -> bool,
1110    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1111        self.get_pool_data().pending_transactions_with_predicate(predicate)
1112    }
1113
1114    /// Returns all pending transactions of the address by sender
1115    pub fn get_pending_transactions_by_sender(
1116        &self,
1117        sender: Address,
1118    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1119        let sender_id = self.get_sender_id(sender);
1120        self.get_pool_data().pending_txs_by_sender(sender_id)
1121    }
1122
1123    /// Returns the highest transaction of the address
1124    pub fn get_highest_transaction_by_sender(
1125        &self,
1126        sender: Address,
1127    ) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
1128        let sender_id = self.get_sender_id(sender);
1129        self.get_pool_data().get_highest_transaction_by_sender(sender_id)
1130    }
1131
1132    /// Returns the transaction with the highest nonce that is executable given the on chain nonce.
1133    pub fn get_highest_consecutive_transaction_by_sender(
1134        &self,
1135        sender: Address,
1136        on_chain_nonce: u64,
1137    ) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
1138        let sender_id = self.get_sender_id(sender);
1139        self.get_pool_data().get_highest_consecutive_transaction_by_sender(
1140            sender_id.into_transaction_id(on_chain_nonce),
1141        )
1142    }
1143
1144    /// Returns the transaction given a [`TransactionId`]
1145    pub fn get_transaction_by_transaction_id(
1146        &self,
1147        transaction_id: &TransactionId,
1148    ) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
1149        self.get_pool_data().all().get(transaction_id).map(|tx| tx.transaction.clone())
1150    }
1151
1152    /// Returns all transactions that where submitted with the given [`TransactionOrigin`]
1153    pub fn get_transactions_by_origin(
1154        &self,
1155        origin: TransactionOrigin,
1156    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1157        self.get_pool_data()
1158            .all()
1159            .transactions_iter()
1160            .filter(|tx| tx.origin == origin)
1161            .cloned()
1162            .collect()
1163    }
1164
1165    /// Returns all pending transactions filtered by [`TransactionOrigin`]
1166    pub fn get_pending_transactions_by_origin(
1167        &self,
1168        origin: TransactionOrigin,
1169    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1170        self.get_pool_data().pending_transactions_iter().filter(|tx| tx.origin == origin).collect()
1171    }
1172
1173    /// Returns all the transactions belonging to the hashes.
1174    ///
1175    /// If no transaction exists, it is skipped.
1176    pub fn get_all(&self, txs: Vec<TxHash>) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1177        if txs.is_empty() {
1178            return Vec::new()
1179        }
1180        self.get_pool_data().get_all(txs).collect()
1181    }
1182
1183    /// Returns all the transactions belonging to the hashes that are propagatable.
1184    ///
1185    /// If no transaction exists, it is skipped.
1186    fn get_all_propagatable(
1187        &self,
1188        txs: &[TxHash],
1189    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
1190        if txs.is_empty() {
1191            return Vec::new()
1192        }
1193        let pool = self.get_pool_data();
1194        txs.iter().filter_map(|tx| pool.get(tx).filter(|tx| tx.propagate)).collect()
1195    }
1196
1197    /// Notify about propagated transactions.
1198    pub fn on_propagated(&self, txs: PropagatedTransactions) {
1199        if txs.0.is_empty() {
1200            return
1201        }
1202        self.with_event_listener(|listener| {
1203            txs.0.into_iter().for_each(|(hash, peers)| listener.propagated(&hash, peers));
1204        });
1205    }
1206
1207    /// Number of transactions in the entire pool
1208    pub fn len(&self) -> usize {
1209        self.get_pool_data().len()
1210    }
1211
1212    /// Whether the pool is empty
1213    pub fn is_empty(&self) -> bool {
1214        self.get_pool_data().is_empty()
1215    }
1216
1217    /// Returns whether or not the pool is over its configured size and transaction count limits.
1218    pub fn is_exceeded(&self) -> bool {
1219        self.pool.read().is_exceeded()
1220    }
1221
1222    /// Inserts a blob transaction into the blob store
1223    fn insert_blob(&self, hash: TxHash, blob: BlobTransactionSidecarVariant) {
1224        debug!(target: "txpool", "[{:?}] storing blob sidecar", hash);
1225        if let Err(err) = self.blob_store.insert(hash, blob) {
1226            warn!(target: "txpool", %err, "[{:?}] failed to insert blob", hash);
1227            self.blob_store_metrics.blobstore_failed_inserts.increment(1);
1228        }
1229        self.update_blob_store_metrics();
1230    }
1231
1232    /// Delete a blob from the blob store
1233    pub fn delete_blob(&self, blob: TxHash) {
1234        let _ = self.blob_store.delete(blob);
1235    }
1236
1237    /// Delete all blobs from the blob store
1238    pub fn delete_blobs(&self, txs: Vec<TxHash>) {
1239        let _ = self.blob_store.delete_all(txs);
1240    }
1241
1242    /// Cleans up the blob store
1243    pub fn cleanup_blobs(&self) {
1244        let stat = self.blob_store.cleanup();
1245        self.blob_store_metrics.blobstore_failed_deletes.increment(stat.delete_failed as u64);
1246        self.update_blob_store_metrics();
1247    }
1248
1249    fn update_blob_store_metrics(&self) {
1250        if let Some(data_size) = self.blob_store.data_size_hint() {
1251            self.blob_store_metrics.blobstore_byte_size.set(data_size as f64);
1252        }
1253        self.blob_store_metrics.blobstore_entries.set(self.blob_store.blobs_len() as f64);
1254    }
1255
1256    /// Deletes all blob transactions that were discarded.
1257    fn delete_discarded_blobs<'a>(
1258        &'a self,
1259        transactions: impl IntoIterator<Item = &'a Arc<ValidPoolTransaction<T::Transaction>>>,
1260    ) {
1261        let blob_txs = transactions
1262            .into_iter()
1263            .filter(|tx| tx.transaction.is_eip4844())
1264            .map(|tx| *tx.hash())
1265            .collect();
1266        self.delete_blobs(blob_txs);
1267    }
1268}
1269
1270impl<V, T: TransactionOrdering, S> fmt::Debug for PoolInner<V, T, S> {
1271    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1272        f.debug_struct("PoolInner").field("config", &self.config).finish_non_exhaustive()
1273    }
1274}
1275
1276/// Metadata for a transaction that was added to the pool.
1277///
1278/// This holds all the data needed to complete post-insertion operations (notifications,
1279/// blob storage).
1280#[derive(Debug)]
1281struct AddedTransactionMeta<T: PoolTransaction> {
1282    /// The transaction that was added to the pool
1283    added: AddedTransaction<T>,
1284    /// Optional blob sidecar for EIP-4844 transactions
1285    blob_sidecar: Option<BlobTransactionSidecarVariant>,
1286}
1287
1288/// Tracks an added transaction and all graph changes caused by adding it.
1289#[derive(Debug, Clone)]
1290pub struct AddedPendingTransaction<T: PoolTransaction> {
1291    /// Inserted transaction.
1292    pub transaction: Arc<ValidPoolTransaction<T>>,
1293    /// Replaced transaction.
1294    pub replaced: Option<Arc<ValidPoolTransaction<T>>>,
1295    /// transactions promoted to the pending queue
1296    pub promoted: Vec<Arc<ValidPoolTransaction<T>>>,
1297    /// transactions that failed and became discarded
1298    pub discarded: Vec<Arc<ValidPoolTransaction<T>>>,
1299}
1300
1301impl<T: PoolTransaction> AddedPendingTransaction<T> {
1302    /// Returns all transactions that were promoted to the pending pool and adhere to the given
1303    /// [`TransactionListenerKind`].
1304    ///
1305    /// If the kind is [`TransactionListenerKind::PropagateOnly`], then only transactions that
1306    /// are allowed to be propagated are returned.
1307    pub(crate) fn pending_transactions(
1308        &self,
1309        kind: TransactionListenerKind,
1310    ) -> impl Iterator<Item = B256> + '_ {
1311        let iter = std::iter::once(&self.transaction).chain(self.promoted.iter());
1312        PendingTransactionIter { kind, iter }
1313    }
1314}
1315
1316pub(crate) struct PendingTransactionIter<Iter> {
1317    kind: TransactionListenerKind,
1318    iter: Iter,
1319}
1320
1321impl<'a, Iter, T> Iterator for PendingTransactionIter<Iter>
1322where
1323    Iter: Iterator<Item = &'a Arc<ValidPoolTransaction<T>>>,
1324    T: PoolTransaction + 'a,
1325{
1326    type Item = B256;
1327
1328    fn next(&mut self) -> Option<Self::Item> {
1329        loop {
1330            let next = self.iter.next()?;
1331            if self.kind.is_propagate_only() && !next.propagate {
1332                continue
1333            }
1334            return Some(*next.hash())
1335        }
1336    }
1337}
1338
1339/// An iterator over full pending transactions
1340pub(crate) struct FullPendingTransactionIter<Iter> {
1341    kind: TransactionListenerKind,
1342    iter: Iter,
1343}
1344
1345impl<'a, Iter, T> Iterator for FullPendingTransactionIter<Iter>
1346where
1347    Iter: Iterator<Item = &'a Arc<ValidPoolTransaction<T>>>,
1348    T: PoolTransaction + 'a,
1349{
1350    type Item = NewTransactionEvent<T>;
1351
1352    fn next(&mut self) -> Option<Self::Item> {
1353        loop {
1354            let next = self.iter.next()?;
1355            if self.kind.is_propagate_only() && !next.propagate {
1356                continue
1357            }
1358            return Some(NewTransactionEvent {
1359                subpool: SubPool::Pending,
1360                transaction: next.clone(),
1361            })
1362        }
1363    }
1364}
1365
1366/// Represents a transaction that was added into the pool and its state
1367#[derive(Debug, Clone)]
1368pub enum AddedTransaction<T: PoolTransaction> {
1369    /// Transaction was successfully added and moved to the pending pool.
1370    Pending(AddedPendingTransaction<T>),
1371    /// Transaction was successfully added but not yet ready for processing and moved to a
1372    /// parked pool instead.
1373    Parked {
1374        /// Inserted transaction.
1375        transaction: Arc<ValidPoolTransaction<T>>,
1376        /// Replaced transaction.
1377        replaced: Option<Arc<ValidPoolTransaction<T>>>,
1378        /// The subpool it was moved to.
1379        subpool: SubPool,
1380        /// The specific reason why the transaction is queued (if applicable).
1381        queued_reason: Option<QueuedReason>,
1382    },
1383}
1384
1385impl<T: PoolTransaction> AddedTransaction<T> {
1386    /// Returns whether the transaction has been added to the pending pool.
1387    pub const fn as_pending(&self) -> Option<&AddedPendingTransaction<T>> {
1388        match self {
1389            Self::Pending(tx) => Some(tx),
1390            _ => None,
1391        }
1392    }
1393
1394    /// Returns the replaced transaction if there was one
1395    pub const fn replaced(&self) -> Option<&Arc<ValidPoolTransaction<T>>> {
1396        match self {
1397            Self::Pending(tx) => tx.replaced.as_ref(),
1398            Self::Parked { replaced, .. } => replaced.as_ref(),
1399        }
1400    }
1401
1402    /// Returns the discarded transactions if there were any
1403    pub(crate) fn discarded_transactions(&self) -> Option<&[Arc<ValidPoolTransaction<T>>]> {
1404        match self {
1405            Self::Pending(tx) => Some(&tx.discarded),
1406            Self::Parked { .. } => None,
1407        }
1408    }
1409
1410    /// Returns the hash of the replaced transaction if it is a blob transaction.
1411    pub(crate) fn replaced_blob_transaction(&self) -> Option<B256> {
1412        self.replaced().filter(|tx| tx.transaction.is_eip4844()).map(|tx| *tx.transaction.hash())
1413    }
1414
1415    /// Returns the hash of the transaction
1416    pub fn hash(&self) -> &TxHash {
1417        match self {
1418            Self::Pending(tx) => tx.transaction.hash(),
1419            Self::Parked { transaction, .. } => transaction.hash(),
1420        }
1421    }
1422
1423    /// Converts this type into the event type for listeners
1424    pub fn into_new_transaction_event(self) -> NewTransactionEvent<T> {
1425        match self {
1426            Self::Pending(tx) => {
1427                NewTransactionEvent { subpool: SubPool::Pending, transaction: tx.transaction }
1428            }
1429            Self::Parked { transaction, subpool, .. } => {
1430                NewTransactionEvent { transaction, subpool }
1431            }
1432        }
1433    }
1434
1435    /// Returns the subpool this transaction was added to
1436    pub(crate) const fn subpool(&self) -> SubPool {
1437        match self {
1438            Self::Pending(_) => SubPool::Pending,
1439            Self::Parked { subpool, .. } => *subpool,
1440        }
1441    }
1442
1443    /// Returns the [`TransactionId`] of the added transaction
1444    #[cfg(test)]
1445    pub(crate) fn id(&self) -> &TransactionId {
1446        match self {
1447            Self::Pending(added) => added.transaction.id(),
1448            Self::Parked { transaction, .. } => transaction.id(),
1449        }
1450    }
1451
1452    /// Returns the queued reason if the transaction is parked with a queued reason.
1453    pub const fn queued_reason(&self) -> Option<&QueuedReason> {
1454        match self {
1455            Self::Pending(_) => None,
1456            Self::Parked { queued_reason, .. } => queued_reason.as_ref(),
1457        }
1458    }
1459
1460    /// Returns the transaction state based on the subpool and queued reason.
1461    pub fn transaction_state(&self) -> AddedTransactionState {
1462        match self.subpool() {
1463            SubPool::Pending => AddedTransactionState::Pending,
1464            _ => {
1465                // For non-pending transactions, use the queued reason directly from the
1466                // AddedTransaction
1467                if let Some(reason) = self.queued_reason() {
1468                    AddedTransactionState::Queued(reason.clone())
1469                } else {
1470                    // Fallback - this shouldn't happen with the new implementation
1471                    AddedTransactionState::Queued(QueuedReason::NonceGap)
1472                }
1473            }
1474        }
1475    }
1476}
1477
1478/// The specific reason why a transaction is queued (not ready for execution)
1479#[derive(Debug, Clone, PartialEq, Eq)]
1480pub enum QueuedReason {
1481    /// Transaction has a nonce gap - missing prior transactions
1482    NonceGap,
1483    /// Transaction has parked ancestors - waiting for other transactions to be mined
1484    ParkedAncestors,
1485    /// Sender has insufficient balance to cover the transaction cost
1486    InsufficientBalance,
1487    /// Transaction exceeds the block gas limit
1488    TooMuchGas,
1489    /// Transaction doesn't meet the base fee requirement
1490    InsufficientBaseFee,
1491    /// Transaction doesn't meet the blob fee requirement (EIP-4844)
1492    InsufficientBlobFee,
1493}
1494
1495/// The state of a transaction when is was added to the pool
1496#[derive(Debug, Clone, PartialEq, Eq)]
1497pub enum AddedTransactionState {
1498    /// Ready for execution
1499    Pending,
1500    /// Not ready for execution due to a specific condition
1501    Queued(QueuedReason),
1502}
1503
1504impl AddedTransactionState {
1505    /// Returns whether the transaction was submitted as queued.
1506    pub const fn is_queued(&self) -> bool {
1507        matches!(self, Self::Queued(_))
1508    }
1509
1510    /// Returns whether the transaction was submitted as pending.
1511    pub const fn is_pending(&self) -> bool {
1512        matches!(self, Self::Pending)
1513    }
1514
1515    /// Returns the specific queued reason if the transaction is queued.
1516    pub const fn queued_reason(&self) -> Option<&QueuedReason> {
1517        match self {
1518            Self::Queued(reason) => Some(reason),
1519            Self::Pending => None,
1520        }
1521    }
1522}
1523
1524/// The outcome of a successful transaction addition
1525#[derive(Debug, Clone, PartialEq, Eq)]
1526pub struct AddedTransactionOutcome {
1527    /// The hash of the transaction
1528    pub hash: TxHash,
1529    /// The state of the transaction
1530    pub state: AddedTransactionState,
1531}
1532
1533impl AddedTransactionOutcome {
1534    /// Returns whether the transaction was submitted as queued.
1535    pub const fn is_queued(&self) -> bool {
1536        self.state.is_queued()
1537    }
1538
1539    /// Returns whether the transaction was submitted as pending.
1540    pub const fn is_pending(&self) -> bool {
1541        self.state.is_pending()
1542    }
1543}
1544
1545/// Contains all state changes after a [`CanonicalStateUpdate`] was processed
1546#[derive(Debug)]
1547pub(crate) struct OnNewCanonicalStateOutcome<T: PoolTransaction> {
1548    /// Hash of the block.
1549    pub(crate) block_hash: B256,
1550    /// All mined transactions.
1551    pub(crate) mined: Vec<TxHash>,
1552    /// Transactions promoted to the pending pool.
1553    pub(crate) promoted: Vec<Arc<ValidPoolTransaction<T>>>,
1554    /// transaction that were discarded during the update
1555    pub(crate) discarded: Vec<Arc<ValidPoolTransaction<T>>>,
1556}
1557
1558impl<T: PoolTransaction> OnNewCanonicalStateOutcome<T> {
1559    /// Returns all transactions that were promoted to the pending pool and adhere to the given
1560    /// [`TransactionListenerKind`].
1561    ///
1562    /// If the kind is [`TransactionListenerKind::PropagateOnly`], then only transactions that
1563    /// are allowed to be propagated are returned.
1564    pub(crate) fn pending_transactions(
1565        &self,
1566        kind: TransactionListenerKind,
1567    ) -> impl Iterator<Item = B256> + '_ {
1568        let iter = self.promoted.iter();
1569        PendingTransactionIter { kind, iter }
1570    }
1571
1572    /// Returns all FULL transactions that were promoted to the pending pool and adhere to the given
1573    /// [`TransactionListenerKind`].
1574    ///
1575    /// If the kind is [`TransactionListenerKind::PropagateOnly`], then only transactions that
1576    /// are allowed to be propagated are returned.
1577    pub(crate) fn full_pending_transactions(
1578        &self,
1579        kind: TransactionListenerKind,
1580    ) -> impl Iterator<Item = NewTransactionEvent<T>> + '_ {
1581        let iter = self.promoted.iter();
1582        FullPendingTransactionIter { kind, iter }
1583    }
1584}
1585
1586#[cfg(test)]
1587mod tests {
1588    use crate::{
1589        blobstore::{BlobStore, InMemoryBlobStore},
1590        identifier::SenderId,
1591        test_utils::{MockTransaction, TestPoolBuilder},
1592        validate::ValidTransaction,
1593        BlockInfo, PoolConfig, SubPoolLimit, TransactionOrigin, TransactionValidationOutcome, U256,
1594    };
1595    use alloy_eips::{eip4844::BlobTransactionSidecar, eip7594::BlobTransactionSidecarVariant};
1596    use alloy_primitives::Address;
1597    use std::{fs, path::PathBuf};
1598
1599    #[test]
1600    fn test_discard_blobs_on_blob_tx_eviction() {
1601        let blobs = {
1602            // Read the contents of the JSON file into a string.
1603            let json_content = fs::read_to_string(
1604                PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("test_data/blob1.json"),
1605            )
1606            .expect("Failed to read the blob data file");
1607
1608            // Parse the JSON contents into a serde_json::Value.
1609            let json_value: serde_json::Value =
1610                serde_json::from_str(&json_content).expect("Failed to deserialize JSON");
1611
1612            // Extract blob data from JSON and convert it to Blob.
1613            vec![
1614                // Extract the "data" field from the JSON and parse it as a string.
1615                json_value
1616                    .get("data")
1617                    .unwrap()
1618                    .as_str()
1619                    .expect("Data is not a valid string")
1620                    .to_string(),
1621            ]
1622        };
1623
1624        // Generate a BlobTransactionSidecar from the blobs.
1625        let sidecar = BlobTransactionSidecarVariant::Eip4844(
1626            BlobTransactionSidecar::try_from_blobs_hex(blobs).unwrap(),
1627        );
1628
1629        // Define the maximum limit for blobs in the sub-pool.
1630        let blob_limit = SubPoolLimit::new(1000, usize::MAX);
1631
1632        // Create a test pool with default configuration and the specified blob limit.
1633        let test_pool = &TestPoolBuilder::default()
1634            .with_config(PoolConfig { blob_limit, ..Default::default() })
1635            .pool;
1636
1637        // Set the block info for the pool, including a pending blob fee.
1638        test_pool
1639            .set_block_info(BlockInfo { pending_blob_fee: Some(10_000_000), ..Default::default() });
1640
1641        // Create an in-memory blob store.
1642        let blob_store = InMemoryBlobStore::default();
1643
1644        // Loop to add transactions to the pool and test blob eviction.
1645        for n in 0..blob_limit.max_txs + 10 {
1646            // Create a mock transaction with the generated blob sidecar.
1647            let mut tx = MockTransaction::eip4844_with_sidecar(sidecar.clone());
1648
1649            // Set non zero size
1650            tx.set_size(1844674407370951);
1651
1652            // Insert the sidecar into the blob store if the current index is within the blob limit.
1653            if n < blob_limit.max_txs {
1654                blob_store.insert(*tx.get_hash(), sidecar.clone()).unwrap();
1655            }
1656
1657            // Add the transaction to the pool with external origin and valid outcome.
1658            test_pool.add_transactions(
1659                TransactionOrigin::External,
1660                [TransactionValidationOutcome::Valid {
1661                    balance: U256::from(1_000),
1662                    state_nonce: 0,
1663                    bytecode_hash: None,
1664                    transaction: ValidTransaction::ValidWithSidecar {
1665                        transaction: tx,
1666                        sidecar: sidecar.clone(),
1667                    },
1668                    propagate: true,
1669                    authorities: None,
1670                }],
1671            );
1672        }
1673
1674        // Assert that the size of the pool's blob component is equal to the maximum blob limit.
1675        assert_eq!(test_pool.size().blob, blob_limit.max_txs);
1676
1677        // Assert that the size of the pool's blob_size component matches the expected value.
1678        assert_eq!(test_pool.size().blob_size, 1844674407370951000);
1679
1680        // Assert that the pool's blob store matches the expected blob store.
1681        assert_eq!(*test_pool.blob_store(), blob_store);
1682    }
1683
1684    #[test]
1685    fn test_auths_stored_in_identifiers() {
1686        // Create a test pool with default configuration.
1687        let test_pool = &TestPoolBuilder::default().with_config(Default::default()).pool;
1688
1689        let auth = Address::new([1; 20]);
1690        let tx = MockTransaction::eip7702();
1691
1692        test_pool.add_transactions(
1693            TransactionOrigin::Local,
1694            [TransactionValidationOutcome::Valid {
1695                balance: U256::from(1_000),
1696                state_nonce: 0,
1697                bytecode_hash: None,
1698                transaction: ValidTransaction::Valid(tx),
1699                propagate: true,
1700                authorities: Some(vec![auth]),
1701            }],
1702        );
1703
1704        let identifiers = test_pool.identifiers.read();
1705        assert_eq!(identifiers.sender_id(&auth), Some(SenderId::from(1)));
1706    }
1707}