reth_transaction_pool/pool/
mod.rs

1//! Transaction Pool internals.
2//!
3//! Incoming transactions are validated before they enter the pool first. The validation outcome can
4//! have 3 states:
5//!
6//!  1. Transaction can _never_ be valid
7//!  2. Transaction is _currently_ valid
8//!  3. Transaction is _currently_ invalid, but could potentially become valid in the future
9//!
10//! However, (2.) and (3.) of a transaction can only be determined on the basis of the current
11//! state, whereas (1.) holds indefinitely. This means once the state changes (2.) and (3.) the
12//! state of a transaction needs to be reevaluated again.
13//!
14//! The transaction pool is responsible for storing new, valid transactions and providing the next
15//! best transactions sorted by their priority. Where priority is determined by the transaction's
16//! score ([`TransactionOrdering`]).
17//!
18//! Furthermore, the following characteristics fall under (3.):
19//!
20//!  a) Nonce of a transaction is higher than the expected nonce for the next transaction of its
21//! sender. A distinction is made here whether multiple transactions from the same sender have
22//! gapless nonce increments.
23//!
24//!  a)(1) If _no_ transaction is missing in a chain of multiple
25//! transactions from the same sender (all nonce in row), all of them can in principle be executed
26//! on the current state one after the other.
27//!
28//!  a)(2) If there's a nonce gap, then all
29//! transactions after the missing transaction are blocked until the missing transaction arrives.
30//!
31//!  b) Transaction does not meet the dynamic fee cap requirement introduced by EIP-1559: The
32//! fee cap of the transaction needs to be no less than the base fee of block.
33//!
34//!
35//! In essence the transaction pool is made of three separate sub-pools:
36//!
37//!  - Pending Pool: Contains all transactions that are valid on the current state and satisfy (3.
38//!    a)(1): _No_ nonce gaps. A _pending_ transaction is considered _ready_ when it has the lowest
39//!    nonce of all transactions from the same sender. Once a _ready_ transaction with nonce `n` has
40//!    been executed, the next highest transaction from the same sender `n + 1` becomes ready.
41//!
42//!  - Queued Pool: Contains all transactions that are currently blocked by missing transactions:
43//!    (3. a)(2): _With_ nonce gaps or due to lack of funds.
44//!
45//!  - Basefee Pool: To account for the dynamic base fee requirement (3. b) which could render an
46//!    EIP-1559 and all subsequent transactions of the sender currently invalid.
47//!
48//! The classification of transactions is always dependent on the current state that is changed as
49//! soon as a new block is mined. Once a new block is mined, the account changeset must be applied
50//! to the transaction pool.
51//!
52//!
53//! Depending on the use case, consumers of the [`TransactionPool`](crate::traits::TransactionPool)
54//! are interested in (2.) and/or (3.).
55
56//! A generic [`TransactionPool`](crate::traits::TransactionPool) that only handles transactions.
57//!
58//! This Pool maintains two separate sub-pools for (2.) and (3.)
59//!
60//! ## Terminology
61//!
62//!  - _Pending_: pending transactions are transactions that fall under (2.). These transactions can
63//!    currently be executed and are stored in the pending sub-pool
64//!  - _Queued_: queued transactions are transactions that fall under category (3.). Those
65//!    transactions are _currently_ waiting for state changes that eventually move them into
66//!    category (2.) and become pending.
67
68use crate::{
69    blobstore::BlobStore,
70    error::{PoolError, PoolErrorKind, PoolResult},
71    identifier::{SenderId, SenderIdentifiers, TransactionId},
72    metrics::BlobStoreMetrics,
73    pool::{
74        listener::{
75            BlobTransactionSidecarListener, PendingTransactionHashListener, PoolEventBroadcast,
76            TransactionListener,
77        },
78        state::SubPool,
79        txpool::{SenderInfo, TxPool},
80        update::UpdateOutcome,
81    },
82    traits::{
83        AllPoolTransactions, BestTransactionsAttributes, BlockInfo, GetPooledTransactionLimit,
84        NewBlobSidecar, PoolSize, PoolTransaction, PropagatedTransactions, TransactionOrigin,
85    },
86    validate::{TransactionValidationOutcome, ValidPoolTransaction, ValidTransaction},
87    CanonicalStateUpdate, EthPoolTransaction, PoolConfig, TransactionOrdering,
88    TransactionValidator,
89};
90
91use alloy_primitives::{Address, TxHash, B256};
92use best::BestTransactions;
93use parking_lot::{Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard};
94use reth_eth_wire_types::HandleMempoolData;
95use reth_execution_types::ChangedAccount;
96
97use alloy_eips::{eip7594::BlobTransactionSidecarVariant, Typed2718};
98use reth_primitives_traits::Recovered;
99use rustc_hash::FxHashMap;
100use std::{collections::HashSet, fmt, sync::Arc, time::Instant};
101use tokio::sync::mpsc;
102use tracing::{debug, trace, warn};
103mod events;
104pub use best::{BestTransactionFilter, BestTransactionsWithPrioritizedSenders};
105pub use blob::{blob_tx_priority, fee_delta, BlobOrd, BlobTransactions};
106pub use events::{FullTransactionEvent, NewTransactionEvent, TransactionEvent};
107pub use listener::{AllTransactionsEvents, TransactionEvents, TransactionListenerKind};
108pub use parked::{BasefeeOrd, ParkedOrd, ParkedPool, QueuedOrd};
109pub use pending::PendingPool;
110use reth_primitives_traits::Block;
111
112mod best;
113mod blob;
114mod listener;
115mod parked;
116pub mod pending;
117pub(crate) mod size;
118pub(crate) mod state;
119pub mod txpool;
120mod update;
121
122/// Bound on number of pending transactions from `reth_network::TransactionsManager` to buffer.
123pub const PENDING_TX_LISTENER_BUFFER_SIZE: usize = 2048;
124/// Bound on number of new transactions from `reth_network::TransactionsManager` to buffer.
125pub const NEW_TX_LISTENER_BUFFER_SIZE: usize = 1024;
126
127const BLOB_SIDECAR_LISTENER_BUFFER_SIZE: usize = 512;
128
129/// Transaction pool internals.
130pub struct PoolInner<V, T, S>
131where
132    T: TransactionOrdering,
133{
134    /// Internal mapping of addresses to plain ints.
135    identifiers: RwLock<SenderIdentifiers>,
136    /// Transaction validator.
137    validator: V,
138    /// Storage for blob transactions
139    blob_store: S,
140    /// The internal pool that manages all transactions.
141    pool: RwLock<TxPool<T>>,
142    /// Pool settings.
143    config: PoolConfig,
144    /// Manages listeners for transaction state change events.
145    event_listener: RwLock<PoolEventBroadcast<T::Transaction>>,
146    /// Listeners for new _full_ pending transactions.
147    pending_transaction_listener: Mutex<Vec<PendingTransactionHashListener>>,
148    /// Listeners for new transactions added to the pool.
149    transaction_listener: Mutex<Vec<TransactionListener<T::Transaction>>>,
150    /// Listener for new blob transaction sidecars added to the pool.
151    blob_transaction_sidecar_listener: Mutex<Vec<BlobTransactionSidecarListener>>,
152    /// Metrics for the blob store
153    blob_store_metrics: BlobStoreMetrics,
154}
155
156// === impl PoolInner ===
157
158impl<V, T, S> PoolInner<V, T, S>
159where
160    V: TransactionValidator,
161    T: TransactionOrdering<Transaction = <V as TransactionValidator>::Transaction>,
162    S: BlobStore,
163{
164    /// Create a new transaction pool instance.
165    pub fn new(validator: V, ordering: T, blob_store: S, config: PoolConfig) -> Self {
166        Self {
167            identifiers: Default::default(),
168            validator,
169            event_listener: Default::default(),
170            pool: RwLock::new(TxPool::new(ordering, config.clone())),
171            pending_transaction_listener: Default::default(),
172            transaction_listener: Default::default(),
173            blob_transaction_sidecar_listener: Default::default(),
174            config,
175            blob_store,
176            blob_store_metrics: Default::default(),
177        }
178    }
179
180    /// Returns the configured blob store.
181    pub const fn blob_store(&self) -> &S {
182        &self.blob_store
183    }
184
185    /// Returns stats about the size of the pool.
186    pub fn size(&self) -> PoolSize {
187        self.get_pool_data().size()
188    }
189
190    /// Returns the currently tracked block
191    pub fn block_info(&self) -> BlockInfo {
192        self.get_pool_data().block_info()
193    }
194    /// Sets the currently tracked block
195    pub fn set_block_info(&self, info: BlockInfo) {
196        self.pool.write().set_block_info(info)
197    }
198
199    /// Returns the internal [`SenderId`] for this address
200    pub fn get_sender_id(&self, addr: Address) -> SenderId {
201        self.identifiers.write().sender_id_or_create(addr)
202    }
203
204    /// Returns the internal [`SenderId`]s for the given addresses.
205    pub fn get_sender_ids(&self, addrs: impl IntoIterator<Item = Address>) -> Vec<SenderId> {
206        self.identifiers.write().sender_ids_or_create(addrs)
207    }
208
209    /// Returns all senders in the pool
210    pub fn unique_senders(&self) -> HashSet<Address> {
211        self.get_pool_data().unique_senders()
212    }
213
214    /// Converts the changed accounts to a map of sender ids to sender info (internal identifier
215    /// used for accounts)
216    fn changed_senders(
217        &self,
218        accs: impl Iterator<Item = ChangedAccount>,
219    ) -> FxHashMap<SenderId, SenderInfo> {
220        let mut identifiers = self.identifiers.write();
221        accs.into_iter()
222            .map(|acc| {
223                let ChangedAccount { address, nonce, balance } = acc;
224                let sender_id = identifiers.sender_id_or_create(address);
225                (sender_id, SenderInfo { state_nonce: nonce, balance })
226            })
227            .collect()
228    }
229
230    /// Get the config the pool was configured with.
231    pub const fn config(&self) -> &PoolConfig {
232        &self.config
233    }
234
235    /// Get the validator reference.
236    pub const fn validator(&self) -> &V {
237        &self.validator
238    }
239
240    /// Adds a new transaction listener to the pool that gets notified about every new _pending_
241    /// transaction inserted into the pool
242    pub fn add_pending_listener(&self, kind: TransactionListenerKind) -> mpsc::Receiver<TxHash> {
243        let (sender, rx) = mpsc::channel(self.config.pending_tx_listener_buffer_size);
244        let listener = PendingTransactionHashListener { sender, kind };
245        self.pending_transaction_listener.lock().push(listener);
246        rx
247    }
248
249    /// Adds a new transaction listener to the pool that gets notified about every new transaction.
250    pub fn add_new_transaction_listener(
251        &self,
252        kind: TransactionListenerKind,
253    ) -> mpsc::Receiver<NewTransactionEvent<T::Transaction>> {
254        let (sender, rx) = mpsc::channel(self.config.new_tx_listener_buffer_size);
255        let listener = TransactionListener { sender, kind };
256        self.transaction_listener.lock().push(listener);
257        rx
258    }
259    /// Adds a new blob sidecar listener to the pool that gets notified about every new
260    /// eip4844 transaction's blob sidecar.
261    pub fn add_blob_sidecar_listener(&self) -> mpsc::Receiver<NewBlobSidecar> {
262        let (sender, rx) = mpsc::channel(BLOB_SIDECAR_LISTENER_BUFFER_SIZE);
263        let listener = BlobTransactionSidecarListener { sender };
264        self.blob_transaction_sidecar_listener.lock().push(listener);
265        rx
266    }
267
268    /// If the pool contains the transaction, this adds a new listener that gets notified about
269    /// transaction events.
270    pub fn add_transaction_event_listener(&self, tx_hash: TxHash) -> Option<TransactionEvents> {
271        self.get_pool_data()
272            .contains(&tx_hash)
273            .then(|| self.event_listener.write().subscribe(tx_hash))
274    }
275
276    /// Adds a listener for all transaction events.
277    pub fn add_all_transactions_event_listener(&self) -> AllTransactionsEvents<T::Transaction> {
278        self.event_listener.write().subscribe_all()
279    }
280
281    /// Returns a read lock to the pool's data.
282    pub fn get_pool_data(&self) -> RwLockReadGuard<'_, TxPool<T>> {
283        self.pool.read()
284    }
285
286    /// Returns hashes of transactions in the pool that can be propagated.
287    pub fn pooled_transactions_hashes(&self) -> Vec<TxHash> {
288        self.get_pool_data()
289            .all()
290            .transactions_iter()
291            .filter(|tx| tx.propagate)
292            .map(|tx| *tx.hash())
293            .collect()
294    }
295
296    /// Returns transactions in the pool that can be propagated
297    pub fn pooled_transactions(&self) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
298        self.get_pool_data().all().transactions_iter().filter(|tx| tx.propagate).cloned().collect()
299    }
300
301    /// Returns only the first `max` transactions in the pool that can be propagated.
302    pub fn pooled_transactions_max(
303        &self,
304        max: usize,
305    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
306        self.get_pool_data()
307            .all()
308            .transactions_iter()
309            .filter(|tx| tx.propagate)
310            .take(max)
311            .cloned()
312            .collect()
313    }
314
315    /// Converts the internally tracked transaction to the pooled format.
316    ///
317    /// If the transaction is an EIP-4844 transaction, the blob sidecar is fetched from the blob
318    /// store and attached to the transaction.
319    fn to_pooled_transaction(
320        &self,
321        transaction: Arc<ValidPoolTransaction<T::Transaction>>,
322    ) -> Option<Recovered<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>>
323    where
324        <V as TransactionValidator>::Transaction: EthPoolTransaction,
325    {
326        if transaction.is_eip4844() {
327            let sidecar = self.blob_store.get(*transaction.hash()).ok()??;
328            transaction.transaction.clone().try_into_pooled_eip4844(sidecar)
329        } else {
330            transaction
331                .transaction
332                .clone()
333                .try_into_pooled()
334                .inspect_err(|err| {
335                    debug!(
336                        target: "txpool", %err,
337                        "failed to convert transaction to pooled element; skipping",
338                    );
339                })
340                .ok()
341        }
342    }
343
344    /// Returns pooled transactions for the given transaction hashes that are allowed to be
345    /// propagated.
346    pub fn get_pooled_transaction_elements(
347        &self,
348        tx_hashes: Vec<TxHash>,
349        limit: GetPooledTransactionLimit,
350    ) -> Vec<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>
351    where
352        <V as TransactionValidator>::Transaction: EthPoolTransaction,
353    {
354        let transactions = self.get_all_propagatable(tx_hashes);
355        let mut elements = Vec::with_capacity(transactions.len());
356        let mut size = 0;
357        for transaction in transactions {
358            let encoded_len = transaction.encoded_length();
359            let Some(pooled) = self.to_pooled_transaction(transaction) else {
360                continue;
361            };
362
363            size += encoded_len;
364            elements.push(pooled.into_inner());
365
366            if limit.exceeds(size) {
367                break
368            }
369        }
370
371        elements
372    }
373
374    /// Returns converted pooled transaction for the given transaction hash.
375    pub fn get_pooled_transaction_element(
376        &self,
377        tx_hash: TxHash,
378    ) -> Option<Recovered<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>>
379    where
380        <V as TransactionValidator>::Transaction: EthPoolTransaction,
381    {
382        self.get(&tx_hash).and_then(|tx| self.to_pooled_transaction(tx))
383    }
384
385    /// Updates the entire pool after a new block was executed.
386    pub fn on_canonical_state_change<B>(&self, update: CanonicalStateUpdate<'_, B>)
387    where
388        B: Block,
389    {
390        trace!(target: "txpool", ?update, "updating pool on canonical state change");
391
392        let block_info = update.block_info();
393        let CanonicalStateUpdate {
394            new_tip, changed_accounts, mined_transactions, update_kind, ..
395        } = update;
396        self.validator.on_new_head_block(new_tip);
397
398        let changed_senders = self.changed_senders(changed_accounts.into_iter());
399
400        // update the pool
401        let outcome = self.pool.write().on_canonical_state_change(
402            block_info,
403            mined_transactions,
404            changed_senders,
405            update_kind,
406        );
407
408        // This will discard outdated transactions based on the account's nonce
409        self.delete_discarded_blobs(outcome.discarded.iter());
410
411        // notify listeners about updates
412        self.notify_on_new_state(outcome);
413    }
414
415    /// Performs account updates on the pool.
416    ///
417    /// This will either promote or discard transactions based on the new account state.
418    ///
419    /// This should be invoked when the pool drifted and accounts are updated manually
420    pub fn update_accounts(&self, accounts: Vec<ChangedAccount>) {
421        let changed_senders = self.changed_senders(accounts.into_iter());
422        let UpdateOutcome { promoted, discarded } =
423            self.pool.write().update_accounts(changed_senders);
424
425        // Notify about promoted pending transactions (similar to notify_on_new_state)
426        if !promoted.is_empty() {
427            self.pending_transaction_listener.lock().retain_mut(|listener| {
428                let promoted_hashes = promoted.iter().filter_map(|tx| {
429                    if listener.kind.is_propagate_only() && !tx.propagate {
430                        None
431                    } else {
432                        Some(*tx.hash())
433                    }
434                });
435                listener.send_all(promoted_hashes)
436            });
437
438            // in this case we should also emit promoted transactions in full
439            self.transaction_listener.lock().retain_mut(|listener| {
440                let promoted_txs = promoted.iter().filter_map(|tx| {
441                    if listener.kind.is_propagate_only() && !tx.propagate {
442                        None
443                    } else {
444                        Some(NewTransactionEvent::pending(tx.clone()))
445                    }
446                });
447                listener.send_all(promoted_txs)
448            });
449        }
450
451        {
452            let mut listener = self.event_listener.write();
453            if !listener.is_empty() {
454                for tx in &promoted {
455                    listener.pending(tx.hash(), None);
456                }
457                for tx in &discarded {
458                    listener.discarded(tx.hash());
459                }
460            }
461        }
462
463        // This deletes outdated blob txs from the blob store, based on the account's nonce. This is
464        // called during txpool maintenance when the pool drifted.
465        self.delete_discarded_blobs(discarded.iter());
466    }
467
468    /// Add a single validated transaction into the pool.
469    ///
470    /// Note: this is only used internally by [`Self::add_transactions()`], all new transaction(s)
471    /// come in through that function, either as a batch or `std::iter::once`.
472    fn add_transaction(
473        &self,
474        pool: &mut RwLockWriteGuard<'_, TxPool<T>>,
475        origin: TransactionOrigin,
476        tx: TransactionValidationOutcome<T::Transaction>,
477    ) -> PoolResult<AddedTransactionOutcome> {
478        match tx {
479            TransactionValidationOutcome::Valid {
480                balance,
481                state_nonce,
482                transaction,
483                propagate,
484                bytecode_hash,
485                authorities,
486            } => {
487                let sender_id = self.get_sender_id(transaction.sender());
488                let transaction_id = TransactionId::new(sender_id, transaction.nonce());
489
490                // split the valid transaction and the blob sidecar if it has any
491                let (transaction, maybe_sidecar) = match transaction {
492                    ValidTransaction::Valid(tx) => (tx, None),
493                    ValidTransaction::ValidWithSidecar { transaction, sidecar } => {
494                        debug_assert!(
495                            transaction.is_eip4844(),
496                            "validator returned sidecar for non EIP-4844 transaction"
497                        );
498                        (transaction, Some(sidecar))
499                    }
500                };
501
502                let tx = ValidPoolTransaction {
503                    transaction,
504                    transaction_id,
505                    propagate,
506                    timestamp: Instant::now(),
507                    origin,
508                    authority_ids: authorities.map(|auths| self.get_sender_ids(auths)),
509                };
510
511                let added = pool.add_transaction(tx, balance, state_nonce, bytecode_hash)?;
512                let hash = *added.hash();
513                let state = added.transaction_state();
514
515                // transaction was successfully inserted into the pool
516                if let Some(sidecar) = maybe_sidecar {
517                    // notify blob sidecar listeners
518                    self.on_new_blob_sidecar(&hash, &sidecar);
519                    // store the sidecar in the blob store
520                    self.insert_blob(hash, sidecar);
521                }
522
523                if let Some(replaced) = added.replaced_blob_transaction() {
524                    debug!(target: "txpool", "[{:?}] delete replaced blob sidecar", replaced);
525                    // delete the replaced transaction from the blob store
526                    self.delete_blob(replaced);
527                }
528
529                // Notify about new pending transactions
530                if let Some(pending) = added.as_pending() {
531                    self.on_new_pending_transaction(pending);
532                }
533
534                // Notify tx event listeners
535                self.notify_event_listeners(&added);
536
537                if let Some(discarded) = added.discarded_transactions() {
538                    self.delete_discarded_blobs(discarded.iter());
539                }
540
541                // Notify listeners for _all_ transactions
542                self.on_new_transaction(added.into_new_transaction_event());
543
544                Ok(AddedTransactionOutcome { hash, state })
545            }
546            TransactionValidationOutcome::Invalid(tx, err) => {
547                let mut listener = self.event_listener.write();
548                listener.invalid(tx.hash());
549                Err(PoolError::new(*tx.hash(), err))
550            }
551            TransactionValidationOutcome::Error(tx_hash, err) => {
552                let mut listener = self.event_listener.write();
553                listener.discarded(&tx_hash);
554                Err(PoolError::other(tx_hash, err))
555            }
556        }
557    }
558
559    /// Adds a transaction and returns the event stream.
560    pub fn add_transaction_and_subscribe(
561        &self,
562        origin: TransactionOrigin,
563        tx: TransactionValidationOutcome<T::Transaction>,
564    ) -> PoolResult<TransactionEvents> {
565        let listener = {
566            let mut listener = self.event_listener.write();
567            listener.subscribe(tx.tx_hash())
568        };
569        let mut results = self.add_transactions(origin, std::iter::once(tx));
570        results.pop().expect("result length is the same as the input")?;
571        Ok(listener)
572    }
573
574    /// Adds all transactions in the iterator to the pool, returning a list of results.
575    ///
576    /// Note: A large batch may lock the pool for a long time that blocks important operations
577    /// like updating the pool on canonical state changes. The caller should consider having
578    /// a max batch size to balance transaction insertions with other updates.
579    pub fn add_transactions(
580        &self,
581        origin: TransactionOrigin,
582        transactions: impl IntoIterator<Item = TransactionValidationOutcome<T::Transaction>>,
583    ) -> Vec<PoolResult<AddedTransactionOutcome>> {
584        // Process all transactions in one write lock, maintaining individual origins
585        let (mut added, discarded) = {
586            let mut pool = self.pool.write();
587            let added = transactions
588                .into_iter()
589                .map(|tx| self.add_transaction(&mut pool, origin, tx))
590                .collect::<Vec<_>>();
591
592            // Enforce the pool size limits if at least one transaction was added successfully
593            let discarded = if added.iter().any(Result::is_ok) {
594                pool.discard_worst()
595            } else {
596                Default::default()
597            };
598
599            (added, discarded)
600        };
601
602        if !discarded.is_empty() {
603            // Delete any blobs associated with discarded blob transactions
604            self.delete_discarded_blobs(discarded.iter());
605            self.event_listener.write().discarded_many(&discarded);
606
607            let discarded_hashes =
608                discarded.into_iter().map(|tx| *tx.hash()).collect::<HashSet<_>>();
609
610            // A newly added transaction may be immediately discarded, so we need to
611            // adjust the result here
612            for res in &mut added {
613                if let Ok(AddedTransactionOutcome { hash, .. }) = res &&
614                    discarded_hashes.contains(hash)
615                {
616                    *res = Err(PoolError::new(*hash, PoolErrorKind::DiscardedOnInsert))
617                }
618            }
619        };
620
621        added
622    }
623
624    /// Notify all listeners about a new pending transaction.
625    ///
626    /// See also [`Self::add_pending_listener`]
627    ///
628    /// CAUTION: This function is only intended to be used manually in order to use this type's
629    /// pending transaction receivers when manually implementing the
630    /// [`TransactionPool`](crate::TransactionPool) trait for a custom pool implementation
631    /// [`TransactionPool::pending_transactions_listener_for`](crate::TransactionPool).
632    pub fn on_new_pending_transaction(&self, pending: &AddedPendingTransaction<T::Transaction>) {
633        let propagate_allowed = pending.is_propagate_allowed();
634
635        let mut transaction_listeners = self.pending_transaction_listener.lock();
636        transaction_listeners.retain_mut(|listener| {
637            if listener.kind.is_propagate_only() && !propagate_allowed {
638                // only emit this hash to listeners that are only allowed to receive propagate only
639                // transactions, such as network
640                return !listener.sender.is_closed()
641            }
642
643            // broadcast all pending transactions to the listener
644            listener.send_all(pending.pending_transactions(listener.kind))
645        });
646    }
647
648    /// Notify all listeners about a newly inserted pending transaction.
649    ///
650    /// See also [`Self::add_new_transaction_listener`]
651    ///
652    /// CAUTION: This function is only intended to be used manually in order to use this type's
653    /// transaction receivers when manually implementing the
654    /// [`TransactionPool`](crate::TransactionPool) trait for a custom pool implementation
655    /// [`TransactionPool::new_transactions_listener_for`](crate::TransactionPool).
656    pub fn on_new_transaction(&self, event: NewTransactionEvent<T::Transaction>) {
657        let mut transaction_listeners = self.transaction_listener.lock();
658        transaction_listeners.retain_mut(|listener| {
659            if listener.kind.is_propagate_only() && !event.transaction.propagate {
660                // only emit this hash to listeners that are only allowed to receive propagate only
661                // transactions, such as network
662                return !listener.sender.is_closed()
663            }
664
665            listener.send(event.clone())
666        });
667    }
668
669    /// Notify all listeners about a blob sidecar for a newly inserted blob (eip4844) transaction.
670    fn on_new_blob_sidecar(&self, tx_hash: &TxHash, sidecar: &BlobTransactionSidecarVariant) {
671        let mut sidecar_listeners = self.blob_transaction_sidecar_listener.lock();
672        if sidecar_listeners.is_empty() {
673            return
674        }
675        let sidecar = Arc::new(sidecar.clone());
676        sidecar_listeners.retain_mut(|listener| {
677            let new_blob_event = NewBlobSidecar { tx_hash: *tx_hash, sidecar: sidecar.clone() };
678            match listener.sender.try_send(new_blob_event) {
679                Ok(()) => true,
680                Err(err) => {
681                    if matches!(err, mpsc::error::TrySendError::Full(_)) {
682                        debug!(
683                            target: "txpool",
684                            "[{:?}] failed to send blob sidecar; channel full",
685                            sidecar,
686                        );
687                        true
688                    } else {
689                        false
690                    }
691                }
692            }
693        })
694    }
695
696    /// Notifies transaction listeners about changes once a block was processed.
697    fn notify_on_new_state(&self, outcome: OnNewCanonicalStateOutcome<T::Transaction>) {
698        trace!(target: "txpool", promoted=outcome.promoted.len(), discarded= outcome.discarded.len() ,"notifying listeners on state change");
699
700        // notify about promoted pending transactions
701        // emit hashes
702        self.pending_transaction_listener
703            .lock()
704            .retain_mut(|listener| listener.send_all(outcome.pending_transactions(listener.kind)));
705
706        // emit full transactions
707        self.transaction_listener.lock().retain_mut(|listener| {
708            listener.send_all(outcome.full_pending_transactions(listener.kind))
709        });
710
711        let OnNewCanonicalStateOutcome { mined, promoted, discarded, block_hash } = outcome;
712
713        // broadcast specific transaction events
714        let mut listener = self.event_listener.write();
715
716        if !listener.is_empty() {
717            for tx in &mined {
718                listener.mined(tx, block_hash);
719            }
720            for tx in &promoted {
721                listener.pending(tx.hash(), None);
722            }
723            for tx in &discarded {
724                listener.discarded(tx.hash());
725            }
726        }
727    }
728
729    /// Fire events for the newly added transaction if there are any.
730    ///
731    /// See also [`Self::add_transaction_event_listener`].
732    ///
733    /// CAUTION: This function is only intended to be used manually in order to use this type's
734    /// [`TransactionEvents`] receivers when manually implementing the
735    /// [`TransactionPool`](crate::TransactionPool) trait for a custom pool implementation
736    /// [`TransactionPool::transaction_event_listener`](crate::TransactionPool).
737    pub fn notify_event_listeners(&self, tx: &AddedTransaction<T::Transaction>) {
738        let mut listener = self.event_listener.write();
739        if listener.is_empty() {
740            // nothing to notify
741            return
742        }
743
744        match tx {
745            AddedTransaction::Pending(tx) => {
746                let AddedPendingTransaction { transaction, promoted, discarded, replaced } = tx;
747
748                listener.pending(transaction.hash(), replaced.clone());
749                for tx in promoted {
750                    listener.pending(tx.hash(), None);
751                }
752                for tx in discarded {
753                    listener.discarded(tx.hash());
754                }
755            }
756            AddedTransaction::Parked { transaction, replaced, queued_reason, .. } => {
757                listener.queued(transaction.hash(), queued_reason.clone());
758                if let Some(replaced) = replaced {
759                    listener.replaced(replaced.clone(), *transaction.hash());
760                }
761            }
762        }
763    }
764
765    /// Returns an iterator that yields transactions that are ready to be included in the block.
766    pub fn best_transactions(&self) -> BestTransactions<T> {
767        self.get_pool_data().best_transactions()
768    }
769
770    /// Returns an iterator that yields transactions that are ready to be included in the block with
771    /// the given base fee and optional blob fee attributes.
772    pub fn best_transactions_with_attributes(
773        &self,
774        best_transactions_attributes: BestTransactionsAttributes,
775    ) -> Box<dyn crate::traits::BestTransactions<Item = Arc<ValidPoolTransaction<T::Transaction>>>>
776    {
777        self.get_pool_data().best_transactions_with_attributes(best_transactions_attributes)
778    }
779
780    /// Returns only the first `max` transactions in the pending pool.
781    pub fn pending_transactions_max(
782        &self,
783        max: usize,
784    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
785        self.get_pool_data().pending_transactions_iter().take(max).collect()
786    }
787
788    /// Returns all transactions from the pending sub-pool
789    pub fn pending_transactions(&self) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
790        self.get_pool_data().pending_transactions()
791    }
792
793    /// Returns all transactions from parked pools
794    pub fn queued_transactions(&self) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
795        self.get_pool_data().queued_transactions()
796    }
797
798    /// Returns all transactions in the pool
799    pub fn all_transactions(&self) -> AllPoolTransactions<T::Transaction> {
800        let pool = self.get_pool_data();
801        AllPoolTransactions {
802            pending: pool.pending_transactions(),
803            queued: pool.queued_transactions(),
804        }
805    }
806
807    /// Returns _all_ transactions in the pool
808    pub fn all_transaction_hashes(&self) -> Vec<TxHash> {
809        self.get_pool_data().all().transactions_iter().map(|tx| *tx.hash()).collect()
810    }
811
812    /// Removes and returns all matching transactions from the pool.
813    ///
814    /// This behaves as if the transactions got discarded (_not_ mined), effectively introducing a
815    /// nonce gap for the given transactions.
816    pub fn remove_transactions(
817        &self,
818        hashes: Vec<TxHash>,
819    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
820        if hashes.is_empty() {
821            return Vec::new()
822        }
823        let removed = self.pool.write().remove_transactions(hashes);
824
825        self.event_listener.write().discarded_many(&removed);
826
827        removed
828    }
829
830    /// Removes and returns all matching transactions and their dependent transactions from the
831    /// pool.
832    pub fn remove_transactions_and_descendants(
833        &self,
834        hashes: Vec<TxHash>,
835    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
836        if hashes.is_empty() {
837            return Vec::new()
838        }
839        let removed = self.pool.write().remove_transactions_and_descendants(hashes);
840
841        let mut listener = self.event_listener.write();
842
843        for tx in &removed {
844            listener.discarded(tx.hash());
845        }
846
847        removed
848    }
849
850    /// Removes and returns all transactions by the specified sender from the pool.
851    pub fn remove_transactions_by_sender(
852        &self,
853        sender: Address,
854    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
855        let sender_id = self.get_sender_id(sender);
856        let removed = self.pool.write().remove_transactions_by_sender(sender_id);
857
858        self.event_listener.write().discarded_many(&removed);
859
860        removed
861    }
862
863    /// Removes and returns all transactions that are present in the pool.
864    pub fn retain_unknown<A>(&self, announcement: &mut A)
865    where
866        A: HandleMempoolData,
867    {
868        if announcement.is_empty() {
869            return
870        }
871        let pool = self.get_pool_data();
872        announcement.retain_by_hash(|tx| !pool.contains(tx))
873    }
874
875    /// Returns the transaction by hash.
876    pub fn get(&self, tx_hash: &TxHash) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
877        self.get_pool_data().get(tx_hash)
878    }
879
880    /// Returns all transactions of the address
881    pub fn get_transactions_by_sender(
882        &self,
883        sender: Address,
884    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
885        let sender_id = self.get_sender_id(sender);
886        self.get_pool_data().get_transactions_by_sender(sender_id)
887    }
888
889    /// Returns all queued transactions of the address by sender
890    pub fn get_queued_transactions_by_sender(
891        &self,
892        sender: Address,
893    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
894        let sender_id = self.get_sender_id(sender);
895        self.get_pool_data().queued_txs_by_sender(sender_id)
896    }
897
898    /// Returns all pending transactions filtered by predicate
899    pub fn pending_transactions_with_predicate(
900        &self,
901        predicate: impl FnMut(&ValidPoolTransaction<T::Transaction>) -> bool,
902    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
903        self.get_pool_data().pending_transactions_with_predicate(predicate)
904    }
905
906    /// Returns all pending transactions of the address by sender
907    pub fn get_pending_transactions_by_sender(
908        &self,
909        sender: Address,
910    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
911        let sender_id = self.get_sender_id(sender);
912        self.get_pool_data().pending_txs_by_sender(sender_id)
913    }
914
915    /// Returns the highest transaction of the address
916    pub fn get_highest_transaction_by_sender(
917        &self,
918        sender: Address,
919    ) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
920        let sender_id = self.get_sender_id(sender);
921        self.get_pool_data().get_highest_transaction_by_sender(sender_id)
922    }
923
924    /// Returns the transaction with the highest nonce that is executable given the on chain nonce.
925    pub fn get_highest_consecutive_transaction_by_sender(
926        &self,
927        sender: Address,
928        on_chain_nonce: u64,
929    ) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
930        let sender_id = self.get_sender_id(sender);
931        self.get_pool_data().get_highest_consecutive_transaction_by_sender(
932            sender_id.into_transaction_id(on_chain_nonce),
933        )
934    }
935
936    /// Returns the transaction given a [`TransactionId`]
937    pub fn get_transaction_by_transaction_id(
938        &self,
939        transaction_id: &TransactionId,
940    ) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
941        self.get_pool_data().all().get(transaction_id).map(|tx| tx.transaction.clone())
942    }
943
944    /// Returns all transactions that where submitted with the given [`TransactionOrigin`]
945    pub fn get_transactions_by_origin(
946        &self,
947        origin: TransactionOrigin,
948    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
949        self.get_pool_data()
950            .all()
951            .transactions_iter()
952            .filter(|tx| tx.origin == origin)
953            .cloned()
954            .collect()
955    }
956
957    /// Returns all pending transactions filtered by [`TransactionOrigin`]
958    pub fn get_pending_transactions_by_origin(
959        &self,
960        origin: TransactionOrigin,
961    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
962        self.get_pool_data().pending_transactions_iter().filter(|tx| tx.origin == origin).collect()
963    }
964
965    /// Returns all the transactions belonging to the hashes.
966    ///
967    /// If no transaction exists, it is skipped.
968    pub fn get_all(&self, txs: Vec<TxHash>) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
969        if txs.is_empty() {
970            return Vec::new()
971        }
972        self.get_pool_data().get_all(txs).collect()
973    }
974
975    /// Returns all the transactions belonging to the hashes that are propagatable.
976    ///
977    /// If no transaction exists, it is skipped.
978    fn get_all_propagatable(
979        &self,
980        txs: Vec<TxHash>,
981    ) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
982        if txs.is_empty() {
983            return Vec::new()
984        }
985        self.get_pool_data().get_all(txs).filter(|tx| tx.propagate).collect()
986    }
987
988    /// Notify about propagated transactions.
989    pub fn on_propagated(&self, txs: PropagatedTransactions) {
990        if txs.0.is_empty() {
991            return
992        }
993        let mut listener = self.event_listener.write();
994
995        if !listener.is_empty() {
996            txs.0.into_iter().for_each(|(hash, peers)| listener.propagated(&hash, peers));
997        }
998    }
999
1000    /// Number of transactions in the entire pool
1001    pub fn len(&self) -> usize {
1002        self.get_pool_data().len()
1003    }
1004
1005    /// Whether the pool is empty
1006    pub fn is_empty(&self) -> bool {
1007        self.get_pool_data().is_empty()
1008    }
1009
1010    /// Returns whether or not the pool is over its configured size and transaction count limits.
1011    pub fn is_exceeded(&self) -> bool {
1012        self.pool.read().is_exceeded()
1013    }
1014
1015    /// Inserts a blob transaction into the blob store
1016    fn insert_blob(&self, hash: TxHash, blob: BlobTransactionSidecarVariant) {
1017        debug!(target: "txpool", "[{:?}] storing blob sidecar", hash);
1018        if let Err(err) = self.blob_store.insert(hash, blob) {
1019            warn!(target: "txpool", %err, "[{:?}] failed to insert blob", hash);
1020            self.blob_store_metrics.blobstore_failed_inserts.increment(1);
1021        }
1022        self.update_blob_store_metrics();
1023    }
1024
1025    /// Delete a blob from the blob store
1026    pub fn delete_blob(&self, blob: TxHash) {
1027        let _ = self.blob_store.delete(blob);
1028    }
1029
1030    /// Delete all blobs from the blob store
1031    pub fn delete_blobs(&self, txs: Vec<TxHash>) {
1032        let _ = self.blob_store.delete_all(txs);
1033    }
1034
1035    /// Cleans up the blob store
1036    pub fn cleanup_blobs(&self) {
1037        let stat = self.blob_store.cleanup();
1038        self.blob_store_metrics.blobstore_failed_deletes.increment(stat.delete_failed as u64);
1039        self.update_blob_store_metrics();
1040    }
1041
1042    fn update_blob_store_metrics(&self) {
1043        if let Some(data_size) = self.blob_store.data_size_hint() {
1044            self.blob_store_metrics.blobstore_byte_size.set(data_size as f64);
1045        }
1046        self.blob_store_metrics.blobstore_entries.set(self.blob_store.blobs_len() as f64);
1047    }
1048
1049    /// Deletes all blob transactions that were discarded.
1050    fn delete_discarded_blobs<'a>(
1051        &'a self,
1052        transactions: impl IntoIterator<Item = &'a Arc<ValidPoolTransaction<T::Transaction>>>,
1053    ) {
1054        let blob_txs = transactions
1055            .into_iter()
1056            .filter(|tx| tx.transaction.is_eip4844())
1057            .map(|tx| *tx.hash())
1058            .collect();
1059        self.delete_blobs(blob_txs);
1060    }
1061}
1062
1063impl<V, T: TransactionOrdering, S> fmt::Debug for PoolInner<V, T, S> {
1064    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1065        f.debug_struct("PoolInner").field("config", &self.config).finish_non_exhaustive()
1066    }
1067}
1068
1069/// Tracks an added transaction and all graph changes caused by adding it.
1070#[derive(Debug, Clone)]
1071pub struct AddedPendingTransaction<T: PoolTransaction> {
1072    /// Inserted transaction.
1073    transaction: Arc<ValidPoolTransaction<T>>,
1074    /// Replaced transaction.
1075    replaced: Option<Arc<ValidPoolTransaction<T>>>,
1076    /// transactions promoted to the pending queue
1077    promoted: Vec<Arc<ValidPoolTransaction<T>>>,
1078    /// transactions that failed and became discarded
1079    discarded: Vec<Arc<ValidPoolTransaction<T>>>,
1080}
1081
1082impl<T: PoolTransaction> AddedPendingTransaction<T> {
1083    /// Returns all transactions that were promoted to the pending pool and adhere to the given
1084    /// [`TransactionListenerKind`].
1085    ///
1086    /// If the kind is [`TransactionListenerKind::PropagateOnly`], then only transactions that
1087    /// are allowed to be propagated are returned.
1088    pub(crate) fn pending_transactions(
1089        &self,
1090        kind: TransactionListenerKind,
1091    ) -> impl Iterator<Item = B256> + '_ {
1092        let iter = std::iter::once(&self.transaction).chain(self.promoted.iter());
1093        PendingTransactionIter { kind, iter }
1094    }
1095
1096    /// Returns if the transaction should be propagated.
1097    pub(crate) fn is_propagate_allowed(&self) -> bool {
1098        self.transaction.propagate
1099    }
1100}
1101
1102pub(crate) struct PendingTransactionIter<Iter> {
1103    kind: TransactionListenerKind,
1104    iter: Iter,
1105}
1106
1107impl<'a, Iter, T> Iterator for PendingTransactionIter<Iter>
1108where
1109    Iter: Iterator<Item = &'a Arc<ValidPoolTransaction<T>>>,
1110    T: PoolTransaction + 'a,
1111{
1112    type Item = B256;
1113
1114    fn next(&mut self) -> Option<Self::Item> {
1115        loop {
1116            let next = self.iter.next()?;
1117            if self.kind.is_propagate_only() && !next.propagate {
1118                continue
1119            }
1120            return Some(*next.hash())
1121        }
1122    }
1123}
1124
1125/// An iterator over full pending transactions
1126pub(crate) struct FullPendingTransactionIter<Iter> {
1127    kind: TransactionListenerKind,
1128    iter: Iter,
1129}
1130
1131impl<'a, Iter, T> Iterator for FullPendingTransactionIter<Iter>
1132where
1133    Iter: Iterator<Item = &'a Arc<ValidPoolTransaction<T>>>,
1134    T: PoolTransaction + 'a,
1135{
1136    type Item = NewTransactionEvent<T>;
1137
1138    fn next(&mut self) -> Option<Self::Item> {
1139        loop {
1140            let next = self.iter.next()?;
1141            if self.kind.is_propagate_only() && !next.propagate {
1142                continue
1143            }
1144            return Some(NewTransactionEvent {
1145                subpool: SubPool::Pending,
1146                transaction: next.clone(),
1147            })
1148        }
1149    }
1150}
1151
1152/// Represents a transaction that was added into the pool and its state
1153#[derive(Debug, Clone)]
1154pub enum AddedTransaction<T: PoolTransaction> {
1155    /// Transaction was successfully added and moved to the pending pool.
1156    Pending(AddedPendingTransaction<T>),
1157    /// Transaction was successfully added but not yet ready for processing and moved to a
1158    /// parked pool instead.
1159    Parked {
1160        /// Inserted transaction.
1161        transaction: Arc<ValidPoolTransaction<T>>,
1162        /// Replaced transaction.
1163        replaced: Option<Arc<ValidPoolTransaction<T>>>,
1164        /// The subpool it was moved to.
1165        subpool: SubPool,
1166        /// The specific reason why the transaction is queued (if applicable).
1167        queued_reason: Option<QueuedReason>,
1168    },
1169}
1170
1171impl<T: PoolTransaction> AddedTransaction<T> {
1172    /// Returns whether the transaction has been added to the pending pool.
1173    pub(crate) const fn as_pending(&self) -> Option<&AddedPendingTransaction<T>> {
1174        match self {
1175            Self::Pending(tx) => Some(tx),
1176            _ => None,
1177        }
1178    }
1179
1180    /// Returns the replaced transaction if there was one
1181    pub(crate) const fn replaced(&self) -> Option<&Arc<ValidPoolTransaction<T>>> {
1182        match self {
1183            Self::Pending(tx) => tx.replaced.as_ref(),
1184            Self::Parked { replaced, .. } => replaced.as_ref(),
1185        }
1186    }
1187
1188    /// Returns the discarded transactions if there were any
1189    pub(crate) fn discarded_transactions(&self) -> Option<&[Arc<ValidPoolTransaction<T>>]> {
1190        match self {
1191            Self::Pending(tx) => Some(&tx.discarded),
1192            Self::Parked { .. } => None,
1193        }
1194    }
1195
1196    /// Returns the hash of the replaced transaction if it is a blob transaction.
1197    pub(crate) fn replaced_blob_transaction(&self) -> Option<B256> {
1198        self.replaced().filter(|tx| tx.transaction.is_eip4844()).map(|tx| *tx.transaction.hash())
1199    }
1200
1201    /// Returns the hash of the transaction
1202    pub(crate) fn hash(&self) -> &TxHash {
1203        match self {
1204            Self::Pending(tx) => tx.transaction.hash(),
1205            Self::Parked { transaction, .. } => transaction.hash(),
1206        }
1207    }
1208
1209    /// Converts this type into the event type for listeners
1210    pub(crate) fn into_new_transaction_event(self) -> NewTransactionEvent<T> {
1211        match self {
1212            Self::Pending(tx) => {
1213                NewTransactionEvent { subpool: SubPool::Pending, transaction: tx.transaction }
1214            }
1215            Self::Parked { transaction, subpool, .. } => {
1216                NewTransactionEvent { transaction, subpool }
1217            }
1218        }
1219    }
1220
1221    /// Returns the subpool this transaction was added to
1222    pub(crate) const fn subpool(&self) -> SubPool {
1223        match self {
1224            Self::Pending(_) => SubPool::Pending,
1225            Self::Parked { subpool, .. } => *subpool,
1226        }
1227    }
1228
1229    /// Returns the [`TransactionId`] of the added transaction
1230    #[cfg(test)]
1231    pub(crate) fn id(&self) -> &TransactionId {
1232        match self {
1233            Self::Pending(added) => added.transaction.id(),
1234            Self::Parked { transaction, .. } => transaction.id(),
1235        }
1236    }
1237
1238    /// Returns the queued reason if the transaction is parked with a queued reason.
1239    pub(crate) const fn queued_reason(&self) -> Option<&QueuedReason> {
1240        match self {
1241            Self::Pending(_) => None,
1242            Self::Parked { queued_reason, .. } => queued_reason.as_ref(),
1243        }
1244    }
1245
1246    /// Returns the transaction state based on the subpool and queued reason.
1247    pub(crate) fn transaction_state(&self) -> AddedTransactionState {
1248        match self.subpool() {
1249            SubPool::Pending => AddedTransactionState::Pending,
1250            _ => {
1251                // For non-pending transactions, use the queued reason directly from the
1252                // AddedTransaction
1253                if let Some(reason) = self.queued_reason() {
1254                    AddedTransactionState::Queued(reason.clone())
1255                } else {
1256                    // Fallback - this shouldn't happen with the new implementation
1257                    AddedTransactionState::Queued(QueuedReason::NonceGap)
1258                }
1259            }
1260        }
1261    }
1262}
1263
1264/// The specific reason why a transaction is queued (not ready for execution)
1265#[derive(Debug, Clone, PartialEq, Eq)]
1266pub enum QueuedReason {
1267    /// Transaction has a nonce gap - missing prior transactions
1268    NonceGap,
1269    /// Transaction has parked ancestors - waiting for other transactions to be mined
1270    ParkedAncestors,
1271    /// Sender has insufficient balance to cover the transaction cost
1272    InsufficientBalance,
1273    /// Transaction exceeds the block gas limit
1274    TooMuchGas,
1275    /// Transaction doesn't meet the base fee requirement
1276    InsufficientBaseFee,
1277    /// Transaction doesn't meet the blob fee requirement (EIP-4844)
1278    InsufficientBlobFee,
1279}
1280
1281/// The state of a transaction when is was added to the pool
1282#[derive(Debug, Clone, PartialEq, Eq)]
1283pub enum AddedTransactionState {
1284    /// Ready for execution
1285    Pending,
1286    /// Not ready for execution due to a specific condition
1287    Queued(QueuedReason),
1288}
1289
1290impl AddedTransactionState {
1291    /// Returns whether the transaction was submitted as queued.
1292    pub const fn is_queued(&self) -> bool {
1293        matches!(self, Self::Queued(_))
1294    }
1295
1296    /// Returns whether the transaction was submitted as pending.
1297    pub const fn is_pending(&self) -> bool {
1298        matches!(self, Self::Pending)
1299    }
1300
1301    /// Returns the specific queued reason if the transaction is queued.
1302    pub const fn queued_reason(&self) -> Option<&QueuedReason> {
1303        match self {
1304            Self::Queued(reason) => Some(reason),
1305            Self::Pending => None,
1306        }
1307    }
1308}
1309
1310/// The outcome of a successful transaction addition
1311#[derive(Debug, Clone, PartialEq, Eq)]
1312pub struct AddedTransactionOutcome {
1313    /// The hash of the transaction
1314    pub hash: TxHash,
1315    /// The state of the transaction
1316    pub state: AddedTransactionState,
1317}
1318
1319impl AddedTransactionOutcome {
1320    /// Returns whether the transaction was submitted as queued.
1321    pub const fn is_queued(&self) -> bool {
1322        self.state.is_queued()
1323    }
1324
1325    /// Returns whether the transaction was submitted as pending.
1326    pub const fn is_pending(&self) -> bool {
1327        self.state.is_pending()
1328    }
1329}
1330
1331/// Contains all state changes after a [`CanonicalStateUpdate`] was processed
1332#[derive(Debug)]
1333pub(crate) struct OnNewCanonicalStateOutcome<T: PoolTransaction> {
1334    /// Hash of the block.
1335    pub(crate) block_hash: B256,
1336    /// All mined transactions.
1337    pub(crate) mined: Vec<TxHash>,
1338    /// Transactions promoted to the pending pool.
1339    pub(crate) promoted: Vec<Arc<ValidPoolTransaction<T>>>,
1340    /// transaction that were discarded during the update
1341    pub(crate) discarded: Vec<Arc<ValidPoolTransaction<T>>>,
1342}
1343
1344impl<T: PoolTransaction> OnNewCanonicalStateOutcome<T> {
1345    /// Returns all transactions that were promoted to the pending pool and adhere to the given
1346    /// [`TransactionListenerKind`].
1347    ///
1348    /// If the kind is [`TransactionListenerKind::PropagateOnly`], then only transactions that
1349    /// are allowed to be propagated are returned.
1350    pub(crate) fn pending_transactions(
1351        &self,
1352        kind: TransactionListenerKind,
1353    ) -> impl Iterator<Item = B256> + '_ {
1354        let iter = self.promoted.iter();
1355        PendingTransactionIter { kind, iter }
1356    }
1357
1358    /// Returns all FULL transactions that were promoted to the pending pool and adhere to the given
1359    /// [`TransactionListenerKind`].
1360    ///
1361    /// If the kind is [`TransactionListenerKind::PropagateOnly`], then only transactions that
1362    /// are allowed to be propagated are returned.
1363    pub(crate) fn full_pending_transactions(
1364        &self,
1365        kind: TransactionListenerKind,
1366    ) -> impl Iterator<Item = NewTransactionEvent<T>> + '_ {
1367        let iter = self.promoted.iter();
1368        FullPendingTransactionIter { kind, iter }
1369    }
1370}
1371
1372#[cfg(test)]
1373mod tests {
1374    use crate::{
1375        blobstore::{BlobStore, InMemoryBlobStore},
1376        identifier::SenderId,
1377        test_utils::{MockTransaction, TestPoolBuilder},
1378        validate::ValidTransaction,
1379        BlockInfo, PoolConfig, SubPoolLimit, TransactionOrigin, TransactionValidationOutcome, U256,
1380    };
1381    use alloy_eips::{eip4844::BlobTransactionSidecar, eip7594::BlobTransactionSidecarVariant};
1382    use alloy_primitives::Address;
1383    use std::{fs, path::PathBuf};
1384
1385    #[test]
1386    fn test_discard_blobs_on_blob_tx_eviction() {
1387        let blobs = {
1388            // Read the contents of the JSON file into a string.
1389            let json_content = fs::read_to_string(
1390                PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("test_data/blob1.json"),
1391            )
1392            .expect("Failed to read the blob data file");
1393
1394            // Parse the JSON contents into a serde_json::Value.
1395            let json_value: serde_json::Value =
1396                serde_json::from_str(&json_content).expect("Failed to deserialize JSON");
1397
1398            // Extract blob data from JSON and convert it to Blob.
1399            vec![
1400                // Extract the "data" field from the JSON and parse it as a string.
1401                json_value
1402                    .get("data")
1403                    .unwrap()
1404                    .as_str()
1405                    .expect("Data is not a valid string")
1406                    .to_string(),
1407            ]
1408        };
1409
1410        // Generate a BlobTransactionSidecar from the blobs.
1411        let sidecar = BlobTransactionSidecarVariant::Eip4844(
1412            BlobTransactionSidecar::try_from_blobs_hex(blobs).unwrap(),
1413        );
1414
1415        // Define the maximum limit for blobs in the sub-pool.
1416        let blob_limit = SubPoolLimit::new(1000, usize::MAX);
1417
1418        // Create a test pool with default configuration and the specified blob limit.
1419        let test_pool = &TestPoolBuilder::default()
1420            .with_config(PoolConfig { blob_limit, ..Default::default() })
1421            .pool;
1422
1423        // Set the block info for the pool, including a pending blob fee.
1424        test_pool
1425            .set_block_info(BlockInfo { pending_blob_fee: Some(10_000_000), ..Default::default() });
1426
1427        // Create an in-memory blob store.
1428        let blob_store = InMemoryBlobStore::default();
1429
1430        // Loop to add transactions to the pool and test blob eviction.
1431        for n in 0..blob_limit.max_txs + 10 {
1432            // Create a mock transaction with the generated blob sidecar.
1433            let mut tx = MockTransaction::eip4844_with_sidecar(sidecar.clone());
1434
1435            // Set non zero size
1436            tx.set_size(1844674407370951);
1437
1438            // Insert the sidecar into the blob store if the current index is within the blob limit.
1439            if n < blob_limit.max_txs {
1440                blob_store.insert(*tx.get_hash(), sidecar.clone()).unwrap();
1441            }
1442
1443            // Add the transaction to the pool with external origin and valid outcome.
1444            test_pool.add_transactions(
1445                TransactionOrigin::External,
1446                [TransactionValidationOutcome::Valid {
1447                    balance: U256::from(1_000),
1448                    state_nonce: 0,
1449                    bytecode_hash: None,
1450                    transaction: ValidTransaction::ValidWithSidecar {
1451                        transaction: tx,
1452                        sidecar: sidecar.clone(),
1453                    },
1454                    propagate: true,
1455                    authorities: None,
1456                }],
1457            );
1458        }
1459
1460        // Assert that the size of the pool's blob component is equal to the maximum blob limit.
1461        assert_eq!(test_pool.size().blob, blob_limit.max_txs);
1462
1463        // Assert that the size of the pool's blob_size component matches the expected value.
1464        assert_eq!(test_pool.size().blob_size, 1844674407370951000);
1465
1466        // Assert that the pool's blob store matches the expected blob store.
1467        assert_eq!(*test_pool.blob_store(), blob_store);
1468    }
1469
1470    #[test]
1471    fn test_auths_stored_in_identifiers() {
1472        // Create a test pool with default configuration.
1473        let test_pool = &TestPoolBuilder::default().with_config(Default::default()).pool;
1474
1475        let auth = Address::new([1; 20]);
1476        let tx = MockTransaction::eip7702();
1477
1478        test_pool.add_transactions(
1479            TransactionOrigin::Local,
1480            [TransactionValidationOutcome::Valid {
1481                balance: U256::from(1_000),
1482                state_nonce: 0,
1483                bytecode_hash: None,
1484                transaction: ValidTransaction::Valid(tx),
1485                propagate: true,
1486                authorities: Some(vec![auth]),
1487            }],
1488        );
1489
1490        let identifiers = test_pool.identifiers.read();
1491        assert_eq!(identifiers.sender_id(&auth), Some(SenderId::from(1)));
1492    }
1493}